前言
上周在进行自测的时候,kafka抛出一个RecordTooLargeException异常,从名字我们可以直接看出是消息太大了,导致发不出去而抛出异常,那么怎么应该怎么解决这个问题呢,其实很简单,要么将消息拆分得小一点,要么调节kafka层面的参数,依然它抛出这个异常,那么就证明超过了某个参数的阈值,由此我们可以有两种方式来处理这个问题,但是一切还要从我们的业务背景和数据结构去看这个问题。
业务背景
我们这边会将数据写入文件,通过FTP的方式,没产生数据,就往FTP里面追加,而这些数据都是需要保证不丢失的,由于业务的发展,我这边需要专门去处理这些文件,然后通过kafka投递给下游系统,所以自然需要解析文件,还得一条一条的解析后发送。
问题出现
一开始我看到文件都比较小,所以处理方式是只有这个文件的数据全部解析完成并成功投递kafka,那么我这边才记录这个文件处理成功,但是处理了很多个大文件过后,发现数据条数对不上,看日志是RecordTooLargeException异常,因为上面的处理方式是文件处理完成并全部投递到kafka才记录文件解析完成,所以这是有问题的,一个大文件可能有即使上百万条数据,难免会遇到很大的数据,所以只要一条没解析成功,那么后面的数据就不去解析了,这个文件就不算解析成功,所以应该要设计容错,并对数据进行监控和补偿。
处理问题
在得知是某些数据过大的问题,我就DEBUG去看源码,在kafka生产端的KafkaProducer类中,发现问题出在下面这方法中。
ensureValidRecordSize方法就是对消息的大小进行判断的,参数size
就是我们所发送的消息的字节数,maxRequestSize
就是允许消息的最大字节,因为没有进行设置,所以这个值使用的是默认值,默认为1M,所以就应该将maxRequestSize
这个参数进行重新设置。
因为我们使用的是SpringBoot开发,于是通过yml方式配置,但是发现spring-kafka没提示这个属性,于是只有写一个Kafka的配置类,然后再读取yml文件内容进行配置
配置类
yml文件
通过上面的配置后,我们看到我将max.request.size参数的值设置为10M,这需要根据实际情况来,因为我在处理的过程中发现像比较大的数据行也只有个别。
如果在实际使用过程中数据比较大,那么可能需要拆分数据,不过如果数据不能拆分,那么我们应该考虑消息压缩方式,将数据压缩后再发送,然后在消费者进行解压,不过这种压缩是我们自己实现的,并不是kafka层面的压缩,kafka本身也提供了压缩功能,有兴趣可以了解一下。
扩展
上面设置了max.request.size参数,我们在上面的截图代码中看到第二个判断中有一个参数totalMemorySize,这个值是缓冲区大小,我们发送的消息并不会马上发送kafka服务端,而是会先放在内存缓冲区,然后kafka通过一个线程去取,然后发送,可通过buffer.memory
设置,这个值的默认值为32M,所以我们在设置max.request.size的时候也要考虑一下这个值。
总结
有必要对kafka进行比较深一点的学习,这样在出现问题的时候能够快速定位,并且合理解决,当然,在业务处理的时候要充分考虑可能出现的问题,做好容错和相应的补偿方案。
今天的分享就到这里,感谢你的观看,我们下期见