消息队列 Kafka 未完

2023年 5月 4日 31.2k 0

消息队列 Kafka 是一个分布式的、高吞吐量、高可扩展性消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等,是大数据生态中不可或缺的产品之一。Apache Kafka起源于LinkedIn,后来于2011年成为开源Apache项目,然后于2012年成为First-class Apache项目。Kafka是用Scala和Java编写的。 Apache Kafka是基于发布订阅的容错消息系统。 它是快速,可扩展和设计分布。
Kafka 从原理开始
消息队列

  • Kafka 从原理开始一、消息队列简介
  • 二、Kafka 介绍
  • 三、Kafka集群搭建kafka常见命令创建
  • 一、消息队列简介

    首先,我们需要了解一下什么是消息队列
    image_1d6drev25flv10101no21i481nhn9.png-126.8kB
    消息队列解释

    1.点对点模式 (一对一,消费者主动拉取数据,消息收到后消息清除)
    点对点模型通常是一个基于拉取或者轮训的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端,这个模型的特点是发送到队列的消息被一个且只有一个接收者接瘦处理,即使有多个消息监听者也是如此。
    
    2.发布/订阅模式 (一对多,数据生产后,推送给所有订阅者)
    发布订阅模型则是一个机遇推送的消息传送模型,发布订阅类型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接受消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态
    

    两种模式优缺点

  • 主动模式缺点
  • 客户端B需要有一个线程实时监控队列(Queue)里的数据,因为不确定什么时候有数据。相反订阅模式不需要客户端知道是否有数据,有数据写入时,会由队列进行发送
  • 订阅模式的缺点
  • 客户端订阅会节省进程,但是还会有一个问题,拉取速度是由客户端控制,客户端的带宽限制不相同,所以推送速度需要统一。例如客户端C是2M,客户端B是10M,就会产生状态不一致。有可能丢数据等问题
  • 为什么要使用消息队列

    1)解耦
    允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
    2)冗余
    消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列采用的“插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕
    3)扩展性
    因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可
    4)灵活性& 峰值处理能力  (有了集群峰值处理能力自然提升)
    5)可恢复性  (数据是有冗余的)
    6)顺序保证
    7)缓冲
    8)异步通信
    

    二、Kafka 介绍

    什么是Kafka
    在流式计算中,Kafka一般用于来缓存数据,Storm通过消费Kafka的数据进行计算。
    Apache Kafka是一个开源消息系统,由Scala写成,是由Apache软件基金会开发的一个开源消息系统项目
    Kafka最初由Linkedin公司开发,并与2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
    Kafka是一个分布式消息队列。Kafka对消息保存是根据Topic进行归类,发送消息称为Producer,消息接受者称为Consumer,此外Kafka集群有多个Kafka实例组成,每个实例(Server)称为broker
    无论是kafka集群,还是consumer都依赖于zookeeper集群,主要保存一些meta信息,来保证系统可用性 [友情提示:新版本的kafka,consumer已经不存储在zookeeper集群里]
    Kafka架构图
    image_1d6dtqunhfujpg4hdl13en1jse13.png-51.7kB
    QQ20190321-0.png-469.6kB

  • Producer 发送消息的称为: 生产者,向Kafka Broker发消息的客户端
  • Consumer 消息消息的称为: 消费者,向Kafka Broker取消息的客户端 (新版本的consumer信息已经不存储在zookeeper里,是保存在kafka。后面安装会有提示)
  • Topic 相当于硬盘,broker相当于服务器,需要使用topic(硬盘)存储数据
  • Consumer Group (CG);这是kafka用来实现一个topic消息的广播(发送给所有的consumer)和单播(发给任意一个consumer)的手段。一个Topic可以有多个CG。topic的消息会复制(不是真正意义上的复制)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分区而不需要多次发送消息到不同的Topic
  • Broker 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic
  • Partition 为了实现扩展性,一个非常大的topic可以分布到多个brocker上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序
  • offset kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。
  • Replication 相当于一个分区
  • leader 一个分区一个leader,无论生产还是消费都要找leader
  • follower 备份作用,当leader挂掉后,follower会自动升级为leader。follower和leader不会在一台服务器上,是出现在2台服务器上,以保证数据的冗余
  • 分区和副本都是我们创建topic自己指定,可以只有一个分区
  • zookeeper 这里使用zookeeper主要是用于consumer存储数据推送的时间,以及kafka集群信息(在新版本里面,2.11 consumer的数据已经不存在zookeeper里)
  • 消费者概念
    消费者消费数据也是找的leader,在kafka集群中,消费者和生产者都相当于客户端,只有leader会客户端进行相应。
    如果有读请求发给follower,follower会进行读取,但是如果有写请求发送给follower,follwer会转发给leader。当leader处理完之后会将结果在反回给follower,由follower反回给客户端
    kafka流程
    kafka cluster集群 → Broker节点→ Topic (相当于硬盘) → partition (分区和副本)→ leader、follower处理消费者或者生产者的数据
    这里有一个消费者概念需要注意
    消费者有一个组的概念,同一个组的消费者不可以同时消费一个分区的数据,结合上图来看,consumerA可以消费Partition0的数据,consumerC也可以消费Partition0的数据,但是consumerB不可以同时消费Partition0的数据。
    consumer是可以消费不同分区的数据Partition0/1,一个消费者也是可以消费多个topic的数据
    consumer集群:我们可以使用多个线程去读取数据,将多个消费者放在一个消费者组(CG)里面,让一个组里面的不同消费者消费不同分区里的数据

    三、Kafka集群搭建

    环境说明

    CentOS Linux release 7.5.1804 (Core)
    10.4.82.125     #kafka
    10.4.82.127     #zookeeper  kafka
    10.4.82.128     #kafka
    

    Kafka官方网站:http://kafka.apache.org
    image_1d6ak5j191sh78ud1clkkcv1fbr9.png-172.6kB
    这里需要说的一点,kafka_2.11这个版本为Scala的版本,2.1.1是kafka版本

    wget http://mirrors.shu.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz 
    tar xf kafka_2.11-2.1.1.tgz   -C /usr/local
    ln -s /usr/local/kafka_2.11-2.1.1 /usr/local/kafka
    
    #因为我们下载的二进制包,不需要编译,直接mv就可以使用
    

    当我们下载完kafka还需要下载jdk和zookeeper,前面也说了kafka依赖于zookeeper,zookeeper和kafka同时也需要JDK的支持
    JDK下载地址
    JDK历史版本下载
    image_1d6f81ceq15ibf6k12mv1dkdqpo25.png-308.3kB
    Java环境安装配置

    wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" https://download.oracle.com/otn-pub/java/jdk/8u201-b09/42970487e3af4f5aa5bca3f542482c60/jdk-8u201-linux-x64.tar.gz
    
    如果当前下载连接失效,只需要更换后面的下载地址即可
    
    [root@abcdocker ~]# tar xf jdk-8u121-linux-x64.tar.gz -C /usr/local/
    root@abcdocker ~]# ln -s /usr/local/jdk1.8.0_121/ /usr/local/jdk
    [root@abcdocker ~]# vim /etc/profile
    export JAVA_HOME=/usr/local/jdk
    export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
    export CLASSPATH=.$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$JAVA_HOME/lib/tools.jar
    [root@abcdocker ~]# . /etc/profile
    [root@abcdocker ~]# java -version
    java version "1.8.0_121"
    Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
    Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)
    

    Zookeeper安装
    安装文档:https://i4t.com/2195.html
    这里zookeeper不详细说了

    # 1.下载zookeeper
    
    [root@abcdocker ~]# wget http://www-eu.apache.org/dist/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
    [root@abcdocker ~]# tar xf zookeeper-3.4.13.tar.gz -C /usr/local/
    [root@abcdocker ~]# ln -s /usr/local/zookeeper-3.4.13/ /usr/local/zookeeper
    
    # 2.配置zookeeper
    [root@abcdocker ~]# cd /usr/local/zookeeper/conf/
    [root@abcdocker conf]# cp zoo_sample.cfg zoo.cfg
    [root@abcdocker conf]# mkdir ../data
    [root@abcdocker conf]# mkdir /var/log/zookeeper
    [root@abcdocker conf]# vim zoo.cfg 
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/usr/local/zookeeper/data/
    dataLogDir=/var/log/zookeeper/
    clientPort=2181
    
    # 3.启动zookeeper 
    [root@abcdocker ~]# /usr/local/zookeeper/bin/zkServer.sh start
    JMX enabled by default
    Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    
    [root@abcdocker conf]# ps -ef|grep zook
    root       2875      1  9 12:31 pts/1    00:00:00 /usr/local/jdk/bin/java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/slf4j-log4j12-1.7.5.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.7.5.jar:/usr/local/zookeeper/bin/../lib/servlet-api-2.5-20081211.jar:/usr/local/zookeeper/bin/../lib/netty-3.7.0.Final.jar:/usr/local/zookeeper/bin/../lib/log4j-1.2.16.jar:/usr/local/zookeeper/bin/../lib/jline-2.11.jar:/usr/local/zookeeper/bin/../lib/jetty-util-6.1.26.jar:/usr/local/zookeeper/bin/../lib/jetty-6.1.26.jar:/usr/local/zookeeper/bin/../lib/javacc.jar:/usr/local/zookeeper/bin/../lib/jackson-mapper-asl-1.9.11.jar:/usr/local/zookeeper/bin/../lib/jackson-core-asl-1.9.11.jar:/usr/local/zookeeper/bin/../lib/commons-cli-1.2.jar:/usr/local/zookeeper/bin/../zookeeper-3.5.0-alpha.jar:/usr/local/zookeeper/bin/../src/java/lib/*.jar:/usr/local/zookeeper/bin/../conf:........:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar:/application/jdk/lib:/application/jdk/jre/lib:/application/jdk/lib/tools.jar:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar:/application/jdk/lib:/application/jdk/jre/lib:/application/jdk/lib/tools.jar:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib/tools.jar -Xmx1000m -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /usr/local/zookeeper/bin/../conf/zoo.cfg
    

    zookeeper端口为2181,后面有使用到的地方
    zookeeper安装完毕,现在我们配置kafka

    [root@yzsjhl82-125 config]# cd /usr/local/kafka/config
    
    [root@yzsjhl82-125 config]# ll
    总用量 68
    -rw-r--r-- 1 root root  906 2月   9 02:30 connect-console-sink.properties
    -rw-r--r-- 1 root root  909 2月   9 02:30 connect-console-source.properties
    -rw-r--r-- 1 root root 5321 2月   9 02:30 connect-distributed.properties
    -rw-r--r-- 1 root root  883 2月   9 02:30 connect-file-sink.properties
    -rw-r--r-- 1 root root  881 2月   9 02:30 connect-file-source.properties
    -rw-r--r-- 1 root root 1111 2月   9 02:30 connect-log4j.properties
    -rw-r--r-- 1 root root 2262 2月   9 02:30 connect-standalone.properties
    -rw-r--r-- 1 root root 1221 2月   9 02:30 consumer.properties
    -rw-r--r-- 1 root root 4727 2月   9 02:30 log4j.properties
    -rw-r--r-- 1 root root 1925 2月   9 02:30 producer.properties
    -rw-r--r-- 1 root root 6851 2月   9 02:30 server.properties     #需要修改的配置文件
    -rw-r--r-- 1 root root 1032 2月   9 02:30 tools-log4j.properties
    -rw-r--r-- 1 root root 1169 2月   9 02:30 trogdor.conf
    -rw-r--r-- 1 root root 1023 2月   9 02:30 zookeeper.properties
    

    server.properties文件修改

    [root@i4t config]# grep -Ev "^$|#" server.properties
    broker.id=3           # 这里是kafka节点名称,不可以重复,在集群内部要唯一
    listeners=PLAINTEXT://:9092    #broker的默认端口端口为9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/data/logs/kafaka       #存储和日志的路径
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168                     #kafka存储时间,单位为小时(默认七天)
    log.segment.bytes=1073741824            #存储文件大小(默认为1个G,超过1个G的数据会被删除)
    log.retention.check.interval.ms=300000
    zookeeper.connect=10.4.82.127:2181          #zookeeper连接地址,集群以逗号分隔
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    delete.topic.enable=true            #不添加这个参数,topic只会被标记删除
    

    说完conf目录,下面介绍一下bin目录下的脚本

    [root@i4t bin]# tree  -L 1
    .
    ├── connect-distributed.sh
    ├── connect-standalone.sh
    ├── kafka-acls.sh
    ├── kafka-broker-api-versions.sh
    ├── kafka-configs.sh
    ├── kafka-console-consumer.sh           #控制台消费者,主要用于测试
    ├── kafka-console-producer.sh           #控制台生产者
    ├── kafka-consumer-groups.sh
    ├── kafka-consumer-perf-test.sh
    ├── kafka-delegation-tokens.sh
    ├── kafka-delete-records.sh
    ├── kafka-dump-log.sh
    ├── kafka-log-dirs.sh
    ├── kafka-mirror-maker.sh
    ├── kafka-preferred-replica-election.sh
    ├── kafka-producer-perf-test.sh
    ├── kafka-reassign-partitions.sh
    ├── kafka-replica-verification.sh
    ├── kafka-run-class.sh
    ├── kafka-server-start.sh               #启动脚本
    ├── kafka-server-stop.sh                #停止脚本
    ├── kafka-streams-application-reset.sh
    ├── kafka-topics.sh                     #topic管理脚本,只要是topic相关的,都要运行这个脚本
    ├── kafka-verifiable-consumer.sh
    ├── kafka-verifiable-producer.sh
    ├── trogdor.sh
    ├── windows
    ├── zookeeper-security-migration.sh
    ├── zookeeper-server-start.sh
    ├── zookeeper-server-stop.sh
    └── zookeeper-shell.sh
    

    启动kafka
    在启动kafka之前,要确保zookeeper是正常的,否则连接会出现问题

    [root@i4t ~]# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &>/data/logs/kafaka/kafka.log &
    
    启动的时候我们需要指定server.properties配置文件
    默认日志输出到前台,我们程序使用后台启动日志输出到指定目录
    
    另外一种启动方式
    [root@i4t ~]# /usr/local/kafka/bin/kafka-server-start.sh -daemon config/server.properties
    

    ID为3的server已经启动

    [root@i4t kafaka]# netstat -lntup|grep 9092
    tcp6       0      0 :::9092                 :::*                    LISTEN      21529/java
    

    image_1d6fib9111be21pds158263u8p39.png-954.2kB

    kafka常见命令创建

    创建topic

    [root@i4t ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 10.4.82.127:2181 --partitions 2 --replication-factor 2  --topic one
    Created topic "one".
    
    #这里显示我们已经创建名称为one的topic,分区数和副本数为2
    --create 创建topics
    --zookeeper  zookeeper地址
    --partitions  分区数
    --replication-factor 副本数
    --topic     topic名称
    

    这里的副本数只可以根据broker的数量创建

    [root@i4t kafka]# ./bin/kafka-topics.sh --create --zookeeper 10.4.82.127:2181 --partitions 2 --replication-factor 4  --topic two
    Error while executing topic command : Replication factor: 4 larger than available brokers: 3.
    [2019-03-20 16:41:05,012] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
     (kafka.admin.TopicCommand$)
    
    #在kafka上,如果节点(broker)达不到设置的副本数,将无法进行创建。
    

    查看当前服务器中所有的topic

    [root@i4t ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.4.82.127:2181
    one
    

    我们可以去我们创建的目录查看一下

    [root@i4t kafka]# ll /data/logs/kafka/
    总用量 48
    -rw-r--r-- 1 root root     0 3月  20 15:07 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root     4 3月  20 16:34 log-start-offset-checkpoint
    -rw-r--r-- 1 root root    54 3月  20 15:07 meta.properties
    drwxr-xr-x 2 root root   137 3月  20 16:30 one-0         ##这里是分区
    drwxr-xr-x 2 root root   137 3月  20 16:30 one-1            #我们创建了2个分区,所以这里都是2个
    -rw-r--r-- 1 root root 29808 3月  20 16:34 out.log
    -rw-r--r-- 1 root root    20 3月  20 16:34 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root    20 3月  20 16:35 replication-offset-checkpoint
    
    另一个集群同样也是2个,起到一个冗余的作用
    

    发送消息

    [root@i4t ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 10.4.82.127:9092 --topic one
    >i4t.com
    >
    
    # 参数解释
    kafka-console-producer.sh   生产者脚本
    --broker-list               borker节点(生产者连接broker,消费者连接zookeeper)
    --topic    topic名称
    

    现在我们已经创建好生产者,但是现在没有消费者,我们在>符号下面输入完成后窗口不关闭,在打开一台当做消费者,进行模拟测试
    在我们集群中任意启动一台当做消费者都是可以的
    消费信息

    旧版本
    /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 10.4.82.127:2181 --topic one
    
    新版本
    [root@i4t kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server 10.4.82.127:9092 --from-beginning --topic one
    
    i4t.com
    
    
    #参数解释
    --bootstrap-server  如果使用了--bootstrap-server参数,那么consumer的信息将会存放在kafka之中.新版本推荐存储在--bootstrap-server
    10.4.82.127:9092    这里的端口是kafka地址
    --from-beginning   控制台consumer只获取最新的数据,如果需要以前所有的数据需要加上--from-beginning
    

    在kafka新版本中,consumer已经将数据存储在kafka本地

    [root@i4t ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.4.82.127:2181
    __consumer_offsets
    one
    
    我们查看topic的时候,发现新创建了一个名称为__consumer_offsets,出现这个topic的原因是我们将consumer连接到kafka上。这个topic保存我们的访问时间
    

    查看Topic的详情

    [root@yzsjhl82-127 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.4.82.127:2181 --describe --topic one
    Topic:one   PartitionCount:2    ReplicationFactor:2 Configs:
        Topic: one  Partition: 0    Leader: 1   Replicas: 1,0   Isr: 0,1
        Topic: one  Partition: 1    Leader: 0   Replicas: 0,1   Isr: 0,1
    
    #参数解释
    
    
    Tocpic:One      #Topic名称
    PartitionCount:2    #分区数
    ReplicationFactor:2     #副本数
    Configs:            #其它配置
    
    Topic: one  Partition: 0    Leader: 1   Replicas: 1,0   Isr: 0,1
    名称         分区  这里的leader1就是borker1   副本(数据)是我们存储目录里的-位数  Isr选举,
    
    
    生产者会把数据写入到leader里,follower和其他副本都是去leader上拉取数据。Isr是根据和leader数据相差最小排在最前面,第一个是0代表leader,第二或者第三个就是按照和leader差异大小排列
    

    删除topic

    [root@i4t ~]# /usr/local/kafka/bin/kafka-topic.sh --zookeeper 10.4.82.127:2181 --delete --topic one
    Topic one is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.
    
    --delete  删除topic参数
    --topic   后面接topic名称,不需要指定分区
    
    #删除的时候提示我们,没有进行物理删除,而是进行标记删除。我们只需要在server.properties添加delete.topic.enable=true重启kafka即可
    
    
    例子:删除topic
    [root@i4t kafka]# ./bin/kafka-topics.sh --list --zookeeper 10.4.82.127:2181
    __consumer_offsets
    one
    [root@i4t kafka]# ./bin/kafka-topics.sh --delete --zookeeper 10.4.82.127:2181 --topic one
    Topic one is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.
    [root@i4t kafka]# ./bin/kafka-topics.sh --list --zookeeper 10.4.82.127:2181
    __consumer_offsets
    
    #删除以后还是会提示我们没有开启delete.topic.enable=true,只要我们查看topic没有one即可
    删除topic时,同时也会将/data/logs/kafka里面的one-(1-2)给删除,这里简单说一下one-(1-2)其实就是one下的2个分区
    

    我们在新建一个

    [root@i4t kafka]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 10.4.82.127:2181 --partitions 1 --replication-factor 3 --topic one
    Created topic "one".
    
    我们这次创建了一个分区,所以集群的服务器上在/data目录只会有一个分区(因为是3个副本),不会出现one-1或者是one-2
    
    [root@i4t kafka]# ll /data/logs/kafka/
    总用量 96
    -rw-r--r-- 1 root root     4 3月  25 14:49 cleaner-offset-checkpoint
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-11
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-14
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-17
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-2
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-20
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-23
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-26
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-29
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-32
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-35
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-38
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-41
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-44
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-47
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-5
    drwxr-xr-x 2 root root   137 3月  21 18:02 __consumer_offsets-8
    -rw-r--r-- 1 root root     4 3月  25 14:54 log-start-offset-checkpoint
    -rw-r--r-- 1 root root    54 3月  20 15:07 meta.properties
    drwxr-xr-x 2 root root   137 3月  25 14:53 one-0
    -rw-r--r-- 1 root root 73881 3月  21 17:59 out.log
    -rw-r--r-- 1 root root   394 3月  25 14:54 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root   394 3月  25 14:55 replication-offset-checkpoint
    

    通过zookeeper查看kafka集群

    /usr/local/zookeeper/bin/zkCli.sh
    [zk: localhost:2181(CONNECTED) 0] ls /brokers/ids   #查看broker的id
    [0]
    [zk: localhost:2181(CONNECTED) 1] ls /brokers/topics   #查看消息
    [k8s-fb-staging-log4j-tomcat-renren-jinkong-kylin-standard-server
    

    A4CB5471-1A4F-4EE0-B151-1A60A6FD9006.png-97kB

    相关文章:

    1. ZooKeeper 配置与安装
    2. 自动化运维工具之–Cobbler
    3. Kubernetes 1.14 二进制集群安装
    4. Nagios 配置及监控

    相关文章

    服务器端口转发,带你了解服务器端口转发
    服务器开放端口,服务器开放端口的步骤
    产品推荐:7月受欢迎AI容器镜像来了,有Qwen系列大模型镜像
    如何使用 WinGet 下载 Microsoft Store 应用
    百度搜索:蓝易云 – 熟悉ubuntu apt-get命令详解
    百度搜索:蓝易云 – 域名解析成功但ping不通解决方案

    发布评论