消息太大,kafka受不了

2023年 8月 22日 83.2k 0

前言

上周在进行自测的时候,kafka抛出一个RecordTooLargeException异常,从名字我们可以直接看出是消息太大了,导致发不出去而抛出异常,那么怎么应该怎么解决这个问题呢,其实很简单,要么将消息拆分得小一点,要么调节kafka层面的参数,依然它抛出这个异常,那么就证明超过了某个参数的阈值,由此我们可以有两种方式来处理这个问题,但是一切还要从我们的业务背景和数据结构去看这个问题。

业务背景

我们这边会将数据写入文件,通过FTP的方式,没产生数据,就往FTP里面追加,而这些数据都是需要保证不丢失的,由于业务的发展,我这边需要专门去处理这些文件,然后通过kafka投递给下游系统,所以自然需要解析文件,还得一条一条的解析后发送。

问题出现

一开始我看到文件都比较小,所以处理方式是只有这个文件的数据全部解析完成并成功投递kafka,那么我这边才记录这个文件处理成功,但是处理了很多个大文件过后,发现数据条数对不上,看日志是RecordTooLargeException异常,因为上面的处理方式是文件处理完成并全部投递到kafka才记录文件解析完成,所以这是有问题的,一个大文件可能有即使上百万条数据,难免会遇到很大的数据,所以只要一条没解析成功,那么后面的数据就不去解析了,这个文件就不算解析成功,所以应该要设计容错,并对数据进行监控和补偿。

处理问题

在得知是某些数据过大的问题,我就DEBUG去看源码,在kafka生产端的KafkaProducer类中,发现问题出在下面这方法中。


ensureValidRecordSize方法就是对消息的大小进行判断的,参数size就是我们所发送的消息的字节数,maxRequestSize就是允许消息的最大字节,因为没有进行设置,所以这个值使用的是默认值,默认为1M,所以就应该将maxRequestSize这个参数进行重新设置。

因为我们使用的是SpringBoot开发,于是通过yml方式配置,但是发现spring-kafka没提示这个属性,于是只有写一个Kafka的配置类,然后再读取yml文件内容进行配置

配置类

yml文件

通过上面的配置后,我们看到我将max.request.size参数的值设置为10M,这需要根据实际情况来,因为我在处理的过程中发现像比较大的数据行也只有个别。

如果在实际使用过程中数据比较大,那么可能需要拆分数据,不过如果数据不能拆分,那么我们应该考虑消息压缩方式,将数据压缩后再发送,然后在消费者进行解压,不过这种压缩是我们自己实现的,并不是kafka层面的压缩,kafka本身也提供了压缩功能,有兴趣可以了解一下。

扩展

上面设置了max.request.size参数,我们在上面的截图代码中看到第二个判断中有一个参数totalMemorySize,这个值是缓冲区大小,我们发送的消息并不会马上发送kafka服务端,而是会先放在内存缓冲区,然后kafka通过一个线程去取,然后发送,可通过buffer.memory设置,这个值的默认值为32M,所以我们在设置max.request.size的时候也要考虑一下这个值。

总结

有必要对kafka进行比较深一点的学习,这样在出现问题的时候能够快速定位,并且合理解决,当然,在业务处理的时候要充分考虑可能出现的问题,做好容错和相应的补偿方案。

今天的分享就到这里,感谢你的观看,我们下期见

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论