消息队列 Kafka 是一个分布式的、高吞吐量、高可扩展性消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等,是大数据生态中不可或缺的产品之一。Apache Kafka起源于LinkedIn,后来于2011年成为开源Apache项目,然后于2012年成为First-class Apache项目。Kafka是用Scala和Java编写的。 Apache Kafka是基于发布订阅的容错消息系统。 它是快速,可扩展和设计分布。
Kafka 从原理开始
消息队列
一、消息队列简介
首先,我们需要了解一下什么是消息队列
消息队列解释
1.点对点模式 (一对一,消费者主动拉取数据,消息收到后消息清除) 点对点模型通常是一个基于拉取或者轮训的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端,这个模型的特点是发送到队列的消息被一个且只有一个接收者接瘦处理,即使有多个消息监听者也是如此。 2.发布/订阅模式 (一对多,数据生产后,推送给所有订阅者) 发布订阅模型则是一个机遇推送的消息传送模型,发布订阅类型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接受消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态
两种模式优缺点
为什么要使用消息队列
1)解耦 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 2)冗余 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列采用的“插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕 3)扩展性 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可 4)灵活性& 峰值处理能力 (有了集群峰值处理能力自然提升) 5)可恢复性 (数据是有冗余的) 6)顺序保证 7)缓冲 8)异步通信
二、Kafka 介绍
什么是Kafka
在流式计算中,Kafka一般用于来缓存数据,Storm通过消费Kafka的数据进行计算。
Apache Kafka是一个开源消息系统,由Scala写成,是由Apache软件基金会开发的一个开源消息系统项目
Kafka最初由Linkedin公司开发,并与2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
Kafka是一个分布式消息队列。Kafka对消息保存是根据Topic进行归类,发送消息称为Producer,消息接受者称为Consumer,此外Kafka集群有多个Kafka实例组成,每个实例(Server)称为broker
无论是kafka集群,还是consumer都依赖于zookeeper集群,主要保存一些meta信息,来保证系统可用性 [友情提示:新版本的kafka,consumer已经不存储在zookeeper集群里]
Kafka架构图
消费者概念
消费者消费数据也是找的leader,在kafka集群中,消费者和生产者都相当于客户端,只有leader会客户端进行相应。
如果有读请求发给follower,follower会进行读取,但是如果有写请求发送给follower,follwer会转发给leader。当leader处理完之后会将结果在反回给follower,由follower反回给客户端
kafka流程
kafka cluster集群 → Broker节点→ Topic (相当于硬盘) → partition (分区和副本)→ leader、follower处理消费者或者生产者的数据
这里有一个消费者概念需要注意
消费者有一个组的概念,同一个组的消费者不可以同时消费一个分区的数据,结合上图来看,consumerA可以消费Partition0的数据,consumerC也可以消费Partition0的数据,但是consumerB不可以同时消费Partition0的数据。
consumer是可以消费不同分区的数据Partition0/1,一个消费者也是可以消费多个topic的数据
consumer集群:我们可以使用多个线程去读取数据,将多个消费者放在一个消费者组(CG)里面,让一个组里面的不同消费者消费不同分区里的数据
三、Kafka集群搭建
环境说明
CentOS Linux release 7.5.1804 (Core) 10.4.82.125 #kafka 10.4.82.127 #zookeeper kafka 10.4.82.128 #kafka
Kafka官方网站:http://kafka.apache.org
这里需要说的一点,kafka_2.11这个版本为Scala的版本,2.1.1是kafka版本
wget http://mirrors.shu.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz tar xf kafka_2.11-2.1.1.tgz -C /usr/local ln -s /usr/local/kafka_2.11-2.1.1 /usr/local/kafka #因为我们下载的二进制包,不需要编译,直接mv就可以使用
当我们下载完kafka还需要下载jdk和zookeeper,前面也说了kafka依赖于zookeeper,zookeeper和kafka同时也需要JDK的支持
JDK下载地址
JDK历史版本下载
Java环境安装配置
wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" https://download.oracle.com/otn-pub/java/jdk/8u201-b09/42970487e3af4f5aa5bca3f542482c60/jdk-8u201-linux-x64.tar.gz 如果当前下载连接失效,只需要更换后面的下载地址即可 [root@abcdocker ~]# tar xf jdk-8u121-linux-x64.tar.gz -C /usr/local/ root@abcdocker ~]# ln -s /usr/local/jdk1.8.0_121/ /usr/local/jdk [root@abcdocker ~]# vim /etc/profile export JAVA_HOME=/usr/local/jdk export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH export CLASSPATH=.$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$JAVA_HOME/lib/tools.jar [root@abcdocker ~]# . /etc/profile [root@abcdocker ~]# java -version java version "1.8.0_121" Java(TM) SE Runtime Environment (build 1.8.0_121-b13) Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)
Zookeeper安装
安装文档:https://i4t.com/2195.html
这里zookeeper不详细说了
# 1.下载zookeeper [root@abcdocker ~]# wget http://www-eu.apache.org/dist/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz [root@abcdocker ~]# tar xf zookeeper-3.4.13.tar.gz -C /usr/local/ [root@abcdocker ~]# ln -s /usr/local/zookeeper-3.4.13/ /usr/local/zookeeper # 2.配置zookeeper [root@abcdocker ~]# cd /usr/local/zookeeper/conf/ [root@abcdocker conf]# cp zoo_sample.cfg zoo.cfg [root@abcdocker conf]# mkdir ../data [root@abcdocker conf]# mkdir /var/log/zookeeper [root@abcdocker conf]# vim zoo.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=/usr/local/zookeeper/data/ dataLogDir=/var/log/zookeeper/ clientPort=2181 # 3.启动zookeeper [root@abcdocker ~]# /usr/local/zookeeper/bin/zkServer.sh start JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED [root@abcdocker conf]# ps -ef|grep zook root 2875 1 9 12:31 pts/1 00:00:00 /usr/local/jdk/bin/java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/slf4j-log4j12-1.7.5.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.7.5.jar:/usr/local/zookeeper/bin/../lib/servlet-api-2.5-20081211.jar:/usr/local/zookeeper/bin/../lib/netty-3.7.0.Final.jar:/usr/local/zookeeper/bin/../lib/log4j-1.2.16.jar:/usr/local/zookeeper/bin/../lib/jline-2.11.jar:/usr/local/zookeeper/bin/../lib/jetty-util-6.1.26.jar:/usr/local/zookeeper/bin/../lib/jetty-6.1.26.jar:/usr/local/zookeeper/bin/../lib/javacc.jar:/usr/local/zookeeper/bin/../lib/jackson-mapper-asl-1.9.11.jar:/usr/local/zookeeper/bin/../lib/jackson-core-asl-1.9.11.jar:/usr/local/zookeeper/bin/../lib/commons-cli-1.2.jar:/usr/local/zookeeper/bin/../zookeeper-3.5.0-alpha.jar:/usr/local/zookeeper/bin/../src/java/lib/*.jar:/usr/local/zookeeper/bin/../conf:........:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar:/application/jdk/lib:/application/jdk/jre/lib:/application/jdk/lib/tools.jar:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar:/application/jdk/lib:/application/jdk/jre/lib:/application/jdk/lib/tools.jar:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar -Xmx1000m -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /usr/local/zookeeper/bin/../conf/zoo.cfg
zookeeper端口为2181,后面有使用到的地方
zookeeper安装完毕,现在我们配置kafka
[root@yzsjhl82-125 config]# cd /usr/local/kafka/config [root@yzsjhl82-125 config]# ll 总用量 68 -rw-r--r-- 1 root root 906 2月 9 02:30 connect-console-sink.properties -rw-r--r-- 1 root root 909 2月 9 02:30 connect-console-source.properties -rw-r--r-- 1 root root 5321 2月 9 02:30 connect-distributed.properties -rw-r--r-- 1 root root 883 2月 9 02:30 connect-file-sink.properties -rw-r--r-- 1 root root 881 2月 9 02:30 connect-file-source.properties -rw-r--r-- 1 root root 1111 2月 9 02:30 connect-log4j.properties -rw-r--r-- 1 root root 2262 2月 9 02:30 connect-standalone.properties -rw-r--r-- 1 root root 1221 2月 9 02:30 consumer.properties -rw-r--r-- 1 root root 4727 2月 9 02:30 log4j.properties -rw-r--r-- 1 root root 1925 2月 9 02:30 producer.properties -rw-r--r-- 1 root root 6851 2月 9 02:30 server.properties #需要修改的配置文件 -rw-r--r-- 1 root root 1032 2月 9 02:30 tools-log4j.properties -rw-r--r-- 1 root root 1169 2月 9 02:30 trogdor.conf -rw-r--r-- 1 root root 1023 2月 9 02:30 zookeeper.properties
server.properties文件修改
[root@i4t config]# grep -Ev "^$|#" server.properties broker.id=3 # 这里是kafka节点名称,不可以重复,在集群内部要唯一 listeners=PLAINTEXT://:9092 #broker的默认端口端口为9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/logs/kafaka #存储和日志的路径 num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 #kafka存储时间,单位为小时(默认七天) log.segment.bytes=1073741824 #存储文件大小(默认为1个G,超过1个G的数据会被删除) log.retention.check.interval.ms=300000 zookeeper.connect=10.4.82.127:2181 #zookeeper连接地址,集群以逗号分隔 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true #不添加这个参数,topic只会被标记删除
说完conf目录,下面介绍一下bin目录下的脚本
[root@i4t bin]# tree -L 1 . ├── connect-distributed.sh ├── connect-standalone.sh ├── kafka-acls.sh ├── kafka-broker-api-versions.sh ├── kafka-configs.sh ├── kafka-console-consumer.sh #控制台消费者,主要用于测试 ├── kafka-console-producer.sh #控制台生产者 ├── kafka-consumer-groups.sh ├── kafka-consumer-perf-test.sh ├── kafka-delegation-tokens.sh ├── kafka-delete-records.sh ├── kafka-dump-log.sh ├── kafka-log-dirs.sh ├── kafka-mirror-maker.sh ├── kafka-preferred-replica-election.sh ├── kafka-producer-perf-test.sh ├── kafka-reassign-partitions.sh ├── kafka-replica-verification.sh ├── kafka-run-class.sh ├── kafka-server-start.sh #启动脚本 ├── kafka-server-stop.sh #停止脚本 ├── kafka-streams-application-reset.sh ├── kafka-topics.sh #topic管理脚本,只要是topic相关的,都要运行这个脚本 ├── kafka-verifiable-consumer.sh ├── kafka-verifiable-producer.sh ├── trogdor.sh ├── windows ├── zookeeper-security-migration.sh ├── zookeeper-server-start.sh ├── zookeeper-server-stop.sh └── zookeeper-shell.sh
启动kafka
在启动kafka之前,要确保zookeeper是正常的,否则连接会出现问题
[root@i4t ~]# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &>/data/logs/kafaka/kafka.log & 启动的时候我们需要指定server.properties配置文件 默认日志输出到前台,我们程序使用后台启动日志输出到指定目录 另外一种启动方式 [root@i4t ~]# /usr/local/kafka/bin/kafka-server-start.sh -daemon config/server.properties
ID为3的server已经启动
[root@i4t kafaka]# netstat -lntup|grep 9092 tcp6 0 0 :::9092 :::* LISTEN 21529/java
kafka常见命令创建
创建topic
[root@i4t ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 10.4.82.127:2181 --partitions 2 --replication-factor 2 --topic one Created topic "one". #这里显示我们已经创建名称为one的topic,分区数和副本数为2 --create 创建topics --zookeeper zookeeper地址 --partitions 分区数 --replication-factor 副本数 --topic topic名称
这里的副本数只可以根据broker的数量创建
[root@i4t kafka]# ./bin/kafka-topics.sh --create --zookeeper 10.4.82.127:2181 --partitions 2 --replication-factor 4 --topic two Error while executing topic command : Replication factor: 4 larger than available brokers: 3. [2019-03-20 16:41:05,012] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3. (kafka.admin.TopicCommand$) #在kafka上,如果节点(broker)达不到设置的副本数,将无法进行创建。
查看当前服务器中所有的topic
[root@i4t ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.4.82.127:2181 one
我们可以去我们创建的目录查看一下
[root@i4t kafka]# ll /data/logs/kafka/ 总用量 48 -rw-r--r-- 1 root root 0 3月 20 15:07 cleaner-offset-checkpoint -rw-r--r-- 1 root root 4 3月 20 16:34 log-start-offset-checkpoint -rw-r--r-- 1 root root 54 3月 20 15:07 meta.properties drwxr-xr-x 2 root root 137 3月 20 16:30 one-0 ##这里是分区 drwxr-xr-x 2 root root 137 3月 20 16:30 one-1 #我们创建了2个分区,所以这里都是2个 -rw-r--r-- 1 root root 29808 3月 20 16:34 out.log -rw-r--r-- 1 root root 20 3月 20 16:34 recovery-point-offset-checkpoint -rw-r--r-- 1 root root 20 3月 20 16:35 replication-offset-checkpoint 另一个集群同样也是2个,起到一个冗余的作用
发送消息
[root@i4t ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 10.4.82.127:9092 --topic one >i4t.com > # 参数解释 kafka-console-producer.sh 生产者脚本 --broker-list borker节点(生产者连接broker,消费者连接zookeeper) --topic topic名称
现在我们已经创建好生产者,但是现在没有消费者,我们在>符号下面输入完成后窗口不关闭,在打开一台当做消费者,进行模拟测试
在我们集群中任意启动一台当做消费者都是可以的
消费信息
旧版本 /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 10.4.82.127:2181 --topic one 新版本 [root@i4t kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server 10.4.82.127:9092 --from-beginning --topic one i4t.com #参数解释 --bootstrap-server 如果使用了--bootstrap-server参数,那么consumer的信息将会存放在kafka之中.新版本推荐存储在--bootstrap-server 10.4.82.127:9092 这里的端口是kafka地址 --from-beginning 控制台consumer只获取最新的数据,如果需要以前所有的数据需要加上--from-beginning
在kafka新版本中,consumer已经将数据存储在kafka本地
[root@i4t ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.4.82.127:2181 __consumer_offsets one 我们查看topic的时候,发现新创建了一个名称为__consumer_offsets,出现这个topic的原因是我们将consumer连接到kafka上。这个topic保存我们的访问时间
查看Topic的详情
[root@yzsjhl82-127 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.4.82.127:2181 --describe --topic one Topic:one PartitionCount:2 ReplicationFactor:2 Configs: Topic: one Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1 Topic: one Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1 #参数解释 Tocpic:One #Topic名称 PartitionCount:2 #分区数 ReplicationFactor:2 #副本数 Configs: #其它配置 Topic: one Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1 名称 分区 这里的leader1就是borker1 副本(数据)是我们存储目录里的-位数 Isr选举, 生产者会把数据写入到leader里,follower和其他副本都是去leader上拉取数据。Isr是根据和leader数据相差最小排在最前面,第一个是0代表leader,第二或者第三个就是按照和leader差异大小排列
删除topic
[root@i4t ~]# /usr/local/kafka/bin/kafka-topic.sh --zookeeper 10.4.82.127:2181 --delete --topic one Topic one is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. --delete 删除topic参数 --topic 后面接topic名称,不需要指定分区 #删除的时候提示我们,没有进行物理删除,而是进行标记删除。我们只需要在server.properties添加delete.topic.enable=true重启kafka即可 例子:删除topic [root@i4t kafka]# ./bin/kafka-topics.sh --list --zookeeper 10.4.82.127:2181 __consumer_offsets one [root@i4t kafka]# ./bin/kafka-topics.sh --delete --zookeeper 10.4.82.127:2181 --topic one Topic one is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. [root@i4t kafka]# ./bin/kafka-topics.sh --list --zookeeper 10.4.82.127:2181 __consumer_offsets #删除以后还是会提示我们没有开启delete.topic.enable=true,只要我们查看topic没有one即可 删除topic时,同时也会将/data/logs/kafka里面的one-(1-2)给删除,这里简单说一下one-(1-2)其实就是one下的2个分区
我们在新建一个
[root@i4t kafka]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 10.4.82.127:2181 --partitions 1 --replication-factor 3 --topic one Created topic "one". 我们这次创建了一个分区,所以集群的服务器上在/data目录只会有一个分区(因为是3个副本),不会出现one-1或者是one-2 [root@i4t kafka]# ll /data/logs/kafka/ 总用量 96 -rw-r--r-- 1 root root 4 3月 25 14:49 cleaner-offset-checkpoint drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-11 drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-14 drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-17 drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-2 drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-20 drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-23 drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-26 drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-29 drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-32 drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-35 drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-38 drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-41 drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-44 drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-47 drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-5 drwxr-xr-x 2 root root 137 3月 21 18:02 __consumer_offsets-8 -rw-r--r-- 1 root root 4 3月 25 14:54 log-start-offset-checkpoint -rw-r--r-- 1 root root 54 3月 20 15:07 meta.properties drwxr-xr-x 2 root root 137 3月 25 14:53 one-0 -rw-r--r-- 1 root root 73881 3月 21 17:59 out.log -rw-r--r-- 1 root root 394 3月 25 14:54 recovery-point-offset-checkpoint -rw-r--r-- 1 root root 394 3月 25 14:55 replication-offset-checkpoint
通过zookeeper查看kafka集群
/usr/local/zookeeper/bin/zkCli.sh [zk: localhost:2181(CONNECTED) 0] ls /brokers/ids #查看broker的id [0] [zk: localhost:2181(CONNECTED) 1] ls /brokers/topics #查看消息 [k8s-fb-staging-log4j-tomcat-renren-jinkong-kylin-standard-server
相关文章:
- ZooKeeper 配置与安装
- 自动化运维工具之–Cobbler
- Kubernetes 1.14 二进制集群安装
- Nagios 配置及监控