EMS
Extend MySQL Stream;
一种基于 MySQL 实现的 stream 队列.
功能
写入设计
msg id 就是 topic 维度的自增 id,可对多个 topic 并发写入
- 针对一个 topic,需要有物理 physics offset, 每次写入,topic 维度的 physics_offset 自增加一
- 写入需要上锁吗? 看怎么写, 如果使用非原子的形式自增 id,比如数据的的方式,先查出最大 id,再加一,那么必须加锁
- 如果使用 redis 自增特性实现, 为每个 topic 配置一个自增 key, 则可以避免加锁.
- redis 实现虽然性能好, 如为配置aof,宕机则可能导致丢失数据, 此时,会出现 offset 重复异常, 过一会随着继续自增, 也就恢复了.
- topic 维度的自增 id 如果使用 mysql 实现, 性能不堪受辱,因此,此处使用 redis 自增实现(可配置为 mysql 实现);
- 经过测试,笔记本电脑,单 topic 20 并发写入,qps 在 1000-1500 左右(local mysql & local redis),基本满足业务需求。
- 考虑到高可用性和业务场景,此处无法使用批量插入
- 写入需要上锁吗? 看怎么写, 如果使用非原子的形式自增 id,比如数据的的方式,先查出最大 id,再加一,那么必须加锁
- 所有的 topic 和 msg 都写入的这一张表中,表数据定时清理,消费完的消息,可提前删除。
- 注意,本方案写入性能瓶颈是 MySQL Server 的性能瓶颈。
读取设计
- 这个 client id,我们将其设计为,ip + pid + uuid + thread id;
- ip 和 pid 可帮助我们追溯问题
- uuid 简单防重复
- thread id,一种性能优化,下面细说。
- 结合实际业务场景,且遵循 simple is better 原则,读取时,使用上锁的方式解决并发问题。锁的粒度就是 tg
- 考虑到要实现基本的顺序读取和防止重复消费,多线程并发时,我们应当实现基于自增的形式读取 msg;每个 clientid 读取消息后,都会记录一个简单的log,并在 tg 维度增加一个 max offset
- 每次读取消息时,每个 client 都需要去检查当前想要读取的 tg 是否已经有【其他 client】在操作 max offset。即,我们将锁的粒度缩小到了 max offset;
- 整体原则是,一个 t + g 的 max offset,同时只能有一个 thread 操作(写和更新)
- 如果有其他人在读取,则阻塞
- 如果没有其他人在读取,则锁住这个 tg, 并批量拉取一定数量的消息 id,
- 对这个 tg 维度的 max offset + n
- 批量插入这个 tg + clientid offset log,表明这个消息被这个 clientid 读取了,同时也间接更新了 max offset(order by offset)
- 释放锁🔐
- 拉取刚刚读取的 msg id list 里面的消息体
- 交给业务处理消息
ack
广播消息
client id
核心表设计
锁🔐
如上文所说,由于本方案未采用常见的多 queue 和多 partition 的设计,因此瓶颈在于上图提到的分布式锁的设计上,具体链路为 consumer group client 在集群消费时,
为了让并发读取的 thread 拉取到的消息尽可能准确,使用上锁的方式来实现。
总体看下来, 可以简单理解为, ems 失去了性能, 却拥有了所有.
ps:一个 Java 插件化项目,github 地址:github.com/stateIs0/ex… 欢迎交流使用 star。