kafka入门

2023年 7月 31日 158.6k 0

消息队列

消息队列模式

点对点

生产者生产消息发送到Queue,消费者从Queue取出数据,并消费数据,数据被消费,Queue不再存储,Queue支持多个消费者,一条消息只能被消费一次(只有一个消费者可以消费到)

image-20201028165848153.png

发布/订阅(一对多)

生产者发送消息到topic中,多个消费者订阅topic,和点对点不同,发布到topic的消息会被所有订阅者消费
image-20201028170039746.png

kafka架构

image-20201028170118806.png

  • Producer:消息生产者,向kafka broker发送消息
  • Consumer:消息消费者,从kafka broker取消息
  • Consumer Group:多个consumer组成,消费者组内不同消费者负责消费不同分区的数据,kafka的topic下的一条消息,只能被同一个消费者组的一个消费者消费到
    • consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程
    • group.id是一个字符串,唯一标识一个consumer group
    • consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)
  • Broker:一台服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic
  • Topic:主题(队列)
  • Partition:分区,kafka的扩展性体现,一个庞大的topic有很多分区(partition),可以分不到多个broker上去,每个 partition 是一个有序的队列, partition 中的每条消息 都会被分配一个有序的 id (offset) kafka 只保证按一个 partition 中的顺序将消息发给consumer ,不保证一个 topic的整体(多个 partition 间)的顺序;
  • Replica:副本,当集群某个节点故障时,该节点的partitiion数据不丢失,kafka的副本机制,一个topic的每个分区有多个副本,一个leader和follower
  • follower:每个分区的多个副本的“从”,实时从leader中同步数据,保持leader数据的同步
  • leader:每个分区副本的主,生产者发送数据的对象,消费者消费数据的对象
  • Offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息;kafka 的存储文件都是按照 offset.kafka来命名,用 offset 名字的好处是方便查。例如你想位于 2049,只要找到2048.kafka的文件即可。当然 the first offsetthe 就 是 00000000000.kafka ;
  • zookeeper:保存offset数据(0.9版本之前),保证高可用,0.9版本之后offset数据存放在kafka的系统特定topic中;

配置文件

#配置文件
#broker的全局唯一编号
broker.id=0 
#删除 topic
delete.topic.enable=true 
#处理网络请求
num.network.threads=3 
#用来处理磁盘IO的现成数量
num.io.threads=8 
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400 
#接收套字的缓冲区大小 
socket.receive.buffer.bytes=102400 
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600 
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs 
#topic在当前
num.partitions=1 
#用来恢复和清理data下数据的线程量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超将被删除
log.retention.hours=168 
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181 ,

常用命令

#启动zookeeper
bash bin/kafka -topics.sh -- zookeeper localhost:2181 -- list
#启动kafka
bash /etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties
#创建topic
sh /etc/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create  
--replication-factor 3 --partitions 1 --topic first
#删除topic
sh /etc/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic first
#消费消息
sh /etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning  --topic first
#生产消息
sh /etc/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first

Zookeepr

image-20201029210528010.png

  • kafka集群中有一个broker会被选举成Controller,负责管理集群broker的上下线、所有topic的分区副本分配和leader选举
  • Controller管理依赖于Zookeeper

Prodeucer(生产者)

工作流程

image-20201028195535954.png
13274599-1c42e39c8e416cb4.png

  • producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader
  • producer将消息发送给该leader
  • leader将消息写入本地log
  • followers从leader pull消息
  • 写入本地log后向leader发送ACK
  • leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK
  • Kafka 中消息是以topic 进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。
  • topic 是逻辑上的概念,而partition 是物理上的概念,每个partition 对应一个log 文件,该log 文件中存储的就是producer 生产的数据。Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
  • 文件存储

    clipboard.png

    • Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
    • Topic:一类消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
    • Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。
    • Segment:partition物理上由多个segment组成
    • offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.

    分析过程分为以下4个步骤:

    • topic中partition存储分布
    • partiton中文件存储方式
    • partiton中segment文件存储结构
    • 在partition中如何通过offset查找message

    topic中partition存储分布

    假设实验环境中Kafka集群只有一个broker,xxx/message-folder为数据文件存储根目录,在Kafka broker中server.properties文件配置(参数log.dirs=xxx/message-folder),

    例如创建2个topic名称分别为report_push、launch_info, partitions数量都为partitions=4 存储路径和目录规则为: xxx/message-folder

                  |--report_push-0
                  |--report_push-1
                  |--report_push-2
                  |--report_push-3
                  |--launch_info-0
                  |--launch_info-1
                  |--launch_info-2
                  |--launch_info-3
    

    在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1

    partiton中文件存储方式

    0ab51510.png

    • 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
    • 每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。

    这样做的好处就是能快速删除无用文件,有效提高磁盘利用率

    partiton中segment文件存储结构

    • segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件.
    • segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。

    创建一个topicXXX包含1 partition,设置每个segment大小为500MB,并启动producer向Kafka broker写入大量数据如下图2所示segment文件列表形象说明了上述2个规则:
    69e4b0a6.png

    以上图中一对segment file文件为例,说明segment中indexdata file对应关系物理结构如下:

    c415ed42.png
    上图中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。 其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772(368796+3)个message)、以及该消息的物理偏移地址为497。

    segment data file由许多message组成,每个message物理结构如下:

    355c1d57.png

    关键字 解释说明
    8 byte offset 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
    4 byte message size message大小
    4 byte CRC32 用crc32校验message
    1 byte “magic” 表示本次发布Kafka服务程序协议版本号
    1 byte “attributes” 表示为独立版本、或标识压缩类型、或编码类型。
    4 byte key length 表示key的长度,当key为-1时,K byte key字段不填
    K byte key 可选
    value bytes payload 表示实际消息数据。

    通过offset查找message

    例如读取offset=368776的message,需要通过下面2个步骤查找。

    • 第一步查找segment file ,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0;第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。 当offset=368776时定位到00000000000000368769.index|log
    • 第二步通过segment file查找message 通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。

    segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。

    Kafka文件存储优势

    Kafka运行时很少有大量读磁盘的操作,主要是定期批量写磁盘操作,因此操作磁盘很高效。这跟Kafka文件存储中读写message的设计是息息相关的。Kafka中读写message有如下特点:

    写message

    • 消息从java堆转入page cache(即物理内存)。
    • 由异步线程刷盘,消息从page cache刷入磁盘。

    读message

    • 消息直接从page cache转入socket发送出去。
    • 当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁 盘Load消息到page cache,然后直接从socket发出去

    Kafka高效文件存储设计特点

    • Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
    • 通过索引信息可以快速定位message和确定response的最大大小。
    • 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
    • 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

    零拷贝
    image-20201029210356233.png

    分区

    分区原因

    • 方便在集群中扩展 方便在集群中扩展 ,每个 Partition可以通过调整适应它所在的机器,而一个 topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了
    • 可以提高并发 ,因为可以以 Partition为单位读写了

    从数据组织形式来说,kafka有三层形式,kafka有多个主题,每个主题有多个分区,每个分区又有多条消息

    分区个数

    分区越多,所需要消耗的资源就越多。甚至如果足够大的时候,还会触发到操作系统的一些参数限制。比如linux中的文件描述符限制,一般在创建线程,创建socket,打开文件的场景下,linux默认的文件描述符参数,只有1024,超过则会报错。

    因为每个业务场景都不同,只能结合具体业务来看。假如每秒钟需要从主题写入和读取1GB数据,而消费者1秒钟最多处理50MB的数据,那么这个时候就可以设置20-25个分区,当然还要结合具体的物理资源情况。

    而如何无法估算出大概的处理速度和时间,那么就用基准测试来测试吧。创建不同分区的topic,逐步压测测出最终的结果。如果实在是懒得测,那比较无脑的确定分区数的方式就是broker机器数量的2~3倍。

    生产者分区写入策略

    生产者在将消息发送到某个Topic ,需要经过拦截器、序列化器和分区器(Partitioner)的一系列作用之后才能发送到对应的Broker,在发往Broker之前是需要确定它所发往的分区,kafka如何将数据分配到不同分区中的策略

    • 如果消息 ProducerRecord 指定了partition字段,那么就不需要分区器。
    • 如果消息 ProducerRecord 没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。
    public class ProducerRecord {
        // 该消息需要发往的主题
        private final String topic;
        // 该消息需要发往的主题中的某个分区,如果该字段有值,则分区器不起作用,直接发往指定的分区
        // 如果该值为null,则利用分区器进行分区的选择 
        private final Integer partition;
        private final Headers headers;
        // 如果partition字段为null,则使用分区器进行分区选择时会用到该key字段,该值可为空 
        private final K key;
        private final V value;
        private final Long timestamp;
    

    默认

    Kafka 中提供的默认分区器是 DefaultPartitioner,它实现了Partitioner接口(用户可以实现这个接口来自定义分区器),其中的partition方法就是用来实现具体的分区分配逻辑:

    • 如果在发消息的时候指定了分区,则消息投递到指定的分区。
    • 如果没有指定分区,但是消息的key不为空,则使用称之为murmur的Hash算法(非加密型Hash函数,具备高运算性能及低碰撞率)来计算分区分配。
    • 如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区。
      clipboard-1603891595299.png
    public class DefaultPartitioner implements Partitioner {
    
        private final ConcurrentMap topicCounterMap = new ConcurrentHashMap();
    
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // 首先通过cluster从元数据中获取topic所有的分区信息
            List partitions = cluster.partitionsForTopic(topic);
            // 拿到该topic的分区数
            int numPartitions = partitions.size();
            // 如果消息记录中没有指定key
            if (keyBytes == null) {
                // 则获取一个自增的值
                int nextValue = nextValue(topic);
                // 通过cluster拿到所有可用的分区(可用的分区这里指的是该分区存在首领副本)
                List availablePartitions = cluster.availablePartitionsForTopic(topic);
                // 如果该topic存在可用的分区
                if (availablePartitions.size() > 0) {
                    // 那么将nextValue转成正数之后对可用分区数进行取余
                    int part = Utils.toPositive(nextValue) % availablePartitions.size();
                    // 然后从可用分区中返回一个分区
                    return availablePartitions.get(part).partition();
                } else { // 如果不存在可用的分区
                    // 那么就从所有不可用的分区中通过取余的方式返回一个不可用的分区
                    return Utils.toPositive(nextValue) % numPartitions;
                }
            } else { // 如果消息记录中指定了key
                // 则使用该key进行hash操作,然后对所有的分区数进行取余操作,这里的hash算法采用的是murmur2算法,然后再转成正数
                //toPositive方法很简单,直接将给定的参数与0X7FFFFFFF进行逻辑与操作。
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
    
        // nextValue方法可以理解为是在消息记录中没有指定key的情况下,需要生成一个数用来代替key的hash值
        // 方法就是最开始先生成一个随机数,之后在这个随机数的基础上每次请求时均进行+1的操作
        private int nextValue(String topic) {
            // 每个topic都对应着一个计数
            AtomicInteger counter = topicCounterMap.get(topic);
            if (null == counter) { // 如果是第一次,该topic还没有对应的计数
                // 那么先生成一个随机数
                counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
                // 然后将该随机数与topic对应起来存入map中
                AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
                if (currentCounter != null) {
                    // 之后把这个随机数返回
                    counter = currentCounter;
                }
            }
            // 一旦存入了随机数之后,后续的请求均在该随机数的基础上+1之后进行返回
            return counter.getAndIncrement();
        }
    

    自定义分区

    public class MyParatitioner implements Partitioner {
        @Override
        public void configure(Map configs) {
        }
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            //key不能空,如果key为空的会通过轮询的方式 选择分区
            if(keyBytes == null || (!(key instanceof String))){
                throw new RuntimeException("key is null");
            }
            //获取分区列表
            List partitions = cluster.partitionsForTopic(topic);
    
            //以下是上述各种策略的实现,不能共存
            //随机策略
            return ThreadLocalRandom.current().nextInt(partitions.size());
    
            //按消息键保存策略
            return Math.abs(key.hashCode()) % partitions.size();
    
            //自定义分区策略, 比如key为123的消息,选择放入最后一个分区
            if(key.toString().equals("123")){
                return partitions.size()-1;
            }else{
                //否则随机
                ThreadLocalRandom.current().nextInt(partitions.size());
            }
        }
    
        @Override
        public void close() {
        }
    }
    
    //生成kafka producer客户端的时候指定该类就行:
    
        props.put("partitioner.class", "kafkaconf.MyParatitioner");  //主要这个配置指定分区类  //properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.Mypartition.Mypartition");
        ......其他配置
        val producer = new KafkaProducer[String, String](properties)
    

    副本机制

    在kafka中,每个主题可以有多个分区,每个分区又可以有多个副本。这多个副本中,只有一个是leader,而其他的都是follower副本。仅有leader副本可以对外提供服务。

    多个follower副本通常存放在和leader副本不同的broker中。通过这样的机制实现了高可用,当某台机器挂掉后,其他follower副本也能迅速”转正“,开始对外提供服务。

    副本作用

    在kafka中,实现副本的目的就是冗余备份,且仅仅是冗余备份,所有的读写请求都是由leader副本进行处理的。follower副本仅有一个功能,那就是从leader副本拉取消息,尽量让自己跟leader副本的内容一致

    如果follower副本也对外提供服务那会怎么样呢?首先,性能是肯定会有所提升的。但同时,会出现一系列问题。类似数据库事务中的幻读,脏读。

    比如你现在写入一条数据到kafka主题a,消费者b从主题a消费数据,却发现消费不到,因为消费者b去读取的那个分区副本中,最新消息还没写入。而这个时候,另一个消费者c却可以消费到最新那条数据,因为它消费了leader副本。

    数据可靠(Producer的ACK机制)

    为保证producer发送的数据,能可靠到指定topic,topic的每个的partition收到 producer发送的数据后,都需要向producer发送 ack(acknowledgement确认收到),如果 producer收到 ack,就会进行下一轮的发送。

    clipboard-1603891899841.png

    问题:分区中现有一个leader副本节点和多个follower副本节点,生产者将消息发送过来的时候,何时返回ack给生产者?

    leader副本负责读与写,follower副本同步leader的数据。

    方案1:leader和所有的follower都同步完成,才发送ack给生产者
    方案2:leader+follower同步完成的数量过半,就发送ack给生产者

    方案 优点 缺点
    半数以上flower完成同步,就发送ack 延迟低(follower有块有慢,当半数以上follower完成,就过滤剩下的follower) 选举新leader时,容忍n节点故障,需要2n+1个副本
    全部follower完成同步,才发送ack 选举新leader时,容忍n态节点故障,需要n+1个副本 延迟高(同步快的需要等同步慢的,导致延迟高)

    Kafka 选择了第二种方案,原因如下:

  • 同样为了容忍n 台节点的故障,第一种方案需要2n+1 个副本,而第二种方案只需要n+1个副本,而Kafka 的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
  • 虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka 的影响较小。
  • 选择方案二会可能出现这种情况,leader+follower完全同步时,假如有1个leader+4个follower,1个leader和3个follower都同步完成,1个follower同步超级慢或者挂掉,会影响返回或者不返回ack
  • ISR

    采用第二种方案之后,设想以下情景:

    leader 收到数据,所有follower 都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader 进行同步,那leader 就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢

    Leader维护了一个动态的in-sync replica set (ISR-同步副本列表),意为和leader保持同步的follower集合。根据follower发来的FETCH请求中的fetch offset判断ISR中的follower完成数据同步是否成功。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。

    • ISR(In-Sync Replicas ):与leader保持同步的follower集合
    • AR(Assigned Replicas):分区的所有副本
      • ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。
      • AR=ISR+OSR。

    ACK

    对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。

    ACK有三个值,分别为-1 0 1

    • 当ACK=0时,producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据;

    modb_20200717_101657.png

    • ACK=1时,producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,而由于已经返回了ack,系统默认新选举的leader已经有了数据,从而不会进行失败重试,那么将会丢失数据

    modb_20200717_101657.png

    • ACK=-1,producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,导致没有返回ack给Producer,由于失败重试机制,又会给新选举出来的leader发送数据,造成数据重复

    modb_20200717_101658.png

    数据一致性

    假设leader接受了producer传来的数据为20条,ISR中三台follower(f1,f2,f3)开始同步数据,由于网络传输,三台follower同步数据的速率不同。当f1同步了15条数据,f2同步了10条数据,f3同步了13条数据,此时,leader突然挂掉,从ISR中选取了f2作为主节点,此时leader-f2同步了10条,f1同步15,f3同步13,就会造成leader和follower之间数据不一致问题。

    解决办法:

    • HW (High Watermark)俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息,对于同一个副本对象而言,其HW值不会大于LEO值。小于等于HW值的所有消息都被认为是“已备份”的(replicated) 。
    • LEO(Log End Offset),即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。**注意是下一条消息!**也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。LEO 的大小相当于当前日志分区中最后一条消息的offset值加1,分区 ISR 集合中的每个副本都会维护自身的 LEO,,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息(就是途中黄色块)。

    735367-20170830155744640-440918775.png

    上图中,HW值是7,表示位移是0-7的所有消息都已经处于“已备份状态”(committed),而LEO值是15,那么8~14的消息就是尚未完全备份(fully replicated)——为什么没有15?因为刚才说过了,LEO指向的是下一条消息到来时的位移,故上图使用虚线框表示。我们总说consumer无法消费未提交消息。这句话如果用以上名词来解读的话,应该表述为:consumer无法消费分区下leader副本中位移值大于分区HW的任何消息。这里需要特别注意分区HW就是leader副本的HW值。

    follower副本何时更新LEO

    follower副本只是被动地向leader副本请求数据,具体表现为follower副本不停地向leader副本所在的broker发送FETCH请求,一旦获取消息后写入自己的日志中进行备份。那么follower副本的LEO是何时更新的呢?首先,Kafka有两套follower副本LEO:1. 一套LEO保存在follower副本所在broker的副本管理机中;2. 另一套LEO保存在leader副本所在broker的副本管理机中——换句话说,leader副本机器上保存了所有的follower副本的LEO。

    为什么要保存两套?这是因为Kafka使用前者帮助follower副本更新其HW值;而利用后者帮助leader副本更新其HW使用

    • follower副本端的follower副本LEO何时更新

    follower副本端的LEO值就是其底层日志的LEO值,也就是说每当新写入一条消息,其LEO值就会被更新(类似于LEO += 1)。当follower发送FETCH请求后,leader将数据返回给follower,此时follower开始向底层log写数据,从而自动地更新LEO值

    • leader副本端的follower副本LEO何时更新?

    leader副本端的follower副本LEO的更新发生在leader在处理follower FETCH请求时。一旦leader接收到follower发送的FETCH请求,它首先会从自己的log中读取相应的数据,但是在给follower返回数据之前它先去更新follower的LEO(即上面所说的第二套LEO)

    follower副本何时更新HW

    follower更新HW发生在其更新LEO之后,一旦follower向log写完数据,它会尝试更新它自己的HW值。具体算法就是比较当前LEO值与FETCH响应中leader的HW值,取两者的小者作为新的HW值。这告诉我们一个事实:如果follower的LEO值超过了leader的HW值,那么follower HW值是不会越过leader HW值的。

    leader副本何时更新LEO

    和follower更新LEO道理相同,leader写log时就会自动地更新它自己的LEO值

    leader副本何时更新HW值

    前面说过了,leader的HW值就是分区HW值,因此何时更新这个值是我们最关心的,因为它直接影响了分区数据对于consumer的可见性 。以下4种情况下leader会尝试去更新分区HW——切记是尝试,有可能因为不满足条件而不做任何更新:

    • 副本成为leader副本时:当某个副本成为了分区的leader副本,Kafka会尝试去更新分区HW,这个副本的状态是一定要检查的!不过,本文讨论的是当系统稳定后且正常工作时备份机制可能出现的问题,故这个条件不在我们的讨论之列。
    • broker出现崩溃导致副本被踢出ISR时:若有broker崩溃则必须查看下是否会波及此分区,因此检查下分区HW值是否需要更新是有必要的。本文不对这种情况做深入讨论
    • producer向leader副本写入消息时:因为写入消息会更新leader的LEO,故有必要再查看下HW值是否也需要修改
    • leader处理follower FETCH请求时:当leader处理follower的FETCH请求时首先会从底层的log读取数据,之后会尝试更新分区HW值

    特别注意上面4个条件中的最后两个。它揭示了一个事实——当Kafka broker都正常工作时,分区HW值的更新时机有两个:leader处理PRODUCE请求时和leader处理FETCH请求时。另外,leader是如何更新它的HW值的呢?前面说过了,leader broker上保存了一套follower副本的LEO以及它自己的LEO。当尝试确定分区HW时,它会选出所有满足条件的副本,比较它们的LEO(当然也包括leader自己的LEO),并选择最小的LEO值作为HW值。这里的满足条件主要是指副本要满足以下两个条件之一:

    • 处于ISR中
    • 副本LEO落后于leader LEO的时长不大于replica.lag.time.max.ms参数值(默认是10s)

    乍看上去好像这两个条件说得是一回事,毕竟ISR的定义就是第二个条件描述的那样。但某些情况下Kafka的确可能出现副本已经“追上”了leader的进度,但却不在ISR中——比如某个从failure中恢复的副本。如果Kafka只判断第一个条件的话,确定分区HW值时就不会考虑这些未在ISR中的副本,但这些副本已经具备了“立刻进入ISR”的资格,因此就可能出现分区HW值越过ISR中副本LEO的情况——这肯定是不允许的,因为分区HW实际上就是ISR中所有副本LEO的最小值。

    举例

    我们假设有一个topic,单分区,副本因子是2,即一个leader副本和一个follower副本。当producer发送一条消息时,broker端的副本到底会发生什么事情以及分区HW是如何被更新的

    下图是初始状态,

  • 初始时leader和follower的HW和LEO都是0(严格来说源代码会初始化LEO为-1,不过这不影响之后的讨论);
  • leader中的remote LEO指的就是leader端保存的follower LEO,也被初始化成0。此时,producer没有发送任何消息给leader,而follower已经开始不断地给leader发送FETCH请求了,但因为没有数据因此什么都不会发生。
  • follower发送过来的FETCH请求因为无数据而暂时会被寄存到leader端的purgatory中,待500ms(replica.fetch.wait.max.ms参数)超时后会强制完成。倘若在寄存期间producer端发送过来数据,那么会Kafka会自动唤醒该FETCH请求,让leader继续处理之。
  • 虽然purgatory不是本文的重点,但FETCH请求发送和PRODUCE请求处理的时机会影响我们的讨论。因此后续我们也将分两种情况来讨论分区HW的更新。

    735367-20170921093123212-263609660.png

    第一种情况

    follower发送FETCH请求在leader处理完produce请求之后

    producer给该topic分区发送了一条消息。此时的状态如下图所示:

    735367-20170921102252493-795439586.png

    如图所示,leader接收到produce请求主要做两件事情:

  • 把消息写入写底层log(同时也就自动地更新了leader的LEO)
  • 尝试更新leader HW值(前面leader副本何时更新HW值一节中的第三个条件触发)。我们已经假设此时follower尚未发送FETCH请求,那么leader端保存的remote LEO依然是0,因此leader会比较它自己的LEO值和remote LEO值,发现最小值是0,与当前HW值相同,故不会更新分区HW值
  • 所以,PRODUCE请求处理完成后leader端的HW值依然是0,而LEO是1,remote LEO是0。假设此时follower发送了FETCH请求(或者说follower早已发送了FETCH请求,只不过在broker的请求队列中排队),那么状态变更如下图所示:

    735367-20170921111634978-1831383369.png

    本例中当follower发送FETCH请求时,leader端的处理依次是:

  • 读取底层log数据
  • 更新remote LEO = 0(为什么是0? 因为此时follower还没有写入这条消息。leader如何确认follower还未写入呢?这是通过follower发来的FETCH请求中的fetch offset来确定的)
  • 尝试更新分区HW——此时leader LEO = 1,remote LEO = 0,故分区HW值= min(leader LEO, follower remote LEO) = 0
  • 把数据和当前分区HW值(依然是0)发送给follower副本
  • 而follower副本接收到FETCH response后依次执行下列操作:

  • 写入本地log(同时更新follower LEO)
  • 更新follower HW——比较本地LEO和当前leader HW取小者,故follower HW = 0
  • 此时,第一轮FETCH RPC结束,我们会发现虽然leader和follower都已经在log中保存了这条消息,但分区HW值尚未被更新,实际上,它是在第二轮FETCH RPC中被更新的,如下图所示:

    735367-20170930085955559-1430605490.png

    上图中,follower发来了第二轮FETCH请求,leader端接收到后仍然会依次执行下列操作:

  • 读取底层log数据
  • 更新remote LEO = 1(这次为什么是1了? 因为这轮FETCH RPC携带的fetch offset是1,那么为什么这轮携带的就是1了呢,因为上一轮结束后follower LEO被更新为1了)
  • 尝试更新分区HW——此时leader LEO = 1,remote LEO = 1,故分区HW值= min(leader LEO, follower remote LEO) = 1。注意分区HW值此时被更新了!!!
  • 把数据(实际上没有数据)和当前分区HW值(已更新为1)发送给follower副本
  • 同样地,follower副本接收到FETCH response后依次执行下列操作:

  • 写入本地log,当然没东西可写,故follower LEO也不会变化,依然是1
  • 更新follower HW——比较本地LEO和当前leader LEO取小者。由于此时两者都是1,故更新follower HW = 1
  • producer端发送消息后broker端完整的处理流程就讲完了。此时消息已经成功地被复制到leader和follower的log中且分区HW是1,表明consumer能够消费offset = 0的这条消息。下面我们来分析下PRODUCE和FETCH请求交互的第二种情况。

    第二种情况

    FETCH请求保存在purgatory中PRODUCE请求到来

    这种情况实际上和第一种情况差不多。前面说过了,当leader无法立即满足FECTH返回要求的时候(比如没有数据),那么该FETCH请求会被暂存到leader端的purgatory中,待时机成熟时会尝试再次处理它。不过Kafka不会无限期地将其缓存着,默认有个超时时间(500ms),一旦超时时间已过,则这个请求会被强制完成。不过我们要讨论的场景是在寄存期间,producer发送PRODUCE请求从而使之满足了条件从而被唤醒。此时,leader端处理流程如下:

  • leader写入本地log(同时自动更新leader LEO)
  • 尝试唤醒在purgatory中寄存的FETCH请求
  • 尝试更新分区HW
  • 至于唤醒后的FETCH请求的处理与第一种情况完全一致,故这里不做详细展开了。

    以上所有的东西其实就想说明一件事情:Kafka使用HW值来决定副本备份的进度,而HW值的更新通常需要额外一轮FETCH RPC才能完成,故而这种设计是有问题的。它们可能引起的问题包括:

    • 备份数据丢失
    • 备份数据不一致

    数据丢失

    如前所述,使用HW值来确定备份进度时,其值的更新是在下一轮RPC中完成的。

    如果follower副本在 follower副本接收到FETCH response后执行操作的第一步与第二步之间发生崩溃,那么就有可能造成数据的丢失。我们举个例子来看下。

    735367-20170921140300681-102012752.png

    上图中有两个副本:A和B。开始状态是A是leader。我们假设producer端min.insync.replicas设置为1,那么当producer发送两条消息给A后,A写入到底层log,此时Kafka会通知producer说这两条消息写入成功。

    但是在broker端,leader和follower底层的log虽都写入了2条消息且分区HW已经被更新到2,但follower HW尚未被更新(也就是操作的第二步尚未执行)。倘若此时副本B所在的broker宕机,那么重启回来后B会自动把LEO调整到之前的HW值,故副本B会做日志截断(log truncation),将offset = 1的那条消息从log中删除,并调整LEO = 1,此时follower副本底层log中就只有一条消息,即offset = 0的消息。

    B重启之后需要给A发FETCH请求,但若A所在broker机器在此时宕机,那么Kafka会令B成为新的leader,而当A重启回来后也会执行日志截断,将HW调整回1。这样,位移=1的消息就从两个副本的log中被删除,即永远地丢失了。

    这个场景丢失数据的前提是在min.insync.replicas=1时,一旦消息被写入leader端log即被认为是“已提交”,而延迟一轮FETCH RPC更新HW值的设计使得follower HW值是异步延迟更新的,倘若在这个过程中leader发生变更,那么成为新leader的follower的HW值就有可能是过期的,使得clients端认为是成功提交的消息被删除。

    leader/follower数据离散

    除了可能造成的数据丢失以外,这种设计还有一个潜在的问题,即造成leader端log和follower端log的数据不一致。比如leader端保存的记录序列是r1,r2,r3,r4,r5,....;而follower端保存的序列可能是r1,r3,r4,r5,r6...。这也是非法的场景,因为顾名思义,follower必须追随leader,完整地备份leader端的数据。

    我们依然使用一张图来说明这种场景是如何发生的:

    image-20201029135749190.png

    image-20201029135749190.png
    这种情况的初始状态与情况1有一些不同的:A依然是leader,A的log写入了2条消息,但B的log只写入了1条消息。分区HW更新到2,但B的HW还是1,同时producer端的min.insync.replicas = 1。

    这次我们让A和B所在机器同时挂掉,然后假设B先重启回来,因此成为leader,分区HW = 1。假设此时producer发送了第3条消息(绿色框表示)给B,于是B的log中offset = 1的消息变成了绿色框表示的消息,同时分区HW更新到2(A还没有回来,就B一个副本,故可以直接更新HW而不用理会A)之后A重启回来,需要执行日志截断,但发现此时分区HW=2而A之前的HW值也是2,故不做任何调整。此后A和B将以这种状态继续正常工作。

    显然,这种场景下,A和B底层log中保存在offset = 1的消息是不同的记录,从而引发不一致的情形出现。

    解决方案

    Kafka 0.11.0.0.版本解决方案

    造成上述两个问题的根本原因在于HW值被用于衡量副本备份的成功与否以及在出现failture时作为日志截断的依据,但HW值的更新是异步延迟的,特别是需要额外的FETCH请求处理流程才能更新,故这中间发生的任何崩溃都可能导致HW值的过期。鉴于这些原因,Kafka 0.11引入了leader epoch来取代HW值。Leader端多开辟一段内存区域专门保存leader的epoch信息,这样即使出现上面的两个场景也能很好地规避这些问题。

    所谓leader epoch实际上是一对值:(epoch,offset)。epoch表示leader的版本号,从0开始,当leader变更过1次时epoch就会+1,而offset则对应于该epoch版本的leader写入第一条消息的位移。因此假设有两对值:

    (0, 0) (1, 120)

    表示第一个leader从位移0开始写入消息;共写了120条[0, 119];而第二个leader版本号是1,从位移120处开始写入消息。

    leader broker中会保存这样的一个缓存,并定期地写入到一个checkpoint文件中。

    当leader写底层log时它会尝试更新整个缓存——如果这个leader首次写消息,则会在缓存中增加一个条目;否则就不做更新。而每次副本重新成为leader时会查询这部分缓存,获取出对应leader版本的位移,这就不会发生数据不一致和丢失的情况。

    规避数据丢失

    image-20201029140803466.png

    上图左半边已经给出了简要的流程描述,这里不详细展开具体的leader epoch实现细节(比如OffsetsForLeaderEpochRequest的实现),我们只需要知道每个副本都引入了新的状态来保存自己当leader时开始写入的第一条消息的offset以及leader版本。这样在恢复的时候完全使用这些信息而非水位来判断是否需要截断日志。

    规避数据不一致

    image-20201029141758935.png

    同样的道理,依靠leader epoch的信息可以有效地规避数据不一致的问题。

    0.11.0.0版本的Kafka通过引入leader epoch解决了原先依赖水位表示副本进度可能造成的数据丢失/数据不一致问题。

    源代码位置:kafka.server.epoch.LeaderEpochCache.scala (leader epoch数据结构)、kafka.server.checkpoints.LeaderEpochCheckpointFile(checkpoint检查点文件操作类)还有分布在Log中的CRUD操作。

    选举机制

    ISR

    • leader会追踪和维护ISR中所有follower的滞后状态。如果滞后太多(时间滞后replica.lag.time.max.ms可配置),leader会把该replica从ISR中移除。被移除ISR的replica一直在追赶leader。leader写入数据后并不会commit,只有ISR列表中的所有folower同步之后才会commit,把滞后的follower移除ISR主要是避免写消息延迟。设置ISR主要是为了broker宕掉之后,重新选举partition的leader从ISR列表中选择。

    leader

    • 在kafka集群中有2个种leader,一种是broker的leader即controller leader,还有一种就是partition的leader,下面介绍一下2种leader的选举大致流程。

    Controller leader

    所谓控制器就是一个Borker,在一个kafka集群中,有多个broker节点,但是它们之间需要选举出一个leader,其他的broker充当follower角色。集群中第一个启动的broker会通过在zookeeper中创建临时节点/controller来让自己成为控制器,其他broker启动时也会在zookeeper中创建临时节点,但是发现节点已经存在,所以它们会收到一个异常,意识到控制器已经存在,那么就会在zookeeper中创建watch对象,便于它们收到控制器变更的通知。

    那么如果控制器由于网络原因与zookeeper断开连接或者异常退出,那么其他broker通过watch收到控制器变更的通知,就会去尝试创建临时节点/controller,如果有一个broker创建成功,那么其他broker就会收到创建异常通知,也就意味着集群中已经有了控制器,其他broker只需创建watch对象即可。

    如果集群中有一个broker发生异常退出了,那么控制器就会检查这个broker是否有分区的副本leader,如果有那么这个分区就需要一个新的leader,此时控制器就会去遍历其他副本,决定哪一个成为新的leader,同时更新分区的ISR集合。

    如果有一个broker加入集群中,那么控制器就会通过Broker ID去判断新加入的broker中是否含有现有分区的副本,如果有,就会从分区副本中去同步数据。

    集群中每选举一次控制器,就会通过zookeeper创建一个controller epoch,每一个选举都会创建一个更大,包含最新信息的epoch,如果有broker收到比这个epoch旧的数据,就会忽略它们,kafka也通过这个epoch来防止集群产生“脑裂”。

    分区副本选举机制

    分区leader副本的选举由Kafka Controller (Controller leader)负责具体实施。当创建分区(创建主题或增加分区都有创建分区的动作)或分区上线(比如分区中原先的leader副本下线,此时分区需要选举一个新的leader上线来对外提供服务)的时候都需要执行leader的选举动作

    在kafka的集群中,会存在着多个主题topic,在每一个topic中,又被划分为多个partition,为了防止数据不丢失,每一个partition又有多个副本,在整个集群中,总共有三种副本角色:

    • 首领副本(leader):也就是leader主副本,每个分区都有一个首领副本,为了保证数据一致性,所有的生产者与消费者的请求都会经过该副本来处理。
    • 跟随者副本(follower):除了首领副本外的其他所有副本都是跟随者副本,跟随者副本不处理来自客户端的任何请求,只负责从首领副本同步数据,保证与首领保持一致。如果首领副本发生崩溃,就会从这其中选举出一个leader。
    • 首选leader副本:创建分区时指定的首选首领。如果不指定,则为分区的第一个副本。

    follower需要从leader中同步数据,但是由于网络或者其他原因,导致数据阻塞,出现不一致的情况,为了避免这种情况,follower会向leader发送请求信息,这些请求信息中包含了follower需要数据的偏移量offset,而且这些offset是有序的。

    如果有follower向leader发送了请求1,接着发送请求2,请求3,那么再发送请求4,这时就意味着follower已经同步了前三条数据,否则不会发送请求4。leader通过跟踪 每一个follower的offset来判断它们的复制进度。

    默认的,如果follower与leader之间超过10s内没有发送请求,或者说没有收到请求数据,此时该follower就会被认为“不同步副本”。而持续请求的副本就是“同步副本”,当leader发生故障时,只有“同步副本”才可以被选举为leader。其中的请求超时时间可以通过参数replica.lag.time.max.ms参数来配置。

    我们希望每个分区的leader可以分布到不同的broker中,尽可能的达到负载均衡,所以会有一个首选首领,如果我们设置参数auto.leader.rebalance.enable为true,那么它会检查首选首领是否是真正的首领,如果不是,则会触发选举,让首选首领成为首领。

    基本思路是按照AR集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。一个分区的AR集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变。注意这里是根据AR的顺序而不是ISR的顺序进行选举的。

    还有一些情况也会发生分区leader的选举,比如当分区进行重分配(reassign)的时候也需要执行leader的选举动作。这个思路比较简单:从重分配的AR列表中找到第一个存活的副本,且这个副本在目前的ISR列表中。

    再比如当发生优先副本(preferred replica partition leader election)的选举时,直接将优先副本设置为leader即可,AR集合中的第一个副本即为优先副本。

    还有一种情况就是当某节点被优雅地关闭(也就是执行ControlledShutdown)时,位于这个节点上的leader副本都会下线,所以与此对应的分区需要执行leader的选举。这里的具体思路为:从AR列表中找到第一个存活的副本,且这个副本在目前的ISR列表中,与此同时还要确保这个副本不处于正在被关闭的节点上。

    leader发生故障以后,会从ISR中选出一个新leader,为了保证多个副本之间的同步性和数据一致性,其余follower会将log文件中高于HW的部分截掉,然后从leader中同步数据

    例子

    分区中的leader挂掉后,需要从ISR的follower副本中选举出新的leader

    6a54935065447692b3e995995ca8de6a.png

    图示HW为9,消费者只能看到0~9

    假如leader挂掉了,选举follower2为leader,那么以新leader通知其他节点以HW 9为基准,超过HW的部分需要截取掉,leader自身的(LEO为11 大于HW 9)不用截取掉。然后从新的leader开始同步,这个只会保证副本之间的数据一致性,并不能保证数据不丢失或者不重复

    消费组选主

    在kafka的消费端,会有一个消费者协调器以及消费组,组协调器GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader,那么如何选举的呢?

    如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader,如果某一个时刻leader消费者由于某些原因退出了消费组,那么就会重新选举leader,如何选举?

    private val members = new mutable.HashMap[String, MemberMetadata]
    leaderId = members.keys.headOption
    12
    

    上面代码是kafka源码中的部分代码,member是一个hashmap的数据结构,key为消费者的member_id,value是元数据信息,那么它会将leaderId选举为Hashmap中的第一个键值对,它和随机基本没啥区别。

    对于整个选举算法的详情需要先了解Raft选举算法,kafka是基于该算法来实现leader选举的。可以参考文章【分布式一致性协议:Raft算法详解】。

    每个Kafka副本对象都有两个重要的属性:LEO和HW。注意是所有的副本,而不只是leader副本。

    20200531115957300.png
    20200531120025228.png
    20200531120104511.png
    2020053112040821.png

    Consumer 消费者

    消费方式

    push

    push模式很难适应不同消费速率的消费者,消息发送速率是由broker决定的,broker会以最快的速度传递消息,但是会造成consumer来不及处理消息,会导致拒绝服务以及网络拥堵

    pull

    consumer采用pull模式从broker拉数据

    不足之处就是,如果kafka没数据,消费者会陷入循环,一直返回空数据,于是kafka在消费数据的时候会传入一个时长参数timeout,如果当前没有数据,consumer会等一段时间再返回,这个时长是timeout

    消费者组

    874963-20180917164755125-572862202.png

    • 同一时刻,一条消息只能被组中的一个消费者实例消费
    • 消费者组订阅这个主题,意味着主题下的所有分区都会被组中的消费者消费到,如果按照从属关系来说的话就是,主题下的每个分区只从属于组中的一个消费者,不可能出现组中的两个消费者负责同一个分区。
      • 如果分区数大于或者等于组中的消费者实例数,那自然没有什么问题,无非一个消费者会负责多个分区,(PS:当然,最理想的情况是二者数量相等,这样就相当于一个消费者负责一个分区);
      • 如果消费者实例的数量大于分区数,那么按照默认的策略(PS:之所以强调默认策略是因为你也可以自定义策略),有一些消费者是多余的,一直接不到消息而处于空闲状态。
      • 如果分区数目大于消费者数目,即多个消费者负责同一个分区,那么会有什么问题? 我们知道,Kafka在设计的时候就是要保证分区下消息的顺序,也就是说消息在一个分区中的顺序是怎样的,那么消费者在消费的时候看到的就是什么样的顺序,那么要做到这一点就首先要保证消息是由消费者主动拉取的(pull),其次还要保证一个分区只能由一个消费者负责。倘若,两个消费者负责同一个分区,那么就意味着两个消费者同时读取分区的消息,由于消费者自己可以控制读取消息的offset,就有可能C1才读到2,而C1读到1,C1还没处理完,C2已经读到3了,则会造成很多浪费,因为这就相当于多线程读取同一个消息,会造成消息处理的重复,且不能保证消息的顺序,这就跟主动推送(push)无异。

    消费者组

    • consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程
    • group.id是一个字符串,唯一标识一个consumer group
    • consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)

    消费者位置

    消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息。在Kafka中这个位置信息有个专门的术语:位移(offset)。

    很多消息引擎都把这部分信息保存在服务器端(broker端)。这样做的好处当然是实现简单,但会有三个主要的问题:

  • broker从此变成有状态的,会影响伸缩性;
  • 需要引入应答机制(acknowledgement)来确认消费成功。
  • 由于要保存很多consumer的offset信息,必然引入复杂的数据结构,造成资源浪费。而Kafka选择了不同的方式:每个consumer group保存自己的位移信息,那么只需要简单的一个整数表示位置就够了;同时可以引入checkpoint机制定期持久化,简化了应答机制的实现。
  • 位移管理(offset management)

    自动VS手动

    Kafka默认是定期帮你自动提交位移的(enable.auto.commit = true),你当然可以选择手动提交位移实现自己控制。另外kafka会定期把group消费情况保存起来,做成一个offset map,如下图所示:

    735367-20161226175429711-638862783.png

    上图中表明了test-group这个组当前的消费情况。

    //设置自动提交
    props.put("enable.auto.commit", "true");
    

    位移提交 -- 逐步减少与zk的耦合

    老版本的位移是提交到zookeeper中的,目录结构是:/consumers//offsets/,但是zookeeper并不适合进行大批量的读写操作,尤其是写操作。因此kafka提供了另一种解决方案:增加__consumeroffsets topic,将offset信息写入这个topic,摆脱对zookeeper的依赖(指保存offset这件事情)。__consumer_offsets中的消息保存了每个consumer group某一时刻提交的offset信息。依然以上图中的consumer group为例,格式大概如下:

    735367-20161226175522507-1842910668.png

    __consumers_offsets topic配置了compact策略,使得它总是能够保存最新的位移信息,既控制了该topic总体的日志容量,也能实现保存最新offset的目的。compact的具体原理请参见:Log Compaction

    kafka读取__consumers_offsets内容

    由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息

    #创建topic
    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 3
    #由于默认没有指定key,所以根据round-robin方式,消息分布到不同的分区上
    #创建consumer group
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test --from-beginning --new-consumer
    #获取group id
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --list --new-consumer
    #输出 console-consumer-46965 
    
    
    s
    #查询__consumer_offsets topic所有内容
    #注意:运行下面命令前先要在consumer.properties中设置exclude.internal.topics=false
    #0.11.0.0之前版本
    bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
    
    #0.11.0.0之后版本(含)
    bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
    

    默认情况下__consumer_offsets有50个分区,如果你的系统中consumer group也很多的话,那么这个命令的输出结果会很多。

    计算指定consumer group在__consumer_offsets topic中分区信息,这时候就用到了之前获取的group.id(本例中是console-consumer-46965)。

    Kafka会使用下面公式计算该group位移保存在__consumer_offsets的哪个分区上:

    Math.abs(groupID.hashCode()) % numPartitions
    

    所以在本例中,对应的分区=Math.abs("console-consumer-46965".hashCode()) % 50 = 11,即__consumer_offsets的分区11保存了这个consumer group的位移信息

    #获取指定consumer group的位移信息 
    #0.11.0.0版本之前
    bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter"
    
    #0.11.0.0版本以后(含)
    bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"
    

    image-20201031183455120.png

    精准一次性消费

    手动管理offset

    www.cnblogs.com/yn-huang/p/…

    分区策略

    org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor

    如果是自定义分配策略的话可以继承AbstractPartitionAssignor这个类,它默认有3个实现,对应三种分区策略

    range

    range策略对应的实现类是org.apache.kafka.clients.consumer.RangeAssignor

    这是默认的分配策略

    可以通过消费者配置中partition.assignment.strategy参数来指定分配策略,它的值是类的全路径,是一个数组

    /**
     * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
     * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
     * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
     * divide, then the first few consumers will have one extra partition.
     *
     * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
     * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
     *
     * The assignment will be:
     * C0: [t0p0, t0p1, t1p0, t1p1]
     * C1: [t0p2, t1p2]
     */
    

    range策略是基于每个主题的,对于每个主题,我们以数字顺序排列可用分区,以字典顺序排列消费者。然后,将分区数量除以消费者总数,以确定分配给每个消费者的分区数量。如果没有平均划分(PS:除不尽),那么最初的几个消费者将有一个额外的分区。

  • range分配策略针对的是主题(PS:这里所说的分区指的某个主题的分区,消费者指的是订阅这个主题的消费者组中的消费者实例)
  • 首先,将分区按数字顺序排行序,消费者按消费者名称的字典序排好序
  • 然后,用分区总数除以消费者总数。如果能够除尽,则皆大欢喜,平均分配;若除不尽,则位于排序前面的消费者将多负责一个分区
  • 例子

    假设有两个消费者C0和C1,两个主题t0和t1,并且每个主题有3个分区,分区的情况是这样的:t0p0,t0p1,t0p2,t1p0,t1p1,t1p2

    那么,基于以上信息,最终消费者分配分区的情况是这样的:

    C0: [t0p0, t0p1, t1p0, t1p1]

    C1: [t0p2, t1p2]

    对于主题t0,分配的结果是C0负责P0和P1,C1负责P2;对于主题t2,也是如此
    874963-20180917175720708-538297823.png

    public Map assign(Map partitionsPerTopic,
                                                        Map subscriptions) {
        //    主题与消费者的映射                                                            
        Map consumersPerTopic = consumersPerTopic(subscriptions);
        Map assignment = new HashMap();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList());
    
        for (Map.Entry topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();    //    主题
            List consumersForTopic = topicEntry.getValue();    //    消费者列表
    
            //    partitionsPerTopic表示主题和分区数的映射
            //    获取主题下有多少个分区
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null)
                continue;
    
            //    消费者按字典序排序
            Collections.sort(consumersForTopic);
    
            //    分区数量除以消费者数量
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            //    取模,余数就是额外的分区
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
    
            List partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i  consumersWithExtraPartition ? 0 : 1);
                //    分配分区
                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }
    

    roundrobin(轮询)

    roundronbin分配策略的具体实现是org.apache.kafka.clients.consumer.RoundRobinAssignor

    轮询分配策略是基于所有可用的消费者和所有可用的分区的,与前面的range策略最大的不同就是它不再局限于某个主题

    如果所有的消费者实例的订阅都是相同的,那么这样最好了,可用统一分配,均衡分配

    例如,假设有两个消费者C0和C1,两个主题t0和t1,每个主题有3个分区,分别是t0p0,t0p1,t0p2,t1p0,t1p1,t1p2

    那么,最终分配的结果是这样的:

    C0: [t0p0, t0p2, t1p1]

    C1: [t0p1, t1p0, t1p2]

    用图形表示大概是这样的:
    874963-20180917183049428-1942144917.png

    rebalance

    • Rebalance

    rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致来分配订阅topic的每个分区。比如某个group下有20个consumer,它订阅了一个具有100个分区的topic。正常情况下,Kafka平均会为每个consumer分配5个分区。这个分配的过程就叫rebalance。

    rebalance时机

    • 组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了)
    • 订阅主题数发生变更——如果使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance
    • 订阅主题的分区数发生变更
    • consumer调用unsubscrible(),取消topic的订阅

    rebalance分区分配

    之前提到了group下的所有consumer都会协调在一起共同参与分配,这是如何完成的?Kafka新版本consumer默认提供了两种分配策略:range和round-robin。当然Kafka采用了可插拔式的分配策略,你可以创建自己的分配器以实现不同的分配策略。实际上,由于目前range和round-robin两种分配器都有一些弊端,Kafka社区已经提出第三种分配器来实现更加公平的分配策略,只是目前还在开发中。我们这里只需要知道consumer group默认已经帮我们把订阅topic的分区分配工作做好了就行了。

    简单举个例子,假设目前某个consumer group下有两个consumer: A和B,当第三个成员加入时,kafka会触发rebalance并根据默认的分配策略重新为A、B和C分配分区,如下图所示:

    735367-20161226175710289-1164779517.png

    执行rebalance

    Kafka提供了一个角色:coordinator来执行对于consumer group的管理。坦率说kafka对于coordinator的设计与修改是一个很长的故事。最新版本的coordinator也与最初的设计有了很大的不同。这里只提及两次比较大的改变

    首先是0.8版本的coordinator,那时候的coordinator是依赖zookeeper来实现对于consumer group的管理的。Coordinator监听zookeeper的/consumers//ids的子节点变化以及/brokers/topics/数据变化来判断是否需要进行rebalance。

    group下的每个consumer都自己决定要消费哪些分区,并把自己的决定抢先在zookeeper中的/consumers//owners//下注册。很明显,这种方案要依赖于zookeeper的帮助,而且每个consumer是单独做决定的,没有那种“大家属于一个组,要协商做事情”的精神。

    0.9版本的kafka改进了coordinator的设计,提出了group coordinator——每个consumer group都会被分配一个这样的coordinator用于组管理和位移管理。

    这个group coordinator比原来承担了更多的责任,比如组成员管理、位移提交保护机制等。当新版本consumer group的第一个consumer启动的时候,它会去和kafka server确定谁是它们组的coordinator。之后该group内的所有成员都会和该coordinator进行协调通信。这种coordinator设计不再需要zookeeper了,性能上可以得到很大的提升。

    如何确定coordinator

    • 确定consumer group位移信息写入__consumers_offsets的哪个分区。具体计算公式:
      • __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
      • groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。
    • 该分区leader所在的broker就是被选定的coordinator

    Rebalance Generation

    JVM GC的分代收集就是这个词(严格来说是generational),它表示了rebalance之后的一届成员,主要是用于保护consumer group,隔离无效offset提交的。

    比如上一届的consumer成员是无法提交位移到新一届的consumer group中。我们有时候可以看到ILLEGAL_GENERATION的错误,就是kafka在抱怨这件事情。每次group进行rebalance之后,generation号都会加1,表示group进入到了一个新的版本,如下图所示: Generation 1时group有3个成员,随后成员2退出组,coordinator触发rebalance,consumer group进入Generation 2,之后成员4加入,再次触发rebalance,group进入Generation 3

    735367-20161226175822570-898409869.png

    协议(protocol)

    前面说过了, rebalance本质上是一组协议。group与coordinator共同使用它来完成group的rebalance。目前kafka提供了5个协议来处理与consumer group coordination相关的问题:

    • Heartbeat请求:consumer需要定期给coordinator发送心跳来表明自己还活着
    • LeaveGroup请求:主动告诉coordinator我要离开consumer group
    • SyncGroup请求:group leader把分配方案告诉组内所有成员
    • JoinGroup请求:成员请求加入组
    • DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用

    Coordinator在rebalance的时候主要用到了前面4种请求。

    liveness

    consumer如何向coordinator证明自己还活着?

    通过定时向coordinator发送Heartbeat请求。如果超过了设定的超时时间,那么coordinator就认为这个consumer已经挂了。一旦coordinator认为某个consumer挂了,那么它就会开启新一轮rebalance,并且在当前其他consumer的心跳response中添加“REBALANCE_IN_PROGRESS”,告诉其他consumer:不好意思各位,你们重新申请加入组吧!

    Rebalance过程

    总体而言,rebalance分为2步:Join和Sync

  • Join, 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求入组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念,leader负责消费分配方案的制定。
  • Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。
  • 首先是加入组的过程:
    735367-20161226175922086-1237318351.png

    在coordinator收集到所有成员请求前,它会把已收到请求放入一个叫purgatory(炼狱)的地方。
    然后是分发分配方案的过程,即SyncGroup请求:

    735367-20161226180005242-1302422077.png

    consumer group状态机

    和很多kafka组件一样,group也做了个状态机来表明组状态的流转。coordinator根据这个状态机会对consumer group做不同的处理,如下图所示
    735367-20161226180046945-1657832046.png

    简单说明下图中的各个状态:

    • Dead:组内已经没有任何成员的最终状态,组的元数据也已经被coordinator移除了。这种状态响应各种请求都是一个response: UNKNOWN_MEMBER_ID
    • Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求
    • PreparingRebalance:组准备开启新的rebalance,等待成员加入
    • AwaitingSync:正在等待leader consumer将分配方案传给各个成员
    • Stable:rebalance完成!可以开始消费了

    rebalance场景剖析

    新成员加入组(member join)

    735367-20180122172838209-863721577.png

    组成员崩溃(member failure)

    前面说过了,组成员崩溃和组成员主动离开是两个不同的场景。因为在崩溃时成员并不会主动地告知coordinator此事,coordinator有可能需要一个完整的session.timeout周期才能检测到这种崩溃,这必然会造成consumer的滞后。可以说离开组是主动地发起rebalance;而崩溃则是被动地发起rebalance

    735367-20180122172921209-2006292699.png

    组成员主动离组(member leave group)

    735367-20180122172958600-838820663.png

    提交位移(member commit offset)

    735367-20180122173024959-506110104.png

    避免不必要的Rebalance

    除去consumer正常的添加和停掉导致rebalance外,在某些情况下,Consumer 实例会被 Coordinator 错误地认为 “已停止” 从而被“踢出”Group,导致rebalance,这种情况应该避免

    情况一:

    未能及时发送心跳,导致 Consumer 被 “踢出”Group 而引发的

    • 设置 session.timeout.ms = 6s。
    • 设置 heartbeat.interval.ms = 2s。
    • 要保证 Consumer 实例在被判定为 “dead” 之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

    情况二:

    Consumer 消费时间过长导致的

    max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点

    rebalance影响

  • 可能重复消费: Consumer被踢出消费组,可能还没有提交offset,Rebalance时会Partition重新分配其它Consumer,会造成重复消费,虽有幂等操作但耗费消费资源,亦增加集群压力

  • 集群不稳定:Rebalance扩散到整个ConsumerGroup的所有消费者,因为一个消费者的退出,导致整个Group进行了Rebalance,并在一个比较慢的时间内达到稳定状态,影响面较大

  • 影响消费速度:频繁的Rebalance反而降低了消息的消费速度,大部分时间都在重复消费和Rebalance

  • 总结

    本次主要讲解了kafka的总体基本架构和原理,并从生产者和消费者角度分别介绍了kafka的架构,便于大家从不同角度去理解这个高性能消息队列。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论