Building a distributed scheduling feature with Spring Task + Redisson + Kafka

September 2, 2023

Background: pull roughly 10 million phone numbers from a txt file and process them.

Redisson: a distributed lock elects the distributor instance, which splits the data into slices and assigns them.
Kafka: a topic with three partitions, one partition per consumer (service instance); the consumer-group rebalance reassigns partitions when an instance crashes or restarts, so processing keeps going. This decouples the data from its processors (a sketch of declaring such a topic follows).
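The post never shows how the three-partition topic is created. A minimal sketch of declaring it from Spring Kafka; the literal topic name and replica count are assumptions, the original code only refers to JobConstants.TASK_TOPIC:

package com.example.demo.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Sketch: declare the task topic with three partitions so each of the three
// instances in the consumer group "my-group" is assigned exactly one partition.
// Spring Boot's auto-configured KafkaAdmin creates the topic if it does not exist yet.
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic taskTopic() {
        return TopicBuilder.name("task-topic") // assumption: the value behind JobConstants.TASK_TOPIC
                .partitions(3)
                .replicas(1)
                .build();
    }
}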


Implementation

Kafka producer configuration

package com.example.demo.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.100:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional.id"); // enable transactions
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class); // round-robin partitioner
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // idempotent producer
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
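A note on the transactional settings above: Spring Kafka's documented switch for transactions is a transaction id prefix on DefaultKafkaProducerFactory (recent versions also pick up a transactional.id given in the raw properties), and once the template is transactional, sends have to run inside a Kafka transaction, e.g. executeInTransaction or a @Transactional method backed by a KafkaTransactionManager. A minimal sketch of the executeInTransaction route; the helper class and prefix value are assumptions, not part of the original post:

package com.example.demo.config;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Sketch: sending messages inside a Kafka transaction.
// Assumes the producer factory was made transactional, e.g.:
//   DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
//   factory.setTransactionIdPrefix("task-tx-");
@Component
public class TransactionalSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendAtomically(String topic, List<String> messages) {
        // every send in the callback commits together, or is aborted if an exception is thrown
        kafkaTemplate.executeInTransaction(template -> {
            for (String msg : messages) {
                template.send(topic, msg);
            }
            return true;
        });
    }
}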

Kafka consumer configuration

package com.example.demo.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Value("${server.port}")
    private String serverPort;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.100:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
        // READ_COMMITTED: only read records from committed transactions
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true); // deliver records to the listener in batches
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

Redisson configuration

package com.example.demo.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redisson() {
        Config config = new Config();
        // lock watchdog timeout in milliseconds
        config.setLockWatchdogTimeout(60 * 1000);
        //config.setTransportMode(TransportMode.EPOLL);
        config.useSingleServer().setAddress("redis://192.168.8.100:6379");
        return Redisson.create(config);
    }
}

Task distribution

package com.example.demo.job;


import cn.hutool.core.date.DateUtil;
import com.alibaba.druid.util.StringUtils;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.example.demo.annotation.RedissonAopLock;
import com.example.demo.contants.TaskStatusEnum;
import com.example.demo.dao.TaskDao;
import com.example.demo.pojo.TaskInfo;
import com.example.demo.pojo.TaskMsgInfo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Task distribution: the instance that wins the Redisson lock slices the data
 * and publishes one Kafka message per slice.
 */
@Slf4j
@Component
public class TaskDistribute {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private TaskDao taskDao;

    @Transactional
    @RedissonAopLock(key = JobConstants.LOCK_KEY)
    @Scheduled(cron = "0 19 17 * * ?")
    public void distribute()
    {
        int handleNum = JobConstants.HANDLE_NUM;
        QueryWrapper<TaskInfo> queryWrapper = new QueryWrapper<>();
        String executeDate = DateUtil.format(new Date(), JobConstants.PULL_TIME_FORMAT);
        queryWrapper.lambda().eq(TaskInfo::getExecuteDate, executeDate);
        Integer count = taskDao.selectCount(queryWrapper);
        if (count != null && count > 0)
        {
            log.info("Tasks have already been created!");
            return;
        }
        log.info("Start creating tasks!");
        // run an OS command to count the phone numbers (lines) in the file
        int size = callCommand();
        if (size == 0) {
            return;
        }
        log.info("size:{}", size);
        // number of slices; each slice covers handleNum records
        int frequency = (size % handleNum) == 0 ? size / handleNum : (size / handleNum) + 1;
        List<TaskInfo> tasks = new ArrayList<>(frequency);
        // create one pending slice per handleNum-sized block and persist it
        int startIndex;
        for (int j = 0; j < frequency; j++)
        {
            startIndex = j * handleNum;
            TaskInfo taskInfo = new TaskInfo();
            taskInfo.setStart(startIndex);
            taskInfo.setExecuteDate(executeDate);
            taskInfo.setStatus(TaskStatusEnum.PENDING.name());
            taskDao.insert(taskInfo);
            tasks.add(taskInfo);
        }
        List<Integer> ids = tasks.stream().map(TaskInfo::getId).collect(Collectors.toList());
        log.info("ids:{}", ids);
        sendMsg(ids);
        log.info("Finished creating tasks!");
    }

    /**
     * Send the slice ids to Kafka.
     * @param ids primary keys of the created slices
     */
    private void sendMsg(List<Integer> ids){
        ObjectMapper mapper = new ObjectMapper();
        String msg;
        for (Integer id : ids)
        {
            try
            {
                msg = mapper.writeValueAsString(new TaskMsgInfo(id));
            }
            catch (JsonProcessingException e)
            {
                continue;
            }
            kafkaTemplate.send(JobConstants.TASK_TOPIC, msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(Throwable ex) {
                    throw new RuntimeException(ex);
                }

                @Override
                public void onSuccess(SendResult<String, String> result)
                {
                    RecordMetadata data = result.getRecordMetadata();
                    log.info("send success topic:{},partition:{},offset:{}", data.topic(), data.partition(), data.offset());
                }
            });
        }
    }

    /**
     * Count the number of phone numbers (lines) in the source file via an OS command.
     */
    private int callCommand(){
        String osName = System.getProperty("os.name");
        String cmd = null;
        if (osName.toLowerCase().contains("linux")){
            cmd = "/home/felven/word2vec/demo-classes.sh";
        }
        if (osName.toLowerCase().contains("windows")){
            // count the lines of the file on Windows
            cmd = "cmd /c type C:\\Users\\yj\\Desktop\\iphone_num.txt | find /v /c \"\"";
        }
        if (StringUtils.isEmpty(cmd)){
            log.info("osName:{}", osName);
            return 0;
        }
        try {
            log.info("cmd:{}", cmd);
            Process ps = Runtime.getRuntime().exec(cmd);
            ps.waitFor();
            BufferedReader br = new BufferedReader(new InputStreamReader(ps.getInputStream()));
            StringBuilder sb = new StringBuilder();
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line);
            }
            if (sb.length() == 0) {
                log.info("The file is empty!");
                return 0;
            }
            return Integer.parseInt(sb.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }
}
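TaskDistribute (and TaskExecute below) rely on a JobConstants class, a TaskStatusEnum and a TaskMsgInfo payload that the post does not include. A minimal sketch of what they might look like, inferred from how they are used; every literal value here is an assumption:

package com.example.demo.job;

// Sketch of the constants referenced by the job classes (all values are assumptions).
public class JobConstants {
    public static final String LOCK_KEY = "task:distribute:lock"; // Redisson lock name
    public static final String TASK_TOPIC = "task-topic";         // Kafka topic with three partitions
    public static final int HANDLE_NUM = 10000;                   // phone numbers per slice
    public static final String PULL_TIME_FORMAT = "yyyy-MM-dd";   // execute-date format
}

package com.example.demo.contants;

// Sketch: the slice states used by the query and the status update.
public enum TaskStatusEnum {
    PENDING,
    COMPLETE
}

package com.example.demo.pojo;

// Sketch: the Kafka message payload - the slice's database id plus a retry counter.
public class TaskMsgInfo {

    private Integer dbId; // primary key of the TaskInfo row
    private int num;      // delivery/retry counter, incremented by the consumer

    public TaskMsgInfo() {
    }

    public TaskMsgInfo(Integer dbId) {
        this.dbId = dbId;
    }

    public Integer getDbId() { return dbId; }
    public void setDbId(Integer dbId) { this.dbId = dbId; }
    public int getNum() { return num; }
    public void setNum(int num) { this.num = num; }
}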

Message consumer

package com.example.demo.job;

import cn.hutool.core.collection.CollectionUtil;
import com.example.demo.pojo.TaskMsgInfo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@Component
public class TaskConsumer {

    @Autowired
    private TaskExecute taskExecute;

    @KafkaListener(topics = JobConstants.TASK_TOPIC, concurrency = "1", containerFactory = "kafkaListenerContainerFactory")
    public void consumer(List<String> records, Acknowledgment ack){
        log.info("Start processing messages!");
        if (CollectionUtil.isNotEmpty(records)){
            List<TaskMsgInfo> data = new ArrayList<>(records.size());
            ObjectMapper mapper = new ObjectMapper();
            for (String record : records){
                TaskMsgInfo msgInfo;
                try {
                    msgInfo = mapper.readValue(record, TaskMsgInfo.class);
                } catch (JsonProcessingException e) {
                    log.error("Failed to deserialize message:{}", record);
                    continue;
                }
                // each delivery counts as one attempt; drop the message after three retries
                msgInfo.setNum(msgInfo.getNum() + 1);
                if (msgInfo.getNum() > 3) {
                    log.info("Retried more than 3 times, discarding. taskMsgInfo:{}", msgInfo);
                    continue;
                }
                data.add(msgInfo);
            }
            taskExecute.execute(data);
        }
        ack.acknowledge();
    }
}

Message processing

package com.example.demo.job;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.example.demo.contants.TaskStatusEnum;
import com.example.demo.dao.TaskDao;
import com.example.demo.pojo.TaskInfo;
import com.example.demo.pojo.TaskMsgInfo;
import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

@Slf4j
@Component
public class TaskExecute {

    @Autowired
    private TaskDao taskDao;

    @Value("${server.port}")
    private String serverPort;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void execute(List<TaskMsgInfo> record){
        ObjectMapper mapper = new ObjectMapper();
        try {
            List<Integer> ids = record.stream().map(TaskMsgInfo::getDbId).collect(Collectors.toList());
            log.info("ids:{}", ids);
            QueryWrapper<TaskInfo> queryWrapper = new QueryWrapper<>();
            queryWrapper.lambda().in(TaskInfo::getId, ids)
                    .eq(TaskInfo::getExecuteDate, DateUtil.format(new Date(), JobConstants.PULL_TIME_FORMAT))
                    .eq(TaskInfo::getStatus, TaskStatusEnum.PENDING);
            List<TaskInfo> tasks = taskDao.selectList(queryWrapper);
            if (CollectionUtil.isEmpty(tasks))
            {
                log.info("taskId:{} already processed", ids);
                return;
            }
            List<String> list = null;
            for (TaskInfo taskInfo : tasks) {
                // read this slice: skip to its start offset and take HANDLE_NUM lines
                list = Files.readAllLines(Paths.get("C:\\Users\\yj\\Desktop\\iphone_num.txt"), StandardCharsets.UTF_8)
                        .stream().skip(taskInfo.getStart()).limit(JobConstants.HANDLE_NUM).collect(Collectors.toList());
                log.info("num_list:{}", list);
                // mark the slice as done and record which instance handled it
                taskInfo.setStatus(TaskStatusEnum.COMPLETE.name());
                taskInfo.setHandle(serverPort);
                taskDao.updateById(taskInfo);
            }
            list.clear();
        } catch (IOException e) {
            if (e instanceof JacksonException){
                log.error("msg readValue Exception:{}", e.getMessage());
                return;
            }
            // on an I/O failure, put the messages back on the topic so they can be retried
            try {
                for (TaskMsgInfo taskMsgInfo : record){
                    kafkaTemplate.send(JobConstants.TASK_TOPIC, mapper.writeValueAsString(taskMsgInfo));
                }
            } catch (JsonProcessingException ex) {
                log.error("writeValueAsString Exception:{}", ex.getMessage());
            }
        }
    }
}

Acquiring the lock with Redisson + AOP

Pointcut configuration

package com.example.demo.aop;

import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;

/**
 * Common pointcut designators:
 * execution: match a method signature, e.g. "execution(* testExecution(..))"
 * within: match methods in a given class or package (Spring AOP only), e.g. "within(ric.study.demo.aop.svc..*)"
 * @annotation: match methods that carry a specific annotation
 * bean(idOrNameOfBean): match by bean name (Spring AOP only)
 */

@Aspect
@Component
public class PointcutConfig {

    @Pointcut("@annotation(com.example.demo.annotation.RedissonAopLock)")
    public void redissonPointcut(){}
}
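The @RedissonAopLock annotation itself is not shown in the post. A minimal sketch, assuming only the key attribute that the aspect below reads:

package com.example.demo.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Marks a method whose execution should be guarded by a Redisson distributed lock.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedissonAopLock {
    String key(); // name of the Redisson lock to acquire
}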

Lock aspect

package com.example.demo.aop;

import com.example.demo.annotation.RedissonAopLock;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

@Aspect
@Component
public class RedissonLockAspect {

    @Autowired
    private RedissonClient redissonClient;

    @Around("com.example.demo.aop.PointcutConfig.redissonPointcut()")
    public Object redissonLockAspect(ProceedingJoinPoint pjp) throws Throwable {
        Object proceed;
        MethodSignature signature = (MethodSignature)pjp.getSignature();
        Method method = signature.getMethod();
        RedissonAopLock annotation = method.getAnnotation(RedissonAopLock.class);
        String key = annotation.key();
        RLock lock = redissonClient.getLock(key);
        try {
            lock.lock();
            proceed = pjp.proceed();
        } finally {
            if(lock != null && lock.isHeldByCurrentThread())
            {
                lock.unlock();
            }
        }
        return proceed;
    }
}
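Note that lock.lock() blocks, so every instance eventually runs distribute(); the duplicate-creation check at the top of the scheduler is what keeps the later runs from creating tasks twice. If the non-elected instances should skip the run entirely, the advice body could use tryLock instead; a sketch of that variant:

        // Sketch: skip the scheduled run instead of blocking when another instance holds the lock.
        RLock lock = redissonClient.getLock(key);
        if (!lock.tryLock()) {
            // another instance is the distributor for this run
            return null;
        }
        try {
            return pjp.proceed();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }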

Data preparation


Configure three instances
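Because the handler records server.port as its identifier, the three instances only need distinct ports. One way to start them from the same build (the port numbers are assumptions):

java -jar demo.jar --server.port=8081
java -jar demo.jar --server.port=8082
java -jar demo.jar --server.port=8083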


Start the project

Instance 2 acquires the lock and distributes the tasks.
Instance 1 reads the slices with primary keys 129 and 131.
Instance 2 reads the slices with primary keys 130 and 132.
Instance 3 reads no data.

Database

