数十万定时任务,如何高效触发定时和超时

2023年 11月 7日 21.3k 0

项目产品中,大家都会有"定时任务"和"定时超时"的需求,初始阶段,我们基本都是用少数的一些timer,即使是任务量越来越大的时候,我们就难免维护着大量的timer,或者进行了大量低效的扫描。

定时任务使用场景:当订单一直处于未支付状态时,如何及时的关闭订单(已经使用)

如何定期检查处于退款状态的订单是否已经退款成功(后期重构使用)

设计方案:

  • 整个Redis当做消息池,以KV形式存储消息
  • 使用ZSET做优先队列,按照Score维持优先级
  • 使用LIST结构,以先进先出的方式消费
  • ZSET和LIST存储消息地址(对应消息池的每个KEY)
  • 使用定时器维护路由
  • 根据TTL规则实现消息延迟

咱们公司现阶段就是使用的这套方法:

1.新增一个job,会job_pool中插入一条数据,记录了业务方消费方。也会在bucket插入一条记录,记录执行的时间戳

2.搬运线程会去bucket中查找哪些执行时间戳的RunTimeMillis比现在的时间小,将这些记录全部删除;同时会解析出每个任务的Topic是什么,然后将这些任务PUSH到TOPIC对应的列表queue中

3每个topic的list都会有一个监听线程去批量获取list中的待消费数据,获取到的数据全部扔给这个topic的消费线程池

4.消费线程池执行会去job_pool查找数据结构,返回给回调结构,执行回调方法。

图片图片

待优化的内容:

  • 目前只有一个Queue队列存放消息,当需要消费的消息大量堆积后,会影响消息通知的时效。改进的办法是,开启多个Queue,进行消息路由,再开启多个消费线程进行消费,提供吞吐量
  • 消息没有进行持久化,存在风险,后续会将消息持久化到MongoDB中
  • 一般来说还有什么其他方法实现这类需求呢?

    “轮询扫描法”

    1.用一个Map来记录每一个uid最近一次请求时间last_packet_time

    2.当某个用户uid有请求包来到,实时更新这个Map

    3.启动一个timer,当Map中不为空时,轮询扫描这个Map,看每个uid的last_packet_time是否超过30s,如果超过则进行超时处理

    “多timer触发法”

    1.用一个Map来记录每一个uid最近一次请求时间last_packet_time

    2.当某个用户uid有请求包来到,实时更新这个Map,并同时对这个uid请求包启动一个timer,30s之后触发

    3.每个uid请求包对应的timer触发后,看Map中,查看这个uid的last_packet_time是否超过30s,如果超过则进行超时处理

    方案一:只启动一个timer,但需要轮询,效率较低

    方案二:不需要轮询,但每个请求包要启动一个timer,比较耗资源

    ZSet(有序集合)数据结构来实现

  • 创建ZSet:首先,你需要创建一个ZSet数据结构,其中每个订单将作为一个成员,其分数将表示订单的创建时间戳。你可以使用Redis等支持ZSet的数据库来实现。
  • 添加订单:每当用户创建新订单时,将订单添加到ZSet中,其中成员是订单ID,分数是订单的创建时间戳。
  • 定时检查订单:定期(例如,每分钟)执行一个程序或定时任务来检查ZSet中的订单。你可以使用程序来查询ZSet,找到创建时间超过一定时间阈值的订单,表示它们长时间未支付。
  • 取消订单:对于那些长时间未支付的订单,可以将其从ZSet中删除,并执行取消订单的操作(例如,将订单状态设置为"已取消")。
  • import redis.clients.jedis.Jedis;
    import redis.clients.jedis.Tuple;
    import java.util.Set;
    
    
    public class OrderCancellationSystem {
        public static void main(String[] args) {
            Jedis jedis = new Jedis("localhost"); // 连接到本地Redis服务器
    
    
            // 模拟添加订单
            addOrder(jedis, "Order1");
            addOrder(jedis, "Order2");
    
    
            // 定时任务,每分钟检查订单并自动取消
            while (true) {
                cancelLongUnpaidOrders(jedis);
                try {
                    Thread.sleep(60000); // 等待一分钟再次检查
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        public static void addOrder(Jedis jedis, String orderId) {
            long currentTime = System.currentTimeMillis();
            jedis.zadd("orders", currentTime, orderId);
        }
    
    
        public static void cancelOrder(String orderId) {
            // 执行取消订单操作,例如更新订单状态
            System.out.println("Cancelling order: " + orderId);
        }
    
    
        public static void cancelLongUnpaidOrders(Jedis jedis) {
            long expirationTime = System.currentTimeMillis() - 3600 * 1000; // 60分钟前的时间戳
            Set longUnpaidOrders = jedis.zrangeByScoreWithScores("orders", "-inf", String.valueOf(expirationTime));
    
    
            for (Tuple order : longUnpaidOrders) {
                String orderId = order.getElement();
                cancelOrder(orderId);
                jedis.zrem("orders", orderId); // 从ZSet中删除已取消的订单
            }
        }
    }

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论