造个轮子任务调度执行小框架任务清单执行器实现

2023年 8月 13日 43.8k 0

前言

okey,上一篇文章我们提到了,如何实现它的一个清单的一个代理。这里的话我们来捋一捋我们的这个执行流程是啥:
在这里插入图片描述
所以的话,我们的我们这里今天要做的是这个执行器的一个执行。当然这里的话,我们也是分两个部分,因为这个执行器的话,是分两个部分的,一个是正常的任务执行,还有一个是这个宕机之后,我们对任务的一个恢复的处理。

执行器流程

提交流程

那么在这里的话,我得先说说这个执行器提交的流程,因为这个不说清楚的话,就比较麻烦了。
首先我们先来看到这几个类:
在这里插入图片描述
然后的话,我们的流程是这样的:

1. ExecuteMagaer 负责创建TaskWrapper
2. TaskWrapper 里面包含了代理对象,执行代理对象的执行方法
3. 将TaskWraaper 提交到线程池里面去

所以的话,是通过这三个环节,最终任务提交到了我们的这个线程池里面,然后进行执行。

线程池实现

okey,这里的话,我们当然需要去有一个线程池,但是这个线程池的话,有个特点,那就是:

  • 如果你有ID,那么相同ID的任务排队执行
  • 如果没有ID,那么就直接异步执行
    这样做的话有啥好处嘛,好处就是,假设这个ID是你的UserID,在用户下单的时候,就算重复下单,由于两次账单是顺序执行的,第一个账单执行完毕之后,改变了状态,假设此时你对商品ID上锁了,那么第二个账单执行的时候,发现商品ID锁住了,就不会继续无脑执行了。主要是实现更加细致的操作。
  • package com.huterox.todoscheduler.common.impl;
    
    
    import com.huterox.todoscheduler.common.SerializationUtils;
    import com.huterox.todoscheduler.common.TaskManager;
    import com.huterox.todoscheduler.config.Configuration;
    import com.huterox.todoscheduler.core.wapper.TaskWrapper;
    
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.*;
    
    /**
     * 任务管理器
     * */
    public class DefaultTaskManager implements TaskManager, Serializable {
    
        private final ThreadPoolExecutor executor;
        private final Map taskQueues;
        private final Object lock;
    
        public DefaultTaskManager(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
            executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue());
            taskQueues = new HashMap();
            lock = new Object();
        }
    
        public DefaultTaskManager() {
    
            executor = new ThreadPoolExecutor(
                    Configuration.corePoolSize,
                    Configuration.maximumPoolSize,
                    Configuration.keepAliveTime,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue());
            taskQueues = new HashMap();
            lock = new Object();
        }
    
        @Override
        public void submitTask(TaskWrapper task, String id) {
            if (id == null || id.isEmpty()) {
                executor.execute(task); // 直接执行任务
                //然后保存当前的一个状态
                saveStatus();
            } else {
                synchronized (lock) {
                    BlockingQueue queue = taskQueues.computeIfAbsent(id, k -> new LinkedBlockingQueue());
    
                    if (queue.isEmpty()) {
                        // 之前没有相同ID的任务在执行,直接提交到线程池执行
                        executor.execute(() -> {
                            try {
                                task.run(); // 执行任务
                            } finally {
                                submitNextTask(id); // 执行完毕后提交下一个任务
                                saveStatus();
                            }
                        });
                    } else {
                        // 将任务加入队列中,等待前面的任务执行完毕后再执行
                        queue.offer(() -> {
                            try {
                                task.run(); // 执行任务
                            } finally {
                                submitNextTask(id); // 执行完毕后提交下一个任务
                                saveStatus();
                            }
                        });
                    }
                }
            }
        }
    
        @Override
        public void saveStatus() {
            //保存当前的一个状态
            SerializationUtils.serializeObject(this,"runningTask","task.ser");
        }
    
        private void submitNextTask(String id) {
            synchronized (lock) {
                BlockingQueue queue = taskQueues.get(id);
                if (queue != null && !queue.isEmpty()) {
                    executor.execute(queue.poll()); // 提交下一个任务
                } else {
                    taskQueues.remove(id); // 队列为空时移除对应的ID
                }
            }
        }
    
        public ThreadPoolExecutor getExecutor() {
            return executor;
        }
    
        public Map getTaskQueues() {
            return taskQueues;
        }
    
        public Object getLock() {
            return lock;
        }
    
        @Override
        public void shutdown() {
            executor.shutdown();
        }
    }
    
    

    这里面的代码细节,我就不说了,因为不难,再说篇幅太大了,还有好多东西要说呢。

    执行器实现

    ok,我们说了这个流程,我们来看到这个执行器是如何实现的。

    接口

    首先的话,我们是有一个接口的:

    package com.huterox.todoscheduler.core.execute;
    
    /**
     * 我们核心的调度器,通过TodoListFactory可以得到可以执行的任务清单
     * */
    public interface ExecuteCore {
    
        void execute(String ListName);
    
        String getClsId();
    
        void run();
    
        //服务器意外宕机之后,恢复这个任务的时候要进行的操作
        void repair();
    
    
    
    }
    
    

    看到这个接口的话,有好几个方法,首先是执行提交的,然后是run,这个主要是这个原因:

    package com.huterox.todoscheduler.core.wapper;
    
    import com.huterox.todoscheduler.core.enumType.ExecuteType;
    import com.huterox.todoscheduler.core.execute.ExecuteCore;
    
    import java.io.Serializable;
    
    
    public class TaskWrapper implements Runnable, Serializable {
    
        private ExecuteCore executeCore;
    
        private ExecuteType executeType = ExecuteType.Run;
    
        public TaskWrapper() {
        }
    
        public ExecuteCore getExecuteCore() {
            return executeCore;
        }
    
        public ExecuteType getExecuteType() {
            return executeType;
        }
    
        public void setExecuteType(ExecuteType executeType) {
            this.executeType = executeType;
        }
    
        public void setExecuteCore(ExecuteCore executeCore) {
            this.executeCore = executeCore;
        }
    
        public TaskWrapper(ExecuteCore executeCore) {
            this.executeCore = executeCore;
        }
    
        @Override
        public void run() {
            if(executeType==ExecuteType.Run){
                executeCore.run();
            }else if(executeType==ExecuteType.Repair){
                executeCore.repair();
            }
    
        }
    }
    
    

    这个TaskWrapper是实现了Runable接口,里面有run,所以就索性这样写了。

    状态标志

    之后的话,我们的项目到这里的话,只是实现了一个正向的过程,就是当项目宕机的时候,我们要尽可能去恢复任务清单的一个执行,或者状态,比如你买东西的接口,后面执行退款代码的时候,服务器宕机了,那么这个时候,我要尽可能去恢复这个宕机退款的执行。当然这里面要考虑的东西要多得多,小项目的话,你要相信第三方组件要比自己写的代码靠谱(狗头)

    这里主要是这两个:
    在这里插入图片描述

    package com.huterox.todoscheduler.core.enumType;
    
    import java.io.Serializable;
    
    /**
     * 当前的任务清单执行的情况
     * */
    public enum TodoListStateType implements Serializable {
    
        CreatFailed,
        Running,
        Fine,
        Error,
        Repairing;
    }
    
    
    package com.huterox.todoscheduler.core.enumType;
    
    import java.io.Serializable;
    
    public enum TodoItemStateType implements Serializable {
    
        /**
         * 需要重新运行启动,适用于强一致的清单
         * */
        Again,
    
        Running,
    
        Error,
    
        /**
         * 运行正常
         * */
        Fine,
    
        /**
         * 只需要执行修复,适用于弱一致的清单
         * */
        Repairing;
    }
    
    

    执行周期实现

    之后的话,就是我们要实现一个完整的执行周期了:

    清单代理创建

    在执行的时候 ,我们先要去创建这个代理对象。但是这个代理对象的话,也有创建前执行处理器,之后处理器等待。所以这里也要进行一个处理:

        @Override
        public void execute(String ListName) {
            //1. 先拿到可执行清单对象
            todoListExBean = TodoListFactory.getInstance(ListName);
            //2. 查看返回的结果
            if(todoListExBean==null){
                //说明创建就失败了,这个失败是清单对象都没有起来
                this.running = false;
            }else {
                if(todoListExBean.getTodoListStateType()== TodoListStateType.CreatFailed){
                    //创建失败了
                    if(todoListExBean.getTodoListElementType()== TodoListElementType.NothingConsistency
                        || todoListExBean.getTodoListElementType()== TodoListElementType.WeakConsistency
                    ){
                        //在创建阶段,只要你不是强一致性,那么我就不管你,如果创建都失败了
                        System.err.println("任务清单创建失败,取消执行");
                    }else if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency){
                        //强一致,这个时候,直接进入失败队列,这个时候也是创建失败,但是这个失败是指
                        //清单项目创建的时候有问题,清单对象起来了
                        TodoListFailedList.addFailedWithSerializable(todoListExBean);
                        this.running = false;
                    }
                }
            }
        }
    
    

    清单项执行

    之后的 话,才是执行我们的 方法。执行之后结束。
    然后对于一个清单项,它其实有这样的几个过程:

  • 清单项创建前(在Factory的时候就可以看到)
  • 当前清单项执行前
  • 当前清单项执行时刻
  • 当前清单项执行后
  • 清单项执行异常
  • 一个完整的清单周期包括:

  • 清单创建前(创建之后是立刻执行的,因此没有执行前这个方法)
  • 清单项周期
  • 清单结束
  • 清单执行异常
  •  @Override
        public void run() {
    
            if(!this.running){
                System.err.println("执行失败,停止执行该任务清单");
                return;
            }
    
            todoListExBean.setExTimes(todoListExBean.getExTimes()+1);
            todoListExBean.setTodoListStateType(TodoListStateType.Running);
            //在这里完成方法任务清单项的执行
            //同时在这里完成状态的持久化处理,方便恢复状态
            Map sortedMap = todoListExBean.getSortedMap();
    
            //开始遍历执行清单项
            for(Map.Entry entry:sortedMap.entrySet()){
                Integer key = entry.getKey();
                TodoItemExBean entryValue = entry.getValue();
                entryValue.setTodoItemStateType(TodoItemStateType.Running);
                //这里开始按照生命周期执行代码
                try {
                    if (entryValue.getTodoItemBeforeRunningHandler()!=null)
                    {
                        //执行第一个运行时,运行前的代码
                        TodoItemBeforeRunningHandler todoItemBeforeRunningHandler = entryValue.
                                getTodoItemBeforeRunningHandler();
                        boolean concierge = todoItemBeforeRunningHandler.concierge(
                                entryValue.getStateWrapper(),
                                todoListExBean.getStateWrapper()
                        );
                        if(!concierge){
    
                            //没有满足通过条件,需要跳过或者终止
                            if(entryValue.getTodoItemElementType()== TodoItemElementType.CONTINUTEITEM){
                                System.err.println("任务清单:"+todoListExBean.getTodoListName()
                                        +"-Cid:" + todoListExBean.getClsId()+"第"+entryValue.getOrder()+
                                        "方法:"+entryValue.getWrapperMethod().getName()+"未通过运行时执行前Handler"
                                        +"正在前往执行下一个任务项"
                                        );
                            }else {
                                //查看当前任务清单类型,如果是那种强一致性的,那就加入失败队列,等待重启
                                if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency) {
                                    //强一致,这个时候,直接进入失败队列
                                    entryValue.setTodoItemStateType(TodoItemStateType.Again);
                                    TodoListFailedList.addFailedWithSerializable(todoListExBean);
                                }else if(
                                    todoListExBean.getTodoListElementType()==TodoListElementType.WeakConsistency
                                ){
                                    entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
                                }
                                return;
                            }
                            entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
                            //这个时候由于没有满足条件,那么这个时候要执行对应的恢复函数
                            todoItemBeforeRunningHandler.repair(
                                    entryValue.getStateWrapper(),
                                    todoListExBean.getStateWrapper()
                            );
                        }
                    }
    
                    //执行运行时刻代码
                    Method wrapperMethod = entryValue.getWrapperMethod();
                    wrapperMethod.setAccessible(true);
                    Parameter[] parameters = wrapperMethod.getParameters();
    
                    // 构造参数数组
                    Object[] argsArray = new Object[parameters.length];
                    for (int i = 0; i < parameters.length; i++) {
                        Parameter parameter = parameters[i];
                        if (parameter.getType() == ListStateWrapper.class) {
                            // 设置特定参数的值
                            argsArray[i] = todoListExBean.getStateWrapper();
                        }else if(parameter.getType() == ItemStateWrapper.class){
                            argsArray[i] = entryValue.getStateWrapper();
                        }
                    }
                    // 执行方法
                    Object result = wrapperMethod.invoke(entryValue, argsArray);
    
                    //执行后置处理,这个执行流程和前置是一样的
                    if (entryValue.getTodoItemAfterRunningHandler()!=null){
                        TodoItemAfterRunningHandler todoItemAfterRunningHandler = entryValue.getTodoItemAfterRunningHandler();
                        boolean concierge = todoItemAfterRunningHandler.concierge(
                                entryValue.getStateWrapper(),
                                todoListExBean.getStateWrapper()
                        );
                        if(!concierge){
                            entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
                            if(entryValue.getTodoItemElementType()== TodoItemElementType.CONTINUTEITEM){
                                System.err.println("任务清单:"+todoListExBean.getTodoListName()
                                        +"-Cid:" + todoListExBean.getClsId()+"第"+entryValue.getOrder()+
                                        "方法:"+entryValue.getWrapperMethod().getName()+"未通过运行时执行前Handler"
                                        +"正在前往执行下一个任务项"
                                );
                            }else {
                                if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency) {
                                    //强一致,这个时候,直接进入失败队列
                                    entryValue.setTodoItemStateType(TodoItemStateType.Again);
                                    TodoListFailedList.addFailedWithSerializable(todoListExBean);
                                }else if(
                                        todoListExBean.getTodoListElementType()==TodoListElementType.WeakConsistency
                                ){
                                    entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
                                }
                                return;
                            }
                            //这个时候由于没有满足条件,那么这个时候要执行对应的恢复函数
                            todoItemAfterRunningHandler.repair(
                                    entryValue.getStateWrapper(),
                                    todoListExBean.getStateWrapper()
                            );
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    //对错误进行处理
                    entryValue.setTodoItemStateType(TodoItemStateType.Error);
                    if(entryValue.getTodoItemErrorHandler()!=null){
                        try {
                            TodoItemErrorHandler todoItemErrorHandler = entryValue.getTodoItemErrorHandler();
                            todoItemErrorHandler.concierge(
                                    entryValue.getStateWrapper(),
                                    todoListExBean.getStateWrapper()
                            );
                        }catch (Exception e1){
                            e1.printStackTrace();
                            //如果这个都执行失败了,那真的没救了
                            //加入失败列表看看了,只能,如果是一定要执行的话
                            if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency){
                                TodoListFailedList.addFailedWithSerializable(todoListExBean);
                            }else {
                                System.err.println("任务强制终止");
                            }
                            return;
                        }
                    }
                }
                //此时这个任务清单的小项目才算执行正常
                entryValue.setTodoItemStateType(TodoItemStateType.Fine);
            }
            //清单项目是执行完毕了,那么接下来是这个清单的后置处理部分
            if(todoListExBean.getTodoListAfterHandler()!=null) {
                TodoListAfterHandler todoListAfterHandler = todoListExBean.getTodoListAfterHandler();
                try {
                    boolean concierge = todoListAfterHandler.concierge(todoListExBean.getStateWrapper());
                    if(!concierge) {
    
                        todoListExBean.setTodoListStateType(TodoListStateType.Repairing);
                        //这个时候由于没有满足条件,那么这个时候要执行对应的恢复函数
                        todoListAfterHandler.repair(
                                todoListExBean.getStateWrapper()
                        );
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    todoListExBean.setTodoListStateType(TodoListStateType.Error);
                    //对错误进行处理
                    if(todoListExBean.getTodoListErrorHandler()!=null){
                        try {
                            TodoListErrorHandler todoListErrorHandler = todoListExBean.getTodoListErrorHandler();
                            todoListErrorHandler.concierge(
                                    todoListExBean.getStateWrapper()
                            );
                        }catch (Exception e1){
                            e1.printStackTrace();
                            if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency){
                                todoListExBean.setTodoListStateType(TodoListStateType.Error);
                                TodoListFailedList.addFailedWithSerializable(todoListExBean);
                            }else {
                                System.err.println("任务强制终止");
                            }
                        }
                    }
                }
            }
    
            todoListExBean.setTodoListStateType(TodoListStateType.Fine);
        }
    

    那么这里的核心代码其实就是这一块儿。当然这个家伙的实现还有一部分,关于这个状态恢复的。

    总结

    那么这篇博文就到这里,今天还有一篇,这里再重复一次(第二次了),我们的项目地址是:gitee.com/Huterox/hto…

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论