前言
今天要要分享的要是紧接之前的设计:物联网设备流水入库TDengine改造方案,这里是具体的实现过程。这个是TDengine可自动扩展列方案,这个方案实现代码绝对是目前独家,关注我,你值得拥有。
一、整体思路
整体思路:消费信息 》》 数据转换 》》组织sql 》》orm框架自动配备数据源》》执行入库TDengine》》异常处理(扩展的核心)》》DDL执行扩列》》再次执行入库。。。。
这里大家应该可以猜到具体做法了,其实要不是因为这个列不固定,实现起来可简单多了,也可以用超级表,而且性能也会好很多。更重要的是可以用ORM框架,基本不用写啥sql。而且查询结果用实体接受数据,不会出现VARCHAR字段不能正确显示字符串的问题(我就是被这个坑了下)。
其实也可以用flink等消费信息,做入库处理,当然这样处理可就不能用ORM框架了,只能用经典的JDBC。
核心思路:根据设备上报数据,做插入数据转换sql,执行入库处理异常,根据异常做DDL操作,实现自动扩列,最后入库。上报的数据:json串做数据转换,数据值做反射获取类型,转换为对应的扩列sql执行、组织入库sql。
二、实现流程图
我的整体环境:SpringBoot3 + mybatisPlus + 双数据源(mysql、TDengine)+ 集成kafka
消费上游平台放入kafka的信息,然后走以上流程,目标执行入库TDengine。
三、核心代码
这里的整体框架我之前的博文有写,并且是公开独家分享到csdn的gitCode:gitcode.net/zwrlj527/da…
1.引入库
org.springframework.kafka
spring-kafka
2.配置文件
spring:
#kafka配置
kafka:
#bootstrap-servers: 192.168.200.72:9092,192.168.200.73:9092
#bootstrap-servers: 192.168.200.83:9092,192.168.200.84:9092
bootstrap-servers: localhost:9092
client-id: dc-device-flow-analyze
consumer:
group-id: dc-device-flow-analyze-consumer-group
max-poll-records: 10
#Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
auto-offset-reset: earliest
#是否开启自动提交
enable-auto-commit: false
#自动提交的时间间隔
auto-commit-interval: 1000
listener:
ack-mode: MANUAL_IMMEDIATE
concurrency: 1 #推荐设置为topic的分区数
type: BATCH #开启批量监听
#消费topic配置
xiaotian:
analyze:
device:
flow:
topic:
consumer: device-flow
3.kafka消费监听
package com.xiaotian.datagenius.kafka;
import com.xiaotian.datagenius.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 消费者listener
*
* @author zhengwen
**/
@Slf4j
@Component
public class KafkaListenConsumer {
@Autowired
private DataTransService dataTransService;
/**
* 设备流水listenner
*
* @param records 消费信息
* @param ack Ack机制
*/
@KafkaListener(topics = "${easylinkin.analyze.device.flow.topic.consumer}")
public void deviceFlowListen(List records, Acknowledgment ack) {
log.debug("=====设备流水deviceFlowListen消费者接收信息====");
try {
for (ConsumerRecord record : records) {
log.debug("---开启线程解析设备流水数据:{}", record.toString());
dataTransService.deviceFlowTransSave(record);
}
} catch (Exception e) {
log.error("----设备流水数据消费者解析数据异常:{}", e.getMessage(), e);
} finally {
//手动提交偏移量
ack.acknowledge();
}
}
}
4.消息具体处理方法(实现)
package com.xiaotian.datagenius.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.xiaotian.datagenius.mapper.tdengine.DeviceFlowRecordMapper;
import com.xiaotian.datagenius.mapper.tdengine.TableOperateMapper;
import com.xiaotian.datagenius.service.DataTransService;
import com.xiaotian.datagenius.utils.TDengineDbUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.*;
/**
* @author zhengwen
*/
@Slf4j
@Service
public class DataTransServiceImpl implements DataTransService {
/**
* 专门记录业务错误日志
*/
private final static Logger logger = LoggerFactory.getLogger("businessExp");
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private TableOperateMapper tableOperateMapper;
@Autowired
private DeviceFlowRecordMapper deviceFlowRecordMapper;
@Override
public void deviceFlowTransSave(ConsumerRecord record) {
log.debug("----设备流水转换解析存储----");
log.debug(String.format("offset = %d, key = %s, value = %s%n n", record.offset(), record.key(), record.value()));
//字段不可控,所以没有实体可言,只能直接sql
//先直接执行插入,try异常 -> 如果是报字段不存在 -> 执行校验字段 -> dml创建字段
//再执行插入
String stableName = "device_flow_mater";
String tableName = "device_flow_record";
String recordStr = record.value().toString();
if (JSONUtil.isTypeJSON(recordStr)) {
JSONObject recordJson = JSONUtil.parseObj(recordStr);
//初始化语句
Map columnData = new HashMap();
String insertSql = initDataInsertSql(recordJson, tableName, columnData);
//保存数据
saveRecord(recordJson, insertSql, columnData, tableName);
} else {
logger.error("---设备上报数据推送信息格式异常,无法解析---");
}
}
/**
* 初始化数据插入语句
*
* @param recordJson 记录json
* @param tableName 表名
* @param columnData 字段信息
* @return 数据插入语句
*/
private String initDataInsertSql(JSONObject recordJson, String tableName, Map columnData) {
//这里先转换成sql的字段、value
StringJoiner columnSj = new StringJoiner(",");
StringJoiner valueSj = new StringJoiner(",");
String insertSql = transInitInsertSql(tableName, columnSj, valueSj, recordJson, columnData);
if (StringUtils.isBlank(insertSql)) {
logger.error("---上报数据转插入语句异常,上报数据:{}", JSONUtil.toJsonStr(recordJson));
return null;
}
return insertSql;
}
/**
* 保存记录
*
* @param recordJson 记录json对象
* @param insertSql 插入语句
* @param columnData 字段信息
* @param tableName 普通表或子表
*/
private void saveRecord(JSONObject recordJson, String insertSql, Map columnData, String tableName) {
try {
//boolean insertRes = SqlRunner.db(DeviceFlowMaterRecord.class).insert(insertSql, '1');
int num = deviceFlowRecordMapper.insert(insertSql);
} catch (Exception e) {
logger.error("Error inserting,{}", e.getMessage());
Throwable throwable = e.getCause();
String msg = throwable.getMessage();
//报缺少字段、字段长度不够
if (msg.contains("Invalid column name:") || msg.contains("Value too long for column/tag")) {
transAddOrChangeColumnsSql(columnData, tableName, recordJson, insertSql);
}
}
}
/**
* 转换扩展列
*
* @param columnData 上报数据字段信息map
* @param tableName 表名
* @param recordJson 上报数据json
* @param insertSql 插入语句
*/
private void transAddOrChangeColumnsSql(Map columnData, String tableName, JSONObject recordJson, String insertSql) {
String showColumnsSql = "desc " + tableName;
List columnLs = tableOperateMapper.operateSql(showColumnsSql);
if (CollectionUtil.isNotEmpty(columnLs)) {
//StringBuffer sbf = new StringBuffer();
//sbf.append("ALTER TABLE ").append(tableName).append(" ADD COLUMN ");
Map tableColumns = new HashMap();
columnLs.stream().forEach(c -> {
Object byBufferObj = c.get("field");
//获取字段
String field = TDengineDbUtil.getColumnInfoBy(byBufferObj);
tableColumns.put(field, c);
});
columnData.entrySet().forEach(c -> {
String key = c.getKey();
Map columnMp = c.getValue();
String length = columnMp.get("length");
if (tableColumns.containsKey(key)) {
//包含字段,比较数据类型长度
Map tcMp = tableColumns.get(key);
Object byBufferObj = tcMp.get("length");
//获取字段长度
String dbLength = TDengineDbUtil.getColumnInfoBy(byBufferObj);
if (dbLength != null) {
if (Integer.parseInt(length) > Integer.parseInt(dbLength)) {
String changeColumnSql = TDengineDbUtil.getColumnChangeSql(tableName,length,key);
tableOperateMapper.operateSql(changeColumnSql);
}
}
} else {
//不包含需要执行增加字段
String addColumnSql = TDengineDbUtil.getColumnAddSql(tableName,length,key);
tableOperateMapper.operateSql(addColumnSql);
}
});
//复调存储
saveRecord(recordJson, insertSql, columnData, tableName);
}
}
/**
* 转换初始化插入语句sql
*
* @param tableName 表名
* @param columnSj 字段字符串
* @param valueSj 值字符串
* @param recordJson 上报数据json
* @param columnData 字段Map
* @return 插入语句sql
*/
private String transInitInsertSql(String tableName, StringJoiner columnSj, StringJoiner
valueSj, JSONObject recordJson, Map columnData) {
StringBuffer sb = new StringBuffer();
//子表不能扩展列,所以超级表思路走不通
sb.append("insert into ").append(tableName);
if (!JSONUtil.isNull(recordJson)) {
JSONObject tmpRecordJson = recordJson;
JSONObject dataJson = tmpRecordJson.getJSONObject("data");
Date collectTime = tmpRecordJson.getDate("collectTime");
tmpRecordJson.remove("data");
tmpRecordJson.entrySet().forEach(entry -> {
//TODO 这里要设置调整下数据库区分大小写后去掉
//String key = entry.getKey().toLowerCase();
String key = entry.getKey();
columnSj.add("`" + key + "`");
Object val = entry.getValue();
//TODO 校验字符串类型处理sql
int length = 5;
if (val != null) {
//TODO 几个时间字段传的是long,是转时间类型,还是改字段为字符串?
String valStr = TDengineDbUtil.convertValByKey(val,key);
valueSj.add(valStr);
length = valStr.length() + 5;
} else {
valueSj.add(null).add(",");
}
//TODO 字段数据类型后面要优化处理
Map columnMp = TDengineDbUtil.checkColumnType(key, val, length);
columnData.put(key, columnMp);
});
if (!JSONUtil.isNull(dataJson)) {
dataJson.entrySet().forEach(entry -> {
//TODO 这里要设置调整下数据库区分大小写后去掉
String key = entry.getKey();
columnSj.add("`" + key + "`");
Object val = entry.getValue();
int length = 3;
if (val != null) {
//TODO 几个时间字段传的是long,是转时间类型,还是改字段为字符串?
String valStr = TDengineDbUtil.convertValByKey(val,key);
valueSj.add(valStr);
length = valStr.length() + 1;
} else {
valueSj.add(null).add(",");
}
//TODO 字段数据类型后面要优化处理
Map columnMp = TDengineDbUtil.checkColumnType(key, val, length);
columnData.put(key, columnMp);
});
}
//Tags
//sb.append(" TAGS (").append(dataJson).append(",").append(deviceUnitCode).append(",").append(deviceCode).append(") ");
//sb.append(" TAGS ('").append(JSONUtil.toJsonStr(dataJson)).append("') ");
//Columns
columnSj.add("`data_ts`");
sb.append("(").append(columnSj.toString()).append(") ");
//Values
//valueSj.add("'" + DateUtil.format(collectTime, DatePattern.NORM_DATETIME_MS_FORMAT) + "'");
//主键应该是时间,不能是设备上报数据的时间,因为设备上报数据万一相同就更新了
valueSj.add("NOW");
sb.append(" VALUES (").append(valueSj.toString()).append(")");
logger.debug("----插入语句sql:", sb.toString());
return sb.toString();
}
return null;
}
}
5.工具类
package com.xiaotian.datagenius.utils;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import io.micrometer.core.instrument.util.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
/**
* TDengine数据库工具类
*
* @author zhengwen
*/
@Slf4j
public class TDengineDbUtil {
/**
* orm框架执行ddl语句返回的字段是byte数组处理
*
* @param byBufferObj byte数组object对象
* @return
*/
public static String getColumnInfoBy(Object byBufferObj) {
try {
if (byBufferObj instanceof byte[]) {
byte[] bytes = (byte[]) byBufferObj;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.write(bytes);
oos.flush();
String strRead = new String(bytes);
oos.close();
bos.close();
return strRead;
}
} catch (IOException e) {
log.error("----字段异常:{}", e.getMessage());
}
return null;
}
/**
* 校验字段类型返回字段信息
*
* @param key 字段
* @param val 值
* @param length 长度
* @return 字段信息
*/
public static Map checkColumnType(String key, Object val, int length) {
Map columnMp = new HashMap();
columnMp.put("type", "String");
columnMp.put("length", String.valueOf(length));
return columnMp;
}
/**
* @param tableName
* @param length
* @param key
* @return
*/
public static String getColumnAddSql(String tableName, String length, String key) {
String beforeSql = "ALTER TABLE " + tableName + " ADD COLUMN ";
//TODO 处理字段类型
String addColumnSql = beforeSql + "`" + key + "` NCHAR(" + Integer.parseInt(length) + ")";
return addColumnSql;
}
/**
* @param tableName
* @param length
* @param key
* @return
*/
public static String getColumnChangeSql(String tableName, String length, String key) {
String changeLengthSql = "ALTER TABLE " + tableName + " MODIFY COLUMN ";
//TODO 处理字段类型
String changeColumnSql = changeLengthSql + "`" + key + "` NCHAR(" + length + ")";
return changeColumnSql;
}
/**
* 根据字段、字段值对插入sql的字段值做处理
*
* @param val 字段原始值
* @param key 字段
* @return 字段转换后的值
*/
public static String convertValByKey(Object val, String key) {
//其他全部当字符串处理
String valStr = "'" + val.toString() + "'";
//TODO 根据字段处理转换后的字段值,这里暂时对几个时间字段做特殊处理
if (key.equals("collectTime") || key.equals("createTime") || key.equals("storageTime")) {
if (val instanceof Long){
LocalDateTime localDateTime = LocalDateTimeUtil.of(Long.parseLong(val.toString()));
valStr = "'" + DateUtil.format(localDateTime,DatePattern.NORM_DATETIME_MS_PATTERN) + "'";
}
if (val instanceof Integer){
LocalDateTime localDateTime = LocalDateTimeUtil.of(Long.parseLong(val.toString() + 100));
valStr = "'" + DateUtil.format(localDateTime,DatePattern.NORM_DATETIME_MS_PATTERN) + "'";
}
}
return valStr;
}
}
6.Mapper的重要方法
TableOperateMapper
${sql}
DeviceFlowRecordMapper
${sql}
${sql}
select r.*
from device_flow_record r
where 1 = 1
and r.`deviceCode` = #{param2.deviceCode}
= #{param2.startTime} ]]>
order by r.`data_ts` desc
核心点就以上这些位置了,大家自行体会。
总结
- TDengine还不错,官方有交流群,群里也有技术支持,不过肯定不是每一个问题都有回复
- 各方面都在支持它,它的优化空间还很多,我们用开源实际也是在帮忙测试。就从开年到现在我就整这玩意,刚开始是3.0.2.3现在都迭代到3.0.2.5了
- ORM框架也在逐步支持,但是官方支持明确跟我说了可能ORM框架会拖慢,影响性能。
- 里面坑还是很多的,我就踩了乱码、返回字节码的问题
就写到这里,希望能帮到大家!!有需要帮助的可以发消息我。