RocketMQ消费者基于自定义属性实现SQL过滤

2023年 7月 16日 26.3k 0

1.介绍

基于SQL的过滤,因为对SQL语句执行了校验和SQL语法的解析,执行效率相对于tag方式较低,同时作为自定义属性,他不像tag是标准的结构体,所以在进行消息筛选的时候,相比起tag过滤,这种SQL过滤的方式,执行效率是相对比较慢的,只适用于数据量不大,需要灵活来进行调整的时候,才会考虑用它。

下面来看案例:

1.1.服务端配置

RocketMQ 默认并未开启自定义属性SQL过滤的选项,需要在配置文件中额外开启,如下所示:

enablePropertyFilter=true

master和slave都需要配置

2.消息生产者

消息发送方和标准发送有两点变化:

  • 可以不设置消息的Tag和Key,转而使用用户自定义属性,这里实现了source与id两个自定义属性的赋值
  • 利用message.putUserProperty为用户属性赋值
//消息过滤案例生产者
@Slf4j
public class SfProducer {
    public static void main(String[] args) {
        //DefaultMQProducer用于发送非事务消息
        DefaultMQProducer producer = new DefaultMQProducer("sf-producer-group");
        //注册NameServer地址
        producer.setNamesrvAddr("172.16.247.3:9876");
        try {
            //启动生产者实例
            producer.start();
            for (Integer i = 0; i < 10; i++) {
                Thread.sleep(1000);
                Integer rnd = new Random().nextInt(10);
                //用户自定义属性
                String source = "";
                switch (rnd % 3) {
                    case 0:
                        source = "jd";
                        break;
                    case 1:
                        source = "tmall";
                        break;
                    case 2:
                        source = "taobao";
                        break;
                }
                //消息数据
                String data = "第" + i + "条消息数据";
                //消息主题,使用用户自定义属性时可以不设置tag与key
                Message message = new Message("sf-sample-data", data.getBytes());
                message.putUserProperty("id", i.toString());
                message.putUserProperty("source", source);
                //发送结果
                SendResult result = producer.send(message);
                log.info("id:{},source:{},data:{}", i.toString(), source, data);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                //关闭连接
                producer.shutdown();
                log.info("连接已关闭");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果:

id:0,source:jd,data:第0条消息数据
id:1,source:tmall,data:第1条消息数据
id:2,source:taobao,data:第2条消息数据
id:3,source:taobao,data:第3条消息数据
id:4,source:jd,data:第4条消息数据
id:5,source:tmall,data:第5条消息数据
id:6,source:jd,data:第6条消息数据
id:7,source:tmall,data:第7条消息数据
id:8,source:jd,data:第8条消息数据
id:9,source:taobao,data:第9条消息数据

3.消息消费者

京东消费者

京东消费者负责消费source=’jd’的数据,和标准消费者最大的不同便是在subscribe方法第二个参数不再是Tag,而改为MessageSelector.bySql方法,利用WHERE子句写法对自定义属性实现过滤,源码如下:

@Slf4j
public class SfJDConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sf-jd-consumer-group");
        consumer.setNamesrvAddr("172.16.247.3:9876");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //利用SQL WHERE子句写法对自定义属性进行过滤
        consumer.subscribe("sf-sample-data", MessageSelector.bySql("source='jd'"));
        // 注册消息监听者
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach(msg -> {
                    log.info("id:{},source:{},data:{}", msg.getUserProperty("id"), msg.getUserProperty("source"), new String(msg.getBody()));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        log.info("集群消费者启动成功,正在监听新消息");
    }
}

运行结果:

集群消费者启动成功,正在监听新消息
id:8,source:jd,data:第8条消息数据
id:4,source:jd,data:第4条消息数据
id:0,source:jd,data:第0条消息数据
id:6,source:jd,data:第6条消息数据

阿里消费者

阿里消费者负责消费天猫与淘宝的数据,与京东消费者最明显的区别是:

因为业务范围不同,消费者组不一样;

bySQL的要获取多个数值,可用下面语法

  • source in (‘tmall’,’taobao’)
  • source = ‘tmall’ or source = ‘taobao’
//利用SQL WHERE子句写法对自定义属性进行过滤
consumer.subscribe("sf-sample-data", MessageSelector.bySql("source in ('tmall' ,'taobao')"));

运行结果:

集群消费者启动成功,正在监听新消息
id:9,source:taobao,data:第9条消息数据
id:5,source:tmall,data:第5条消息数据
id:7,source:tmall,data:第7条消息数据
id:3,source:taobao,data:第3条消息数据
id:2,source:taobao,data:第2条消息数据
id:1,source:tmall,data:第1条消息数据

 

相关文章

服务器端口转发,带你了解服务器端口转发
服务器开放端口,服务器开放端口的步骤
产品推荐:7月受欢迎AI容器镜像来了,有Qwen系列大模型镜像
如何使用 WinGet 下载 Microsoft Store 应用
百度搜索:蓝易云 – 熟悉ubuntu apt-get命令详解
百度搜索:蓝易云 – 域名解析成功但ping不通解决方案

发布评论