前言
kafka 官方文档浅读,好吃力。已经有大半个月没有更新了,记录一下kafak相关的内容。
konwlage
Kafka不是一个传统的消息传递系统,而更类似于一个持久性的分布式发布订阅消息日志。
那么kafka一般将数据持久化在哪里呢?
kafka的持久化数据存储通常存储在称为"日志目录"(log directory)的文件系统路径上。
每个Kafka主题(topic)都有一个或多个分区(partition),每个分区在磁盘上都有一个对应的日志目录。这些日志目录包含了分区中的消息数据。
Kafka的持久化存储是通过以下方式实现的:
消息日志: 每个分区都有一个消息日志,它是一个不断追加的顺序文件,其中包含了所有已发布到该分区的消息。这些消息在磁盘上持久存储,确保数据的持久性。 分段日志(Segmented Logs): 为了管理长期的消息存储,Kafka会将消息日志分成多个分段(segment),每个分段都有一个最大大小限制。当一个分段达到其大小限制时,Kafka会创建一个新的分段,从而实现了消息数据的分段存储。旧的分段在满足一定条件后可以被清理或删除。 索引文件: 为了支持高效的消息检索,Kafka还维护了索引文件,它们包含了消息偏移量(offset)与消息在日志文件中位置的映射关系,以便快速查找特定消息。
一般我们都认为存储系统中对磁盘的读写较慢,那么kafak使用文件系统来作为存储形式。它的优势在哪里?
- 性能优势: Kafka利用了现代操作系统对于文件系统缓存的高度优化,将消息数据写入文件系统,充分利用了操作系统的页缓存。这意味着所有的读写操作都可以在内存中进行,通常是O(1)时间复杂度,因此不会受到磁盘性能的严重限制。这种设计能够充分利用主内存和高速缓存,从而提供了高吞吐量和低延迟的性能。
- 容量扩展性: 文件系统的使用允许Kafka轻松扩展存储容量。只需添加更多的硬盘驱动器或扩展文件系统,就可以处理更多的数据,而不需要更改应用程序代码
对于kafka来说,它较为经常使用的数据结构是什么?
Kafka采用了一种设计,将消息立即写入文件系统的持久日志中,而不是依赖复杂的B树等数据结构。这种设计使得所有操作都是O(1),即常数时间。
通过将消息写入追加日志,Kafka充分利用了现代操作系统对于文件系统缓存的优化,从而将读写操作加速到接近内存速度。这种设计非常适合处理实时数据流,因为它提供了高吞吐量和低延迟的性能,同时保持数据的持久性。
那Kafka在提高效率方面一般都采用了什么方式?
sendfile
系统调用来实现零拷贝数据传输。这通过避免多次复制和系统调用,从文件到套接字的数据传输路径变得高效,提高了数据传输的速度。这意味着数据只需从磁盘读取到页缓存一次,然后可以在多次消费中重复使用,而无需每次读取时都将数据复制到用户空间。消息集,它是不是一压缩操作呢?那么如何保证在网络传输的过程中保证其完整性呢?
不一定,虽然压缩可以是一种将多个消息组合在一起的方式之一,但消息集(Message Set)的抽象并不仅仅用于压缩操作。消息集的主要目的是将多个消息组织在一起,以减少网络通信的开销和提高传输效率。
- Kafka使用一种特定的二进制消息格式,这种格式包括消息的元数据和消息体
元数据包括消息的偏移量、大小、时间戳等信息。这个消息格式是标准化的,生产者、代理和消费者都使用相同的格式来编码和解码消息。
- Kafka使用复制机制来确保消息的可靠性。每个分区通常都有多个副本,这些副本分布在不同的服务器上。Kafka使用同步复制和ISR(In-Sync Replicas)的概念来确保消息被正确复制并保持一致性。只有当所有的副本都确认接收消息后,生产者才会认为消息已成功发送。
Kafka提供了一种高效的批处理压缩格式。多个消息可以被组合成一个批次,然后以压缩形式发送到服务器。这个消息批次将以压缩形式写入,并在日志中保持压缩状态,只有在消费者端才会进行解压缩。
Kafka中的生产者(Producer)以及如何实现负载均衡?
随机选择,实现一种随机的负载均衡
通过某种语义分区函数来实现
Kafka通过允许用户指定一个分区关键字并使用该关键字进行哈希分区来公开语义分区的接口
如果选择的关键字是用户ID,那么所有属于同一个用户的数据将被发送到同一个分区。这将允许消费者对其消费行为做出与数据局部性有关的假设。这种分区方式专门设计用于允许消费者进行局部感知的处理。
kafak 的异步发送消息
通过,我们在编写插入sql语句的时候,如果遇到大量的插入我们会选择批量插入,来提高效率和减少与数据库I/O的次数。
Kafka的生产者支持异步发送消息,其中批处理是提高效率的关键因素。为了启用批处理,Kafka生产者会尝试在内存中累积数据,并在单个请求中发送较大的消息批次。
具体的批处理配置包括:
- 累积消息数量:可以配置生产者累积一定数量的消息,然后将它们一起发送。
- 最大等待延迟:可以配置生产者等待一定的时间,然后发送已累积的消息,即使消息数量未达到设定的阈值。
这种批处理机制有助于减少I/O操作次数,提高了性能和效率。用户可以根据应用场景的需求来配置批处理的参数,以在吞吐量和延迟之间找到平衡点。
kafka如何有效的管理获取的数据?
Kafka的消费者通过发出获取请求来获取要消费的分区的数据。这些获取请求包含了消费者在分区日志中的偏移量,以确定从哪个位置开始获取数据。
消费者可以随时控制偏移量的位置,这意味着它可以自由地重播数据,如果需要的话。这对于处理消息丢失或者重新处理数据非常有用。
这种获取机制允许消费者灵活地管理其消费进度,并确保它们能够获取到他们感兴趣的特定消息。
Kafka如何选择的数据传输方式?
Kafka 遵循大多数消息传递系统所共享的更传统的设计,其中数据从生产者推送到代理并由消费者从代理中提取。 --- 主动拉取数据
我们都知道,消息队列的的传输方式一般分为:
- 推送:
在推模型中,数据由生产者主动推送到代理(broker),然后再由代理主动推送给消费者。
优势:
缺点:
- 拉去:
拉模型中,数据由生产者主动推送到代理(broker),消费者主动从代理拉取数据。
优点:
缺点:
Kafka如何跟踪消息消费状态?
通常,当消息传递到了代理中,如何确保消息消费的进度,一般代理会保存代理上已使用的消息的元数据,也就是说,当消息被分发给消费者时,代理要么立即在本地记录该事实,要么等待消费者的确认。
但是这样设计也可以存在问题:
代理在每次通过网络分发消息时立即将其记录为已消费,那么如果消费者无法处理该消息(例如因为崩溃或请求超时或其他原因),该消息将丢失。
解决这个问题,是通过添加了确认功能,这意味着消息在发送时仅被标记为已发送,而不是被消费;代理等待消费者的特定确认以将消息记录为已消费
但是还这样设计还存在另一个问题:
首先,如果消费者处理消息但在发送确认之前失败,那么该消息将被消费两次。
假设消费者成功接收并处理了消息,但在发送确认之前经历了故障,如崩溃、网络问题或超时。在这种情况下,代理并不知道消息已被消费,因为它没有收到确认。因此,代理会将消息重新发送给其他消费者,以确保消息得到处理。这导致了消息被消费两次的情况。
第二个问题,如果消息已被发送但从未收到确认,代理必须决定如何处理这些消息。这可能需要维护额外的状态信息。
解决方式:
传统消息系统必须处理上述问题,通常采用以下方式来解决:
- 消息去重: 针对消息重复的问题,消息系统需要实施去重机制,以确保即使消息被多次消费,最终结果也是一致的。这通常需要在消费者端进行处理。
- 处理未确认消息: 对于未确认消息,系统可能需要引入一种超时机制,以便在一定时间内没有收到确认时,将消息重新发送给消费者或采取其他措施。
- 维护多个状态: 管理多个状态可能需要更多的内存和计算资源,因此需要仔细考虑性能和资源消耗。
而kafka的设置是:
Kafka的消息确认方式:
- Kafka将主题(Topic)分成一组完全有序的分区(Partitions)。
- 每个分区在任何给定时刻都由一个消费者组中的一个消费者消费。
- 每个消费者在分区中都有一个消费位置,这个位置表示它要消费的下一条消息的偏移量(Offset)。
- Kafka通过跟踪每个消费者在每个分区中的偏移量来维护消费状态。
- 这种状态非常小,因为只需一个整数来表示每个分区中的消费位置。
- 这个状态可以定期检查和更新,使消息确认的成本非常低。
因为通过偏移量来进行相应的数据移动,也就意味着:
- 它们可以故意将自己的消费位置回退到旧的偏移量,并重新消费之前的数据。
kafka的消息传递语义?
- At most once—Messages may be lost but are never redelivered. (最多一次传递一次消息,如果丢失kafka不会处理,即使消息在传递过程中丢失)
- At least once—Messages are never lost but may be redelivered. (至少一次传递消息,kafka会确保消息一定送达,它可以重新传递消息)
- Exactly once—this is what people actually want, each message is delivered once and only once.(恰好一次 kafka认为能够保证消息一定能否完成的进行传递。这种是理想状态)
以上问题需要从两个方面进行讨论:
1、发布消息的持久性保证 2、消费消息时的保证
我们来看看 kafak 时如何保证消息的持久性的吧!
从生产者角度来看
当 kafka
发送消息时,消息被提交到日志中,一旦提交了已发布的消息,只要有分区去复制消息就意味着消息将不会丢失。但是,有一个问题,无法确定错误时在消息提交之前发送还是消息提交之后。
- Kafka的语义是比较直观的。当发布一条消息时,有一个概念,即消息被“提交”到日志中。一旦消息被提交,只要复制该消息的分区的至少一个代理(broker)仍然处于"alive"状态,该消息就不会丢失。
那么如何解决这个就是一个问题,在0.11.0.0 版本之前,kafka使用重新发送来解决消息丢失等问题。
- 这里提供了至少一次的消息传递语义
但是,这样也会有新的问题出现就是消息的重复插入,因为有可能原始请求实际已经成功,那么重新发送的消息则会再次写入日志中。kafka 在0.11.0.0 版本之后(包括该版本)支持幂等插入,通常是为消息附加一个 id 值,并使用该 id 来删除重复的消息。或者使用事务来进行处理
- 幂等操作
- 事务处理
从消费者角度来看
所有的副本都是具有完全相同日志和偏移量,那么如果有一个消费者崩溃,消费者要如何处理消息并更新消息的位置呢?
- 读取消息,保存位置,然后处理消息
- 读取消息:首先,消费者从Kafka主题中读取一批消息。
- 保存位置:在读取这些消息后,消费者会记录下当前已处理的最后一条消息的位置(偏移量)。这个位置通常存储在消费者的内存中,以便知道从哪里继续处理下一批消息。
- 处理消息:接下来,消费者开始处理这批消息,执行其业务逻辑等操作。
现在,让我们考虑一种可能的情况:如果在消费者已经读取了消息、保存了位置,但在完成消息处理之前,消费者遇到了故障或崩溃,那么在这种情况下,下一个接管消息处理任务的新进程将从上次保存的位置开始处理消息。尽管在这个位置之前的一些消息已经被读取,但由于尚未完成处理,这些消息可能不会被标记为已处理。
- 读取消息,处理消息,然后保存位置
- 读取消息:首先,消费者从Kafka主题中读取一批消息。
- 处理消息:接下来,消费者开始处理这批消息,执行其业务逻辑等操作。
- 保存位置:在消息处理完成后,消费者会记录下当前已处理的最后一条消息的位置(偏移量)。这个位置通常存储在消费者的内存中,以便知道从哪里继续处理下一批消息
现在,让我们考虑一种可能的情况:如果在消费者已经读取了消息并成功处理完它们后,但在保存处理位置之前,消费者遇到了故障或崩溃,那么在这种情况下,下一个接管消息处理任务的新进程将重新处理从上次保存的位置开始的消息。这意味着一些已经被成功处理的消息可能会被再次处理。
以上两种情况对应于"至多一次"语义,消息将会被重新发送。这种处理方式通常用于那些可以容忍偶尔处理重复消息的应用场景,例如在消息处理操作是幂等的情况下。幂等意味着多次应用相同的操作不会产生不同的结果,因此重复处理相同的消息不会引发问题。但需要注意,对于某些应用场景,特别是需要严格的消息处理语义的情况,可能需要使用其他处理方式
那如果时恰好一次语义,可以通过Kafka 0.11.0.0版本中引入的事务性生产者功能。消费者的位置信息存储为一条消息写入Kafka的方式,因此可以将偏移量与处理后的数据一起在同一事务中写入Kafka。如果事务中止,消费者的位置将回滚到旧值,同时输出主题上的数据对其他消费者不可见,这取决于它们的“隔离级别”。在默认的“read_uncommitted”隔离级别下,即使消息属于已中止的事务,所有消息对消费者都是可见的,但在“read_committed”下,消费者将只返回已提交的事务中的消息(以及不属于事务的消息)。
将数据从Kafka消费者写入到外部系统时,面临的一些协调和一致性的问题。例如,在将数据从Kafka主题读取后,消费者需要将这些数据写入外部系统,比如HDFS或其他存储系统。此时面临的问题是如何协调消费者当前的处理位置(偏移量)与将数据写入外部系统的存储位置之间的一致性。
- 传统的两阶段提交:一种传统的方法是引入两阶段提交,其中第一阶段涉及将数据写入外部系统,第二阶段涉及将消费者的位置(偏移量)进行更新。这确保了数据写入和位置的更新是原子性的,要么都成功,要么都失败。但这种方法可能在某些情况下变得复杂,尤其是当外部系统不支持两阶段提交时。
- 更简单的方法:更简单和通用的方法是将消费者的偏移量与输出数据存储在同一位置。这意味着消费者在将数据写入外部系统的同时,也将自己的处理位置(偏移量)写入相同的位置。这样做的好处是许多外部系统都不支持复杂的两阶段提交,而这种方法避免了这种复杂性。
复制机制
Kafka允许将每个主题的分区的日志复制到可配置数量的服务器上(可以在每个主题的基础上设置复制因子,默认复制因子为1)。这样可以在服务器集群中的节点发生故障时,自动切换到这些副本,以确保消息在故障发生时仍然可用。
在Kafka中,复制的基本单位是主题分区。在正常情况下,每个分区都有一个单一的领导者(leader)和零个或多个追随者(follower)。复制因子包括领导者在内的所有副本。所有写操作都发送到分区的领导者,而读操作可以发送到分区的领导者或追随者。
追随者可以像读取kafka的消费者一样消费来自领导的消息,并将其应用到自己的日志。这样做的好处就是让追随者从领导者那里拉取有一个很好的特性,即允许追随者自然地将他们应用于其日志的日志条目分批在一起。
kafka通过一个名为“控制器”的特殊节点来负责管理集群中broker(代理)的注册。而且broker需要满足以下条件:
- 代理必须与控制器保持活动会话,以便定期接收元数据更新。 作为追随者的经纪人必须复制领导者的写入,并且不能落后“太远”。
- “活动会话”的含义取决于集群配置。
- 对于 KRaft 集群,通过向控制器发送定期心跳来维护活动会话。如果控制器在broker.session.timeout.ms配置的超时时间到期之前未能收到心跳,则该节点被视为离线。
- 对于使用 Zookeeper 的集群,活动性是通过临时节点的存在来间接确定的,临时节点是由代理在其 Zookeeper 会话初始化时创建的。如果代理在 Zookeeper.session.timeout.ms 到期之前未能向 Zookeeper 发送心跳后丢失会话,则该节点将被删除。然后,控制器将通过 Zookeeper 监视注意到节点删除,并将代理标记为离线。
我们将满足这两个条件的节点称为“同步”,以避免“活动”或“失败”的模糊性。
同步副本(ISR):满足上述两个存活性条件的节点被称为“同步副本”(ISR)。领导者会跟踪“同步副本”的集合,这就是ISR。如果这两个条件中的任何一个不能满足,那么代理将被从ISR中移除。例如,如果追随者失败,那么控制器将注意到会话丢失并将代理从ISR中移除。
总结:Kafka 提供的保证是,只要至少有一个同步副本始终处于活动状态,已提交的消息就不会丢失。
复制日志:仲裁、ISR 和状态机
Kafka分区的核心:Kafka中的分区实际上是一个复制日志(replicated log)。这个复制日志是分布式数据系统中的基本原语之一,可以用于实现其他基于状态机的分布式系统。
当然,在分区复制日志的时候可能存在一个问题,即领导者崩溃,我们需要从追随者中选取一个新的领导者,如果选择所需的确认数量和必须比较的日志数量来选举领导者,以确保存在重叠,那么这称为仲裁。
通常别的系统会采用投票的方式进行。多数投票的缺点是,不需要多次失败就会导致没有可供选举的领导人。容忍一次故障需要三份数据,容忍两次故障需要五份数据。
Kafka采用了一种稍微不同的方法来选择其“ISR”(In-Sync Replicas)集合,这是与领导者竞选有关的。Kafka动态地维护一个与领导者同步的副本集合,只有这个集合中的成员才有资格竞选领导者。写入Kafka分区的消息只有在所有同步副本都接收到该消息后才被视为已提交。ISR集合在群集元数据中持久存在,每当它发生变化时都会进行更新。
- Kafka的设计允许客户端选择是否阻塞消息提交的确认,以及更低的复制因子带来的额外吞吐量和磁盘空间节省是否值得。
- Kafka的设计不要求崩溃的节点恢复所有数据。这是因为磁盘错误是实际运行中的持久性数据系统中最常见的问题,即使磁盘错误不会使数据完全丢失,也不希望要求在每次写操作上都使用fsync,因为这可能会降低性能。
- 文中提到了Kafka在领导者选举中的一种行为,即在所有副本均失效的情况下。Kafka默认情况下会等待一个一致的副本恢复,以保持一致性,但也提供了一种不等待的方式,以支持某些需要更高可用性而愿意牺牲一致性的用例。
可用性和耐用性保证
kafka 在写入数据时,生产者会选择是否等待副本确认消息。
在Kafka中,可以通过acks
参数来指定这个确认的级别,具体有以下几个选项:
注意:
- 默认情况下,当使用acks=all时,确认会在所有当前的同步副本都接收到消息后发生。这意味着如果某个主题配置为只有两个副本,而一个副本失效(即只剩下一个同步副本),那么使用acks=all的写入将成功。然而,如果剩下的同步副本也失败,这些写入可能会丢失。
- 为了优先选择消息的可靠性而不是可用性,Kafka提供了两个主题级别的配置选项:
- 禁用不干净的领导者选举:如果所有副本都不可用,那么分区将保持不可用,直到最近的领导者重新可用。这实际上更倾向于牺牲可用性来降低消息丢失的风险。
- 指定最小的ISR大小:分区只会接受写入,如果ISR的大小高于一定的最小值,以防止消息被写入只有一个副本的情况下,该副本后来变为不可用。这个设置只会在生产者使用acks=all的情况下生效,它保证了消息将被至少这么多个同步副本确认。这个设置提供了一种一致性和可用性之间的权衡,较高的最小ISR大小可以提供更好的一致性,但会降低可用性,因为如果同步副本的数量降到最小阈值以下,分区将不可用于写入。
日志压缩
日志压缩是一种用于数据保留的机制,确保Kafka始终保留每个消息键的最新已知值,而不仅仅是按照时间或大小固定周期丢弃旧的日志数据。
通常情况下,日志数据保留可以采用两种不同的方式,一种是按照固定时间或大小丢弃旧的数据,适用于临时事件数据,如日志记录。另一种方式是日志压缩,适用于具有主键的可变数据的变更日志,例如数据库表的更改。
文本指出,如果我们保留完整的日志并尝试记录每个更改,那么对于频繁更新单个记录的系统来说,这将不切实际,因为日志会无限增长。而日志压缩允许有选择地删除具有相同主键的较早记录,以确保至少保留每个键的最后状态。
Kafka允许根据主题设置保留策略,这意味着对于某些主题,可以采用时间或大小为基础的数据保留策略,而对于其他主题,可以使用日志压缩保留策略。
日志压缩基础知识
日志的头部与传统的 Kafka 日志相同。它具有密集、连续的偏移量并保留所有消息。日志压缩添加了处理日志尾部的选项。上图显示了一根带有压紧尾部的原木。请注意,日志尾部的消息保留首次写入时分配的原始偏移量,并且永远不会改变。另请注意,即使具有该偏移量的消息已被压缩,所有偏移量仍保留在日志中的有效位置;在这种情况下,该位置与日志中出现的下一个最高偏移量无法区分。
日志压缩还支持删除操作。如果一条消息具有一个键(key)和一个空的有效载荷(payload),则它将被视为从日志中删除。这样的记录有时被称为墓碑(tombstone)。这个删除标记将导致任何具有相同键的先前消息被删除(以及任何具有相同键的新消息),但删除标记本身在一段时间后会被清理出日志,以释放空间。不再保留删除操作的时间点被标记为上图中的“删除保留点”。
压缩操作是在后台进行的,定期复制日志段。清理操作不会阻塞读取,并且可以通过限制使用不超过可配置的I/O吞吐量来进行节流,以避免影响生产者和消费者。实际的压缩日志段过程类似于文本中所描述的步骤。
kafka 实施
Kafka的网络层实现:使用了Java的NIO技术来处理网络通信。
Kafka网络层的线程模型。它包括一个单独的接收器线程(acceptor thread)和N个处理器线程(processor threads)。这些处理器线程负责处理一定数量的连接。这种设计在其他地方已经经过充分的测试,被证明实现简单且快速。
Kafka消息格式和记录批次的实现细节:
Messages(消息)
消息由可变长度标头、可变长度不透明密钥字节数组和可变长度不透明值字节数组组成。
消息(又名记录)始终是批量写入的。一批消息的技术术语是记录批次,记录批次包含一条或多条记录。
Kafka中消息的存储和索引方式:
写入(Writes):
读取(Reads):
删除(Deletes):
保证(Guarantees):
消费者偏移量跟踪
写在最后:
有些乱,后面有空的时候再去啃啃kafka,再整理和更新一下文章