登录 |  注册 |  繁體中文


Apache Kafka详解

分类: 人工智能&大数据 颜色:橙色 默认  字号: 阅读(2475) | 评论(0)

Apache Kafka可以帮助你解决在发布/订阅架构中遇到消费数百万消息的问题。


Kafka是一个实时消息传输的解决方案,可处理大量实时信息,并把这些信息快速路由到各种消费者。Kafka提供了信息生产者和消费者之间的无缝集成,无需对生产者的信息进行阻塞,也无需告诉生产者那些消费者的位置。

Apache Kafka是一个开源、分布式的消息发布/订阅系统,其主要设计特性如下:
1)消息持久化
要从大数据中获取真正的价值,那么不能丢失任何信息。Apache Kafka设计上是时间复杂度O(1)的磁盘结构,它提供了常量时间的性能,即使是存储海量的信息(TB级)。
2)高吞吐
记住大数据,Kafka的设计是工作在标准硬件之上,支持每秒数百万的消息。
3)分布式
Kafka明确支持在Kafka服务器上的消息分区,以及在消费机器集群上的分发消费,维护每个分区的排序语义。
4)多客户端支持
Kafka系统支持与来自不同平台(如java、.NET、PHP、Ruby或Python等)的客户端相集成。
5)实时
生产者线程产生的消息对消费者线程应该立即可见,此特性对基于事件的系统(比如CEP系统)是至关重要的。
注:CEP即Complex Event Processing,复杂事件处理。

Apache Kafka提供了一个实时的发布/订阅解决方案,它客服了消费实时大数据的挑战,这些数据量可能在数量级的增长、真实的数据。Kafka还支持在Hadoop系统上做并行数据载入。

二. 几个基本概念:
 
Topic:特指Kafka处理的消息源(feeds of messages)的不同分类。
Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。
Producers:消息和数据生产者,向Kafka的一个topic发布消息的过程叫做producers。
Consumers:消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。
Broker:缓存代理,Kafa集群中的一台或多台服务器统称为broker。


下面的视图显示了Apache Kafka消息系统支持的一个典型的大数据汇聚和分析的场景:



在生产者端,有数种不同的生产者:
1)前端Web应用产生的应用日志
2)生产者代理产生的Web分析日志
3)生产者适配器产生的传输日志
4)生产者服务产生的调用跟踪日志

在消费者端,有数种不同的消费者:
1)离线消费者:消费消息并在Hadoop或传统的数据仓库中存储消息用于离线分析
2)近乎实时的消费者:消费消息并在任意NoSQL数据库(如HBase或Cassandra)中存储消息用于近实时分析
3)实时消费者:在内存数据库中过滤消息,并在相关的群组中触发警告事件

Kafka集群保持所有的消息,直到它们过期, 无论消息是否被消费了。
每一个分区(partition)都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分配了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。实际上消费者所持有的仅有的元数据就是这个偏移量,也就是消费者在这个log中的位置。 这个偏移量由消费者控制:正常情况当消费者消费消息的时候,偏移量也线性的的增加。但是实际偏移量由消费者控制,消费者可以将偏移量重置为更老的一个偏移量,重新读取消息。

Kafka可以用于:
    消息系统, 例如ActiveMQ 和 RabbitMQ.
    站点的用户活动追踪。 用来记录用户的页面浏览,搜索,点击等。
    操作审计。 用户/管理员的网站操作的监控。
    日志聚合。收集数据,集中处理。
    流处理。    
    Commit Log
 
三. offset管理
auto.commit.enable 默认为true, 也就是offset定时保存到zookeeper里,下次读取的时候,从上次commit的offset读取。
auto.commit.interval.ms为多久提交一次offset, 默认1分钟
auto.offset.reset  默认为largest
       此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始消费.
       largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的开始位置消费所有消息.  
 

四. consumer和partition

  1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
  2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀
  3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
  4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
  5. High-level接口中获取不到数据的时候是会block的

负载低的情况下可以每个线程消费多个partition。但负载高的情况下,Consumer 线程数最好和Partition数量保持一致。如果还是消费不过来,应该再开 Consumer 进程,进程内线程数同样和分区数一致。

 
kafka安装配置参考:
 
1、下载安装KAFKA
$ wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
 
解压并进入目录:
 
$ tar -xzvf kafka_2.11-0.8.2.1.tgz
$ cd kafka_2.11-0.8.2.1
 
运行 kafka ,需要依赖 zookeeper,你可以使用已有的 zookeeper 集群或者利用 kafka 提供的脚本启动一个 zookeeper 实例
 
启动Kafka server:
 
$ bin/kafka-server-start.sh config/server.properties &  
 
停止 Kafka server :
 
$ bin/kafka-server-stop.sh
 
2、单 broker 测试
 
2.1 新建一个TOPIC(replication-factor=num of brokers)
$ kafka-topics.sh --create --topic test --replication-factor 1 --partitions 1 --zookeeper localhost:2181
 
2.2 查看topic
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
$ ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test   #查看test topic的情况
 
删除topic
./bin/kafka-topics.sh  --delete --topic test  --zookeeper hd1:2181
 
2.3 在启动 kafka-server 之后启动,运行producer:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  
 
2.4 在另一个终端运行 consumer:
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning  
 
在 producer 端输入字符串并回车,查看 consumer 端是否显示。
 
 
3、多 broker,即Kafka集群配置 测试
 
2)配置$KAFKA_HOME/config/server.properties
 
我们安装3个broker,分别在3个vm上:zk1,zk2,zk3:
 
zk1:
 
$ vi $KAFKA_HOME/config/server.properties 
broker.id=0
port=9092
host.name=zk1
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
 
zk2:
 
broker.id=1
port=9092
host.name=zk2
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
 
zk3: 
 
broker.id=2
port=9092
host.name=zk3
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
 
 
3)启动zookeeper服务, 在zk1,zk2,zk3上分别运行: 
$ zkServer.sh start
 
4)启动kafka服务, 在zk1,zk2,zk3上分别运行:
$ kafka-server-start.sh  ../config/server.properties
 
5) 新建一个TOPIC(replication-factor=num of brokers)
$ kafka-topics.sh --create --topic test --replication-factor 3 --partitions 2 --zookeeper zk1:2181
 
6)假设我们在zk2上,开一个终端,发送消息至kafka(zk2模拟producer)
$ kafka-console-producer.sh --broker-list zk1:9092 --sync --topic test
 
在发送消息的终端输入:Hello Kafka
 
7)假设我们在zk3上,开一个终端,显示消息的消费(zk3模拟consumer)
 
$ kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning
 
 
将日志推送到 kafka
 
例如,将 apache 或者 nginx 或者 tomcat 等产生的日志 push 到 kafka,只需要执行下面代码即可:
 
$ tail -n 0 -f  /var/log/nginx/access.log | bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --sync --topic zerg.hydra
 
 
Kafka文件存储机制 请参考 Kafka文件存储机制

 



上一篇:Kafka文件存储机制   下一篇:php declare详解

姓 名: *
邮 箱:
内 容: *
验证码: 点击刷新 *   

回到顶部