Background: pull roughly 10 million phone numbers from a txt file and process them.
Redisson: use a distributed lock to elect a single distributor instance that splits the file into slice tasks.
Kafka: create a topic with three partitions, one consumer (service instance) per partition; the rebalance mechanism (Kafka reassigns partitions when an instance crashes or restarts) keeps processing going. This decouples the data from its processors.
Code implementation
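The classes below reference a JobConstants holder that the original doesn't show. A minimal sketch; the constant names come from the code, but every value here is an assumption:

package com.example.demo.job;
/**
 * Constants referenced by the job classes below.
 * All values are assumptions for illustration; only the names appear in the original code.
 */
public final class JobConstants {
    public static final String TASK_TOPIC = "task-topic";         // Kafka topic name (assumed)
    public static final String LOCK_KEY = "task:distribute:lock"; // Redisson lock key (assumed)
    public static final int HANDLE_NUM = 10000;                   // phone numbers per slice (assumed)
    public static final String PULL_TIME_FORMAT = "yyyy-MM-dd";   // execute-date format (assumed)
    private JobConstants() {}
}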
Kafka producer configuration class
package com.example.demo.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.100:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // enable transactions; Spring's producer factory uses this value as the transactional-id prefix
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional.id");
        // round-robin partitioner spreads records evenly across the three partitions
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
        // idempotent producer (required for transactions)
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
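The background section calls for a topic with three partitions, but the original doesn't show how it is created. One option in Spring Kafka is a NewTopic bean, which KafkaAdmin applies on startup; a sketch, reusing the assumed JobConstants.TASK_TOPIC name:

package com.example.demo.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import com.example.demo.job.JobConstants;

@Configuration
public class KafkaTopicConfig {

    // Three partitions so each of the three service instances owns exactly one.
    @Bean
    public NewTopic taskTopic() {
        return TopicBuilder.name(JobConstants.TASK_TOPIC)
                .partitions(3)
                .replicas(1)
                .build();
    }
}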
Kafka consumer configuration class
package com.example.demo.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.100:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
        // READ_COMMITTED: only read messages from committed transactions
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // batch listener: the @KafkaListener method receives a List of records
        factory.setBatchListener(true);
        // offsets are committed manually via Acknowledgment.acknowledge()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}
Redisson configuration class
package com.example.demo.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redisson() {
        Config config = new Config();
        // the watchdog keeps renewing a held lock while its owner is alive
        config.setLockWatchdogTimeout(60 * 1000);
        config.useSingleServer().setAddress("redis://192.168.8.100:6379");
        return Redisson.create(config);
    }
}
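The entity, enum, and mapper types used by the job classes aren't shown in the original. Minimal sketches inferred from how they are used (Lombok assumed for the POJOs; field types are assumptions):

package com.example.demo.pojo;
import lombok.Data;
// One row per file slice; fields inferred from usage in TaskDistribute/TaskExecute.
@Data
public class TaskInfo {
    private Integer id;          // primary key, back-filled on insert
    private Integer start;       // line offset of this slice in the txt file
    private String executeDate;  // day the slice belongs to
    private String status;       // PENDING / COMPLETE
    private String handle;       // server.port of the instance that processed the slice
}

package com.example.demo.pojo;
import lombok.Data;
import lombok.NoArgsConstructor;
// Kafka message payload: the slice's DB id plus a retry counter.
@Data
@NoArgsConstructor
public class TaskMsgInfo {
    private Integer dbId;
    private int num; // delivery attempts so far
    public TaskMsgInfo(Integer dbId) {
        this.dbId = dbId;
    }
}

package com.example.demo.contants;
public enum TaskStatusEnum {
    PENDING, COMPLETE
}

package com.example.demo.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.demo.pojo.TaskInfo;
public interface TaskDao extends BaseMapper<TaskInfo> {
}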
Task distribution
package com.example.demo.job;
import cn.hutool.core.date.DateUtil;
import com.alibaba.druid.util.StringUtils;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.example.demo.annotation.RedissonAopLock;
import com.example.demo.contants.TaskStatusEnum;
import com.example.demo.dao.TaskDao;
import com.example.demo.pojo.TaskInfo;
import com.example.demo.pojo.TaskMsgInfo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Task distribution
 */
@Slf4j
@Component
public class TaskDistribute {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    private TaskDao taskDao;
    @Transactional
    @RedissonAopLock(key = JobConstants.LOCK_KEY)
    @Scheduled(cron = "0 19 17 * * ?")
    public void distribute() {
        int handleNum = JobConstants.HANDLE_NUM;
        QueryWrapper<TaskInfo> queryWrapper = new QueryWrapper<>();
        String executeDate = DateUtil.format(new Date(), JobConstants.PULL_TIME_FORMAT);
        queryWrapper.lambda().eq(TaskInfo::getExecuteDate, executeDate);
        Integer count = taskDao.selectCount(queryWrapper);
        if (count != null && count > 0) {
            log.info("Tasks already created!");
            return;
        }
        log.info("Creating tasks!");
        // run an external command to count the lines (phone numbers) in the file
        int size = callCommand();
        if (size == 0) {
            return;
        }
        log.info("size:{}", size);
        // number of slices, handleNum numbers per slice
        int frequency = (size % handleNum) == 0 ? size / handleNum : (size / handleNum) + 1;
        List<TaskInfo> tasks = new ArrayList<>(frequency);
        // build one TaskInfo per slice; the original loop body was lost in extraction and is
        // reconstructed here from how TaskExecute consumes the rows (start offset + PENDING status)
        int startIndex;
        for (int j = 0; j < frequency; j++) {
            startIndex = j * handleNum;
            TaskInfo taskInfo = new TaskInfo();
            taskInfo.setStart(startIndex);
            taskInfo.setExecuteDate(executeDate);
            taskInfo.setStatus(TaskStatusEnum.PENDING.name());
            taskDao.insert(taskInfo); // MyBatis-Plus back-fills the generated id
            tasks.add(taskInfo);
        }
        List<Integer> ids = tasks.stream().map(TaskInfo::getId).collect(Collectors.toList());
        log.info("ids:{}", ids);
        sendMsg(ids);
        log.info("Task creation finished!");
    }
    /**
     * Send slice data to Kafka
     * @param ids primary keys of the slice records
     */
    private void sendMsg(List<Integer> ids) {
        ObjectMapper mapper = new ObjectMapper();
        String msg;
        for (Integer id : ids) {
            try {
                msg = mapper.writeValueAsString(new TaskMsgInfo(id));
            } catch (JsonProcessingException e) {
                continue;
            }
            kafkaTemplate.send(JobConstants.TASK_TOPIC, msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(Throwable ex) {
                    throw new RuntimeException(ex);
                }

                @Override
                public void onSuccess(SendResult<String, String> result) {
                    RecordMetadata data = result.getRecordMetadata();
                    log.info("send success topic:{},partition:{},offset:{}", data.topic(), data.partition(), data.offset());
                }
            });
        }
    }
    private int callCommand() {
        String osName = System.getProperty("os.name");
        String cmd = null;
        if (osName.toLowerCase().contains("linux")) {
            // on Linux the line count comes from a shell script (e.g. wc -l over the file)
            cmd = "/home/felven/word2vec/demo-classes.sh";
        }
        if (osName.toLowerCase().contains("windows")) {
            // find /v /c "" counts the lines of the piped file contents
            cmd = "cmd /c type C:\\Users\\yj\\Desktop\\iphone_num.txt | find /v /c \"\"";
        }
        if (StringUtils.isEmpty(cmd)) {
            log.info("osName:{}", osName);
            return 0;
        }
        try {
            log.info("cmd:{}", cmd);
            Process ps = Runtime.getRuntime().exec(cmd);
            // read stdout before waitFor() so a full output buffer cannot block the process
            BufferedReader br = new BufferedReader(new InputStreamReader(ps.getInputStream()));
            StringBuilder sb = new StringBuilder();
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line);
            }
            ps.waitFor();
            if (sb.length() == 0) {
                log.info("File is empty!");
                return 0;
            }
            return Integer.parseInt(sb.toString().trim());
        } catch (Exception e) {
            log.error("callCommand failed", e);
        }
        return 0;
    }
}
Message consumer class
package com.example.demo.job;
import cn.hutool.core.collection.CollectionUtil;
import com.example.demo.pojo.TaskMsgInfo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Component
public class TaskConsumer {

    @Autowired
    private TaskExecute taskExecute;

    @KafkaListener(topics = JobConstants.TASK_TOPIC, concurrency = "1", containerFactory = "kafkaListenerContainerFactory")
    public void consumer(List<String> records, Acknowledgment ack) {
        log.info("Start processing messages!!!");
        if (CollectionUtil.isNotEmpty(records)) {
            List<TaskMsgInfo> data = new ArrayList<>(records.size());
            ObjectMapper mapper = new ObjectMapper();
            for (String msg : records) {
                TaskMsgInfo msgInfo;
                try {
                    msgInfo = mapper.readValue(msg, TaskMsgInfo.class);
                } catch (JsonProcessingException e) {
                    log.error("Deserialization failed: {}", msg);
                    continue;
                }
                // count one more delivery attempt; discard after three retries
                msgInfo.setNum(msgInfo.getNum() + 1);
                if (msgInfo.getNum() > 3) {
                    log.info("Retried more than 3 times, discarding. taskMsgInfo:{}", msgInfo);
                    continue;
                }
                data.add(msgInfo);
            }
            taskExecute.execute(data);
        }
        ack.acknowledge();
    }
}
Message processing class
package com.example.demo.job;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.example.demo.contants.TaskStatusEnum;
import com.example.demo.dao.TaskDao;
import com.example.demo.pojo.TaskInfo;
import com.example.demo.pojo.TaskMsgInfo;
import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@Component
public class TaskExecute {

    @Autowired
    private TaskDao taskDao;
    @Value("${server.port}")
    private String serverPort;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void execute(List<TaskMsgInfo> records) {
        ObjectMapper mapper = new ObjectMapper();
        try {
            List<Integer> ids = records.stream().map(TaskMsgInfo::getDbId).collect(Collectors.toList());
            log.info("ids:{}", ids);
            QueryWrapper<TaskInfo> queryWrapper = new QueryWrapper<>();
            queryWrapper.lambda().in(TaskInfo::getId, ids)
                    .eq(TaskInfo::getExecuteDate, DateUtil.format(new Date(), JobConstants.PULL_TIME_FORMAT))
                    .eq(TaskInfo::getStatus, TaskStatusEnum.PENDING.name());
            List<TaskInfo> tasks = taskDao.selectList(queryWrapper);
            if (CollectionUtil.isEmpty(tasks)) {
                log.info("taskId:{} tasks already processed", ids);
                return;
            }
            for (TaskInfo taskInfo : tasks) {
                // read this task's slice: skip to its start offset, take HANDLE_NUM lines
                List<String> list = Files.readAllLines(Paths.get("C:\\Users\\yj\\Desktop\\iphone_num.txt"), StandardCharsets.UTF_8)
                        .stream().skip(taskInfo.getStart()).limit(JobConstants.HANDLE_NUM).collect(Collectors.toList());
                log.info("num_list:{}", list);
                taskInfo.setStatus(TaskStatusEnum.COMPLETE.name());
                taskInfo.setHandle(serverPort);
                taskDao.updateById(taskInfo);
            }
        } catch (IOException e) {
            if (e instanceof JacksonException) {
                log.error("msg readValue Exception:{}", e.getMessage());
                return;
            }
            // on an I/O failure, re-queue the messages so they can be retried
            try {
                for (TaskMsgInfo taskMsgInfo : records) {
                    kafkaTemplate.send(JobConstants.TASK_TOPIC, mapper.writeValueAsString(taskMsgInfo));
                }
            } catch (JsonProcessingException ex) {
                log.error("writeValueAsString Exception:{}", ex.getMessage());
            }
        }
    }
}
Acquiring the lock with Redisson + AOP
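The @RedissonAopLock annotation that marks the distributor method isn't shown in the original; a minimal sketch (the key() element is read by the aspect below):

package com.example.demo.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
// Methods carrying this annotation are wrapped in a Redisson lock named by key().
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedissonAopLock {
    String key();
}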
Pointcut configuration class
package com.example.demo.aop;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
/**
 * execution: match a method signature, e.g. "execution(* testExecution(..))"
 * within: match methods in a given class or package (Spring AOP only), e.g. "within(ric.study.demo.aop.svc..*)"
 * @annotation: match methods carrying a specific annotation
 * bean(idOrNameOfBean): match beans by name (Spring AOP only)
 */
@Aspect
@Component
public class PointcutConfig {

    @Pointcut("@annotation(com.example.demo.annotation.RedissonAopLock)")
    public void redissonPointcut() {}
}
Aspect handler
package com.example.demo.aop;
import com.example.demo.annotation.RedissonAopLock;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@Aspect
@Component
public class RedissonLockAspect {

    @Autowired
    private RedissonClient redissonClient;

    @Around("com.example.demo.aop.PointcutConfig.redissonPointcut()")
    public Object redissonLockAspect(ProceedingJoinPoint pjp) throws Throwable {
        Object proceed;
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        RedissonAopLock annotation = method.getAnnotation(RedissonAopLock.class);
        String key = annotation.key();
        RLock lock = redissonClient.getLock(key);
        try {
            // blocks until the lock is acquired; only one instance runs the guarded method at a time
            lock.lock();
            proceed = pjp.proceed();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
        return proceed;
    }
}
Data preparation
Configure three instances
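The original shows the instance setup as a screenshot. Presumably the same jar is started three times on different ports, since server.port doubles as the handler tag; a hypothetical launch, assuming the artifact is named demo.jar:

java -jar demo.jar --server.port=8081
java -jar demo.jar --server.port=8082
java -jar demo.jar --server.port=8083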
Start the project
Instance 2 acquires the lock and performs the task distribution
Instance 1 reads the slices with primary keys 129 and 131
Instance 2 reads the slices with primary keys 130 and 132
Instance 3 reads no data
Database