有这样一种需求,程序中需要一个定时任务(使用Cron处理定时),但是服务是集群部署的,这种情况就会造成重复执行,如果代码写的不够严谨,甚至会生成重复数据。
为避免这种情况,有3种方案,一是单机部署(缺点是机器挂了怎么办?),一是将定时任务暴露为HTTP服务,供外界(某定时任务)调用(缺点是增加了系统复杂性,定时任务的执行权只是交了出去而已),最佳选择就是每个服务都能自己判断出只执行一次就好了。
思路与实现
我们可以利用Redis的分布式锁来实现:
大致代码如下:
const uuid = nanoid();
await this.client.set("uuid", uuid, {
ex: 30,
mode: "NX",
});
if (await this.client.get("uuid") === uuid) {
console.log("will excute");
}
假设有3个集群部署,这段代码同时执行时,在Redis层面只会有一个值插入成功,而这个成功的容器便是抢单成功的幸运儿,它就可以执行任务了。
先到先得与丛林法则
其实,你仔细思考会发现,上面的代码如果不设置NX,整个逻辑也是成立的。只是NX的逻辑是谁第一个抢到了单就是谁的,典型的先到先得,而不设置的逻辑则是单一直被抢,单子似乎会落到最后一个人的手里。但这样会有明显的问题:
以A、B两个人抢单为例,他们与平台(Redis)都是两次I/O(一次写,一次读)。
正常A、B都是一样的速度开抢,平台也是同样的反应灵敏,那么游戏是公平的。这时,平台出了问题,忙不过来了,虽然A和B同时发起抢单,但平台只先响应A,然后A询问单子情况,平台告诉他单子是A的,A高高兴兴送单去了,紧接着平台发现B来抢单了,又把单子给了他,B询问单子情况,平台告诉他单子是B的,B也高高兴兴地去了。这就导致单子重复了。
要避免这种情况,需要A送单时将这个单子加锁,这样平台即使晚些处理B的抢单操作,仍能意识到不该让单子再给B了。所以,加锁是必须的。
这样看来,最开始的NX方式有效,先到先得,简单明了,这才是文明社会的做法。
谦逊要不得
另一种常见的思路是,存储一个Key值,判断它存不存在,如果不存在,那我就是第一个抢到单的,这时赶快标记下单子的所有权。它的问题在于,它的判断是先读后写,在任务并发时,对于平台(Redis)而言,一开始都是读,库中确实还没有这个key,这样所有人都被告知不存在,都认为自己抢到单了,这就出问题了。
所以对于可能有集群或并发的I/O操作,要读就直接读,不要为保障逻辑,先判断存不存在(像Node.js中fs.stat
),因为即使这一刻这文件是存在的,下一瞬间可能就被其它程序删了,这样不光多了一次I/O,更
重要的是你看似完美无缺的代码出现了bug。
也就是说不要再这样写代码:
if (fs.statSync("aa.txt")) {
const text = fs.readFileSync("aa.txt", "utf8");
}
看看Node.js中fs.stat的API说明:
Using fs.stat() to check for the existence of a file before calling fs.open(), fs.readFile() or fs.writeFile() is not recommended. Instead, user code should open/read/write the file directly and handle the error raised if the file is not available.
翻译过来就是:
在调用fs.open()、fs.readFile()或fs.writeFile()之前使用fs.stat()检查文件是否存在并不推荐。相反,用户代码应该直接打开/读取/写入文件,并处理如果文件不可用时引发的错误。
这就像生活中的一个场景:你问柜台小姐姐,冰淇淋还有没有啊?小姐姐刚要回答你时,一个熊孩子直接抢走了最后一个冰淇淋,你只能面对小姐姐爱莫能助的抱歉眼神。这时,你的内心是否在后悔,我为什么不先把冰淇淋拿走?
总结
本文主要讲述了一个分布式系统中如何避免定时任务的重复执行问题。使用Redis分布式锁的方案,通过每个服务器生成随机uuid并将其作为value存入Redis的Setnx命令中,从而实现对定时任务的锁定,避免了重复执行。
我们在日常编码中,如果遇到需要读写文件的场景,不要先用stat嗅探文件是否存在,而是应该直接操作,再处理文件不可用时的错误。
最后,分享下简单封装的抢任务的代码:
async function snatchTask(key: string, func: () => Promise) {
const uuid = nanoid();
await this.redis.set(key, uuid, {
ex: 30,
mode: "NX",
});
const new_uuid = await this.redis.get(key);
if (new_uuid !== uuid) {
this.logger.debug(`${key}已经有任务运行,此次不会再执行`);
return;
}
await func();
await delay(1000); // 这句是必须的,这是为了防止删除的太快,导致不同容器的任务又重复执行了
await this.redis.del(key);
}