From 2181830afbf09debc9c7be9c004c1650088ae507 Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Tue, 10 Mar 2026 15:21:42 +0800 Subject: [PATCH 1/5] tag add default --- .../client/annotation/ExtConsumerResetConfiguration.java | 4 ++-- .../rocketmq/client/annotation/RocketMQMessageListener.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java index f0b942f5..4f7dd2f3 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java @@ -31,11 +31,11 @@ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.accessKey:}"; String SECRET_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.secretKey:}"; - String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:}"; + String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:*}"; String TOPIC_PLACEHOLDER = "${rocketmq.simple-consumer.topic:}"; String ENDPOINTS_PLACEHOLDER = "${rocketmq.simple-consumer.endpoints:}"; String CONSUMER_GROUP_PLACEHOLDER = "${rocketmq.simple-consumer.consumerGroup:}"; - String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:}"; + String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:tag}"; /** * The component name of the Consumer configuration. diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java index 6f0f836d..2c9588e9 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java @@ -31,7 +31,7 @@ String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}"; String ENDPOINTS_PLACEHOLDER = "${rocketmq.push-consumer.endpoints:}"; String TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.topic:}"; - String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:}"; + String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:*}"; /** * The property of "access-key". From dbc30a540bfe8025fe6da1cb6b5afe4a327cc444 Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Tue, 10 Mar 2026 15:37:57 +0800 Subject: [PATCH 2/5] add SQL92 demo --- .../README-CN.md | 190 +++++++++ .../SQL92_USAGE.md | 389 ++++++++++++++++++ .../consumer/SQL92FilterConsumer.java | 68 +++ .../producer/SQL92ProducerApplication.java | 162 ++++++++ 4 files changed, 809 insertions(+) create mode 100644 rocketmq-v5-client-spring-boot-samples/SQL92_USAGE.md create mode 100644 rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/SQL92FilterConsumer.java create mode 100644 rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/producer/SQL92ProducerApplication.java diff --git a/rocketmq-v5-client-spring-boot-samples/README-CN.md b/rocketmq-v5-client-spring-boot-samples/README-CN.md index 59698fcf..b949274f 100644 --- a/rocketmq-v5-client-spring-boot-samples/README-CN.md +++ b/rocketmq-v5-client-spring-boot-samples/README-CN.md @@ -567,6 +567,196 @@ rocketmq.simple-consumer.filter-expression-type=tag +# SQL92 消息过滤 + + + +## 概述 + +RocketMQ V5 客户端支持 SQL92 消息过滤功能,允许消费者根据消息属性进行复杂的条件过滤。 + +### 支持的过滤类型 + +1. **TAG 过滤**(默认):基于 Tag 的简单过滤 +2. **SQL92 过滤**:基于消息属性的复杂条件过滤 + + + +## 配置说明 + +### application.properties 配置 + +```properties +# 基本配置 +rocketmq.simple-consumer.endpoints=localhost:8081 +rocketmq.simple-consumer.consumer-group=sql92Group +rocketmq.simple-consumer.topic=sql92Topic + +# TAG 过滤(默认) +rocketmq.simple-consumer.tag=* +rocketmq.simple-consumer.filter-expression-type=tag + +# SQL92 过滤 +# rocketmq.simple-consumer.tag=(type = 'vip' AND amount > 500) +# rocketmq.simple-consumer.filter-expression-type=sql92 +``` + + + +## SQL92 表达式语法 + +### 比较操作符 + +- `=` : 等于 +- `<>` : 不等于 +- `>` : 大于 +- `<` : 小于 +- `>=` : 大于等于 +- `<=` : 小于等于 +- `BETWEEN` : 在某个范围内 +- `IN` : 在集合中 +- `LIKE` : 模糊匹配 + +### 逻辑操作符 + +- `AND` : 与 +- `OR` : 或 +- `NOT` : 非 + + + +## 使用示例 + +### 示例 1:简单等值过滤 + +只消费 type='vip' 的消息: + +```properties +rocketmq.simple-consumer.filter-expression-type=sql92 +rocketmq.simple-consumer.tag=(type = 'vip') +``` + +发送消息时添加属性: + +```java +Message message = MessageBuilder.withPayload(order) + .setHeader("type", "vip") + .build(); +``` + +### 示例 2:数值范围过滤 + +消费金额在 100-1000 之间的订单: + +```properties +rocketmq.simple-consumer.filter-expression-type=sql92 +rocketmq.simple-consumer.tag=(amount >= 100 AND amount <= 1000) +``` + +### 示例 3:多条件组合 + +消费 VIP 用户且金额大于 500 的订单: + +```properties +rocketmq.simple-consumer.filter-expression-type=sql92 +rocketmq.simple-consumer.tag=(type = 'vip' AND amount > 500) +``` + +### 示例 4:IN 操作符 + +消费特定地区的订单: + +```properties +rocketmq.simple-consumer.filter-expression-type=sql92 +rocketmq.simple-consumer.tag=(region IN ('Beijing', 'Shanghai', 'Guangzhou')) +``` + +### 示例 5:LIKE 模糊匹配 + +消费以 A 开头的产品类别: + +```properties +rocketmq.simple-consumer.filter-expression-type=sql92 +rocketmq.simple-consumer.tag=(category LIKE 'A%') +``` + + + +## 消费者代码示例 + +```java +@Service +@RocketMQMessageListener( + endpoints = "${demo.sql92.rocketmq.endpoints:}", + topic = "${demo.sql92.rocketmq.topic:}", + consumerGroup = "${demo.sql92.rocketmq.consumer-group:}", + tag = "${demo.sql92.rocketmq.tag:}", + filterExpressionType = "${demo.sql92.rocketmq.filter-expression-type:sql92}" +) +public class SQL92FilterConsumer implements RocketMQListener { + + @Override + public ConsumeResult consume(MessageView messageView) { + log.info("收到 SQL92 过滤消息 - ID: {}, 属性:{}", + messageView.getMessageId(), + messageView.getProperties()); + return ConsumeResult.SUCCESS; + } +} +``` + + + +## 生产者代码示例 + +```java +@SpringBootApplication +public class SQL92ProducerApplication implements CommandLineRunner { + + @Resource + private RocketMQClientTemplate rocketMQClientTemplate; + + @Override + public void run(String... args) throws ClientException { + // 发送带属性的消息 + Message message = MessageBuilder.withPayload("VIP Order") + .setHeader("type", "vip") + .setHeader("amount", 600) + .setHeader("region", "Beijing") + .build(); + + rocketMQClientTemplate.syncSendNormalMessage("orderTopic", message); + } +} +``` + + + +## 注意事项 + +1. **属性名称限制**: + - 属性名称不能包含空格和特殊字符 + - 建议使用字母、数字和下划线 + +2. **性能考虑**: + - SQL92 过滤比 TAG 过滤更消耗资源 + - 简单的过滤场景优先使用 TAG 过滤 + +3. **表达式长度**: + - SQL92 表达式长度有限制,不宜过长 + +4. **数据类型**: + - 确保发送的消息属性类型与过滤表达式中的类型一致 + - 字符串值需要用单引号包裹 + +5. **NULL 值处理**: + - 使用 `IS NULL` 或 `IS NOT NULL` 判断空值 + - 不要使用 `= NULL` + +详细示例请参考 [SQL92_USAGE.md](SQL92_USAGE.md) + + + # ACL功能 diff --git a/rocketmq-v5-client-spring-boot-samples/SQL92_USAGE.md b/rocketmq-v5-client-spring-boot-samples/SQL92_USAGE.md new file mode 100644 index 00000000..7f67e8f3 --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/SQL92_USAGE.md @@ -0,0 +1,389 @@ +# SQL92 消息过滤使用指南 + +## 概述 + +RocketMQ V5 客户端支持两种消息过滤方式: +- **TAG 过滤**:基于 Tag 进行简单过滤 +- **SQL92 过滤**:基于消息属性进行更复杂的条件过滤 + +## 配置说明 + +### 1. TAG 过滤(默认) + +#### application.properties 配置 +```properties +rocketmq.simple-consumer.endpoints=localhost:8081 +rocketmq.simple-consumer.consumer-group=sql92Group +rocketmq.simple-consumer.topic=sql92Topic +rocketmq.simple-consumer.tag=* +rocketmq.simple-consumer.filter-expression-type=tag +``` + +#### 注解配置示例 +```java +@Service +@RocketMQMessageListener( + endpoints = "${demo.rocketmq.endpoints:}", + topic = "${demo.rocketmq.topic:}", + consumerGroup = "${demo.rocketmq.consumer-group:}", + tag = "${demo.rocketmq.tag:*}", + filterExpressionType = "tag" // 或者不填,默认为 tag +) +public class TagConsumer implements RocketMQListener { + @Override + public ConsumeResult consume(MessageView messageView) { + System.out.println("收到消息:" + messageView); + return ConsumeResult.SUCCESS; + } +} +``` + +### 2. SQL92 过滤 + +#### application.properties 配置 +```properties +rocketmq.simple-consumer.endpoints=localhost:8081 +rocketmq.simple-consumer.consumer-group=sql92Group +rocketmq.simple-consumer.topic=sql92Topic +rocketmq.simple-consumer.tag= +rocketmq.simple-consumer.filter-expression-type=sql92 +``` + +#### 注解配置示例 +```java +@Service +@RocketMQMessageListener( + endpoints = "${demo.rocketmq.endpoints:}", + topic = "${demo.rocketmq.topic:}", + consumerGroup = "${demo.rocketmq.consumer-group:}", + tag = "", // SQL92 模式下 tag 留空 + filterExpressionType = "sql92" +) +public class SQL92Consumer implements RocketMQListener { + @Override + public ConsumeResult consume(MessageView messageView) { + System.out.println("收到消息:" + messageView); + return ConsumeResult.SUCCESS; + } +} +``` + +## SQL92 表达式语法 + +SQL92 过滤支持以下语法: + +### 1. 比较操作符 +- `=` : 等于 +- `<>` : 不等于 +- `>` : 大于 +- `<` : 小于 +- `>=` : 大于等于 +- `<=` : 小于等于 +- `BETWEEN` : 在某个范围内 +- `IN` : 在集合中 +- `LIKE` : 模糊匹配 + +### 2. 逻辑操作符 +- `AND` : 与 +- `OR` : 或 +- `NOT` : 非 + +### 3. 数据类型 +- 数字:直接写,如 `123` +- 字符串:用单引号包裹,如 `'value'` +- 布尔值:`TRUE`, `FALSE` +- NULL:`IS NULL`, `IS NOT NULL` + +## 使用示例 + +### 示例 1:简单等值过滤 +```java +// 只消费 type='vip' 的消息 +@RocketMQMessageListener( + topic = "orderTopic", + consumerGroup = "vipConsumerGroup", + tag = "", + filterExpressionType = "sql92" +) +public class VIPConsumer implements RocketMQListener { + @Override + public ConsumeResult consume(MessageView messageView) { + return ConsumeResult.SUCCESS; + } +} +``` + +发送消息时添加属性: +```java +MessageBuilder.withPayload(order) + .setHeader("type", "vip") + .build(); +``` + +### 示例 2:数值范围过滤 +```java +// 消费金额在 100-1000 之间的订单 +@RocketMQMessageListener( + topic = "orderTopic", + consumerGroup = "mediumOrderConsumer", + tag = "", + filterExpressionType = "sql92" +) +public class MediumOrderConsumer implements RocketMQListener { + @Override + public ConsumeResult consume(MessageView messageView) { + return ConsumeResult.SUCCESS; + } +} +``` + +配置文件: +```properties +rocketmq.simple-consumer.filter-expression-type=sql92 +rocketmq.simple-consumer.tag=(amount >= 100 AND amount <= 1000) +``` + +### 示例 3:多条件组合 +```java +// 消费 VIP 用户且金额大于 500 的订单 +@RocketMQMessageListener( + topic = "orderTopic", + consumerGroup = "vipLargeOrderConsumer", + tag = "", + filterExpressionType = "sql92" +) +public class VipLargeOrderConsumer implements RocketMQListener { + @Override + public ConsumeResult consume(MessageView messageView) { + return ConsumeResult.SUCCESS; + } +} +``` + +配置文件: +```properties +rocketmq.simple-consumer.filter-expression-type=sql92 +rocketmq.simple-consumer.tag=(type = 'vip' AND amount > 500) +``` + +### 示例 4:IN 操作符 +```java +// 消费特定地区的订单 +@RocketMQMessageListener( + topic = "orderTopic", + consumerGroup = "regionConsumer", + tag = "", + filterExpressionType = "sql92" +) +public class RegionConsumer implements RocketMQListener { + @Override + public ConsumeResult consume(MessageView messageView) { + return ConsumeResult.SUCCESS; + } +} +``` + +配置文件: +```properties +rocketmq.simple-consumer.filter-expression-type=sql92 +rocketmq.simple-consumer.tag=(region IN ('Beijing', 'Shanghai', 'Guangzhou')) +``` + +### 示例 5:LIKE 模糊匹配 +```java +// 消费以 A 开头的产品类别 +@RocketMQMessageListener( + topic = "productTopic", + consumerGroup = "categoryAConsumer", + tag = "", + filterExpressionType = "sql92" +) +public class CategoryAConsumer implements RocketMQListener { + @Override + public ConsumeResult consume(MessageView messageView) { + return ConsumeResult.SUCCESS; + } +} +``` + +配置文件: +```properties +rocketmq.simple-consumer.filter-expression-type=sql92 +rocketmq.simple-consumer.tag=(category LIKE 'A%') +``` + +### 示例 6:ExtConsumerResetConfiguration 方式 +```java +// 使用 ExtConsumerResetConfiguration 注解配置 SQL92 过滤 +@ExtConsumerResetConfiguration( + topic = "${ext.rocketmq.topic:}", + consumerGroup = "${ext.rocketmq.consumer-group:}", + tag = "${ext.rocketmq.tag:}", + filterExpressionType = "${ext.rocketmq.filter-expression-type:sql92}" +) +public class ExtRocketMQTemplate extends RocketMQClientTemplate { +} +``` + +配置文件: +```properties +ext.rocketmq.topic=sql92Topic +ext.rocketmq.consumer-group=extSql92Group +ext.rocketmq.tag=(status = 'ACTIVE' AND priority > 5) +ext.rocketmq.filter-expression-type=sql92 +``` + +## 发送带属性的消息 + +```java +@SpringBootApplication +public class ProducerApplication implements CommandLineRunner { + + @Resource + private RocketMQClientTemplate rocketMQClientTemplate; + + @Value("${demo.rocketmq.topic}") + private String topic; + + @Override + public void run(String... args) { + // 发送带属性的消息 + Order order= new Order(); + order.setId(1L); + order.setAmount(600); + order.setType("vip"); + order.setRegion("Beijing"); + + Message message = MessageBuilder.withPayload(order) + .setHeader("type", "vip") + .setHeader("amount", 600) + .setHeader("region", "Beijing") + .setHeader("status", "ACTIVE") + .setHeader("priority", 8) + .build(); + + rocketMQClientTemplate.syncSendNormalMessage(topic, message); + } +} +``` + +## 注意事项 + +1. **属性名称限制**: + - 属性名称不能包含空格和特殊字符 + - 建议使用字母、数字和下划线 + +2. **性能考虑**: + - SQL92 过滤比 TAG 过滤更消耗资源 + - 简单的过滤场景优先使用 TAG 过滤 + +3. **表达式长度**: + - SQL92 表达式长度有限制,不宜过长 + +4. **数据类型**: + - 确保发送的消息属性类型与过滤表达式中的类型一致 + +5. **NULL 值处理**: + - 使用 `IS NULL` 或 `IS NOT NULL` 判断空值 + - 不要使用 `= NULL` + +## 完整示例 + +### 消费者 +```java +package org.apache.rocketmq.samples.springboot.consumer; + +import org.apache.rocketmq.client.annotation.RocketMQMessageListener; +import org.apache.rocketmq.client.api.consumer.ConsumeResult; +import org.apache.rocketmq.client.api.message.MessageView; +import org.apache.rocketmq.client.support.RocketMQListener; +import org.springframework.stereotype.Service; + +@Service +@RocketMQMessageListener( + endpoints = "${demo.sql92.rocketmq.endpoints:}", + topic = "${demo.sql92.rocketmq.topic:}", + consumerGroup = "${demo.sql92.rocketmq.consumer-group:}", + tag = "${demo.sql92.rocketmq.tag:}", + filterExpressionType = "${demo.sql92.rocketmq.filter-expression-type:sql92}" +) +public class SQL92FilterConsumer implements RocketMQListener { + + @Override + public ConsumeResult consume(MessageView messageView) { + System.out.printf("收到消息 - ID: %s, 属性:%s%n", + messageView.getMessageId(), + messageView.getProperties()); + + // 业务处理逻辑 + return ConsumeResult.SUCCESS; + } +} +``` + +### application.properties +```properties +# SQL92 过滤配置 +demo.sql92.rocketmq.endpoints=localhost:8081 +demo.sql92.rocketmq.topic=orderTopic +demo.sql92.rocketmq.consumer-group=sql92FilterGroup +demo.sql92.rocketmq.tag=(type = 'vip' AND amount > 500) +demo.sql92.rocketmq.filter-expression-type=sql92 +``` + +### 生产者 +```java +package org.apache.rocketmq.samples.springboot.producer; + +import org.apache.rocketmq.client.core.RocketMQClientTemplate; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import javax.annotation.Resource; + +@SpringBootApplication +public class SQL92ProducerApplication implements CommandLineRunner { + + @Resource + private RocketMQClientTemplate rocketMQClientTemplate; + + public static void main(String[] args) { + SpringApplication.run(SQL92ProducerApplication.class, args); + } + + @Override + public void run(String... args) { + sendVipOrder(); + sendNormalOrder(); + } + + private void sendVipOrder() { + Message message = MessageBuilder.withPayload("VIP Order") + .setHeader("type", "vip") + .setHeader("amount", 600) + .setHeader("region", "Beijing") + .build(); + + rocketMQClientTemplate.syncSendNormalMessage("orderTopic", message); + System.out.println("VIP 订单已发送"); + } + + private void sendNormalOrder() { + Message message = MessageBuilder.withPayload("Normal Order") + .setHeader("type", "normal") + .setHeader("amount", 200) + .setHeader("region", "Shanghai") + .build(); + + rocketMQClientTemplate.syncSendNormalMessage("orderTopic", message); + System.out.println("普通订单已发送"); + } +} +``` + +## 总结 + +RocketMQ V5 客户端完全支持 SQL92 消息过滤,通过设置 `filterExpressionType=sql92` 并在 `tag` 参数中编写 SQL92 表达式即可实现复杂的消息过滤逻辑。这为消息订阅提供了更大的灵活性。 diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/SQL92FilterConsumer.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/SQL92FilterConsumer.java new file mode 100644 index 00000000..32ffb07d --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/SQL92FilterConsumer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.samples.springboot.consumer; + +import org.apache.rocketmq.client.annotation.RocketMQMessageListener; +import org.apache.rocketmq.client.api.consumer.ConsumeResult; +import org.apache.rocketmq.client.api.message.MessageView; +import org.apache.rocketmq.client.support.RocketMQListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +/** + * SQL92 消息过滤消费者示例 + * + * 该消费者使用 SQL92 表达式过滤消息,只消费满足特定条件的消息: + * - type = 'vip':VIP 类型的消息 + * - amount > 500:金额大于 500 + * + * 配置文件 application.properties 中需要设置: + * demo.sql92.rocketmq.endpoints=localhost:8081 + * demo.sql92.rocketmq.topic=orderTopic + * demo.sql92.rocketmq.consumer-group=sql92VipConsumerGroup + * demo.sql92.rocketmq.tag=(type = 'vip' AND amount > 500) + * demo.sql92.rocketmq.filter-expression-type=sql92 + */ +@Service +@RocketMQMessageListener( + endpoints = "${demo.sql92.rocketmq.endpoints:}", + topic = "${demo.sql92.rocketmq.topic:}", + consumerGroup = "${demo.sql92.rocketmq.consumer-group:}", + tag = "${demo.sql92.rocketmq.tag:}", + filterExpressionType = "${demo.sql92.rocketmq.filter-expression-type:sql92}" +) +public class SQL92FilterConsumer implements RocketMQListener { + + private static final Logger log = LoggerFactory.getLogger(SQL92FilterConsumer.class); + + @Override + public ConsumeResult consume(MessageView messageView) { + log.info("收到 SQL92 过滤消息 - ID: {}, Topic: {}, Tag: {}", + messageView.getMessageId(), + messageView.getTopic(), + messageView.getTag().orElse("")); + + // 打印消息属性 + log.info("消息属性:{}", messageView.getProperties()); + + // 这里可以添加业务逻辑处理 + // 例如:解析消息内容,处理订单等 + + return ConsumeResult.SUCCESS; + } +} diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/producer/SQL92ProducerApplication.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/producer/SQL92ProducerApplication.java new file mode 100644 index 00000000..e11070cd --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/producer/SQL92ProducerApplication.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.samples.springboot.producer; + +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.core.RocketMQClientTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import javax.annotation.Resource; + +/** + * SQL92 消息生产者示例 + * + * 演示如何发送带有属性的消息,以便消费者可以使用 SQL92 进行过滤 + * + * 配置文件 application.properties 中需要设置: + * rocketmq.producer.endpoints=localhost:8081 + * rocketmq.producer.topic=orderTopic + * demo.rocketmq.order-topic=orderTopic + */ +@SpringBootApplication +public class SQL92ProducerApplication implements CommandLineRunner { + + private static final Logger log = LoggerFactory.getLogger(SQL92ProducerApplication.class); + + @Resource + private RocketMQClientTemplate rocketMQClientTemplate; + + public static void main(String[] args) { + SpringApplication.run(SQL92ProducerApplication.class, args); + } + + @Override + public void run(String... args) throws ClientException { + log.info("开始发送 SQL92 测试消息..."); + + // 发送 VIP 大额订单(会被 SQL92 过滤器匹配) + sendVipOrder(1L, 600.0, "Beijing"); + sendVipOrder(2L, 800.0, "Shanghai"); + + // 发送普通订单(不会被 SQL92 过滤器匹配) + sendNormalOrder(3L, 200.0, "Guangzhou"); + sendNormalOrder(4L, 300.0, "Shenzhen"); + + // 发送 VIP 小额订单(不会被 SQL92 过滤器匹配) + sendVipOrder(5L, 100.0, "Hangzhou"); + + log.info("所有消息已发送完成"); + } + + /** + * 发送 VIP 订单 + */ + private void sendVipOrder(Long orderId, Double amount, String region) throws ClientException { + Order order = new Order(); + order.setId(orderId); + order.setAmount(amount); + order.setType("vip"); + order.setRegion(region); + + Message message = MessageBuilder.withPayload(order) + .setHeader("type", "vip") + .setHeader("amount", amount) + .setHeader("region", region) + .build(); + + rocketMQClientTemplate.syncSendNormalMessage("orderTopic", message); + log.info("VIP 订单已发送 - ID: {}, 金额:{}, 地区:{}", orderId, amount, region); + } + + /** + * 发送普通订单 + */ + private void sendNormalOrder(Long orderId, Double amount, String region) throws ClientException { + Order order = new Order(); + order.setId(orderId); + order.setAmount(amount); + order.setType("normal"); + order.setRegion(region); + + Message message = MessageBuilder.withPayload(order) + .setHeader("type", "normal") + .setHeader("amount", amount) + .setHeader("region", region) + .build(); + + rocketMQClientTemplate.syncSendNormalMessage("orderTopic", message); + log.info("普通订单已发送 - ID: {}, 金额:{}, 地区:{}", orderId, amount, region); + } + + /** + * 订单实体类 + */ + public static class Order { + private Long id; + private Double amount; + private String type; + private String region; + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public Double getAmount() { + return amount; + } + + public void setAmount(Double amount) { + this.amount = amount; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getRegion() { + return region; + } + + public void setRegion(String region) { + this.region = region; + } + + @Override + public String toString() { + return "Order{" + + "id=" + id + + ", amount=" + amount + + ", type='" + type + '\'' + + ", region='" + region + '\'' + + '}'; + } + } +} From a5441cf98b015a372f88b3e8e3169f5ab22f1139 Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Thu, 12 Mar 2026 14:28:09 +0800 Subject: [PATCH 3/5] delete md --- .../README-CN.md | 2 - .../SQL92_USAGE.md | 389 ------------------ .../consumer/SQL92FilterConsumer.java | 20 +- .../producer/SQL92ProducerApplication.java | 26 +- 4 files changed, 23 insertions(+), 414 deletions(-) delete mode 100644 rocketmq-v5-client-spring-boot-samples/SQL92_USAGE.md diff --git a/rocketmq-v5-client-spring-boot-samples/README-CN.md b/rocketmq-v5-client-spring-boot-samples/README-CN.md index b949274f..8dc1895f 100644 --- a/rocketmq-v5-client-spring-boot-samples/README-CN.md +++ b/rocketmq-v5-client-spring-boot-samples/README-CN.md @@ -753,8 +753,6 @@ public class SQL92ProducerApplication implements CommandLineRunner { - 使用 `IS NULL` 或 `IS NOT NULL` 判断空值 - 不要使用 `= NULL` -详细示例请参考 [SQL92_USAGE.md](SQL92_USAGE.md) - # ACL功能 diff --git a/rocketmq-v5-client-spring-boot-samples/SQL92_USAGE.md b/rocketmq-v5-client-spring-boot-samples/SQL92_USAGE.md deleted file mode 100644 index 7f67e8f3..00000000 --- a/rocketmq-v5-client-spring-boot-samples/SQL92_USAGE.md +++ /dev/null @@ -1,389 +0,0 @@ -# SQL92 消息过滤使用指南 - -## 概述 - -RocketMQ V5 客户端支持两种消息过滤方式: -- **TAG 过滤**:基于 Tag 进行简单过滤 -- **SQL92 过滤**:基于消息属性进行更复杂的条件过滤 - -## 配置说明 - -### 1. TAG 过滤(默认) - -#### application.properties 配置 -```properties -rocketmq.simple-consumer.endpoints=localhost:8081 -rocketmq.simple-consumer.consumer-group=sql92Group -rocketmq.simple-consumer.topic=sql92Topic -rocketmq.simple-consumer.tag=* -rocketmq.simple-consumer.filter-expression-type=tag -``` - -#### 注解配置示例 -```java -@Service -@RocketMQMessageListener( - endpoints = "${demo.rocketmq.endpoints:}", - topic = "${demo.rocketmq.topic:}", - consumerGroup = "${demo.rocketmq.consumer-group:}", - tag = "${demo.rocketmq.tag:*}", - filterExpressionType = "tag" // 或者不填,默认为 tag -) -public class TagConsumer implements RocketMQListener { - @Override - public ConsumeResult consume(MessageView messageView) { - System.out.println("收到消息:" + messageView); - return ConsumeResult.SUCCESS; - } -} -``` - -### 2. SQL92 过滤 - -#### application.properties 配置 -```properties -rocketmq.simple-consumer.endpoints=localhost:8081 -rocketmq.simple-consumer.consumer-group=sql92Group -rocketmq.simple-consumer.topic=sql92Topic -rocketmq.simple-consumer.tag= -rocketmq.simple-consumer.filter-expression-type=sql92 -``` - -#### 注解配置示例 -```java -@Service -@RocketMQMessageListener( - endpoints = "${demo.rocketmq.endpoints:}", - topic = "${demo.rocketmq.topic:}", - consumerGroup = "${demo.rocketmq.consumer-group:}", - tag = "", // SQL92 模式下 tag 留空 - filterExpressionType = "sql92" -) -public class SQL92Consumer implements RocketMQListener { - @Override - public ConsumeResult consume(MessageView messageView) { - System.out.println("收到消息:" + messageView); - return ConsumeResult.SUCCESS; - } -} -``` - -## SQL92 表达式语法 - -SQL92 过滤支持以下语法: - -### 1. 比较操作符 -- `=` : 等于 -- `<>` : 不等于 -- `>` : 大于 -- `<` : 小于 -- `>=` : 大于等于 -- `<=` : 小于等于 -- `BETWEEN` : 在某个范围内 -- `IN` : 在集合中 -- `LIKE` : 模糊匹配 - -### 2. 逻辑操作符 -- `AND` : 与 -- `OR` : 或 -- `NOT` : 非 - -### 3. 数据类型 -- 数字:直接写,如 `123` -- 字符串:用单引号包裹,如 `'value'` -- 布尔值:`TRUE`, `FALSE` -- NULL:`IS NULL`, `IS NOT NULL` - -## 使用示例 - -### 示例 1:简单等值过滤 -```java -// 只消费 type='vip' 的消息 -@RocketMQMessageListener( - topic = "orderTopic", - consumerGroup = "vipConsumerGroup", - tag = "", - filterExpressionType = "sql92" -) -public class VIPConsumer implements RocketMQListener { - @Override - public ConsumeResult consume(MessageView messageView) { - return ConsumeResult.SUCCESS; - } -} -``` - -发送消息时添加属性: -```java -MessageBuilder.withPayload(order) - .setHeader("type", "vip") - .build(); -``` - -### 示例 2:数值范围过滤 -```java -// 消费金额在 100-1000 之间的订单 -@RocketMQMessageListener( - topic = "orderTopic", - consumerGroup = "mediumOrderConsumer", - tag = "", - filterExpressionType = "sql92" -) -public class MediumOrderConsumer implements RocketMQListener { - @Override - public ConsumeResult consume(MessageView messageView) { - return ConsumeResult.SUCCESS; - } -} -``` - -配置文件: -```properties -rocketmq.simple-consumer.filter-expression-type=sql92 -rocketmq.simple-consumer.tag=(amount >= 100 AND amount <= 1000) -``` - -### 示例 3:多条件组合 -```java -// 消费 VIP 用户且金额大于 500 的订单 -@RocketMQMessageListener( - topic = "orderTopic", - consumerGroup = "vipLargeOrderConsumer", - tag = "", - filterExpressionType = "sql92" -) -public class VipLargeOrderConsumer implements RocketMQListener { - @Override - public ConsumeResult consume(MessageView messageView) { - return ConsumeResult.SUCCESS; - } -} -``` - -配置文件: -```properties -rocketmq.simple-consumer.filter-expression-type=sql92 -rocketmq.simple-consumer.tag=(type = 'vip' AND amount > 500) -``` - -### 示例 4:IN 操作符 -```java -// 消费特定地区的订单 -@RocketMQMessageListener( - topic = "orderTopic", - consumerGroup = "regionConsumer", - tag = "", - filterExpressionType = "sql92" -) -public class RegionConsumer implements RocketMQListener { - @Override - public ConsumeResult consume(MessageView messageView) { - return ConsumeResult.SUCCESS; - } -} -``` - -配置文件: -```properties -rocketmq.simple-consumer.filter-expression-type=sql92 -rocketmq.simple-consumer.tag=(region IN ('Beijing', 'Shanghai', 'Guangzhou')) -``` - -### 示例 5:LIKE 模糊匹配 -```java -// 消费以 A 开头的产品类别 -@RocketMQMessageListener( - topic = "productTopic", - consumerGroup = "categoryAConsumer", - tag = "", - filterExpressionType = "sql92" -) -public class CategoryAConsumer implements RocketMQListener { - @Override - public ConsumeResult consume(MessageView messageView) { - return ConsumeResult.SUCCESS; - } -} -``` - -配置文件: -```properties -rocketmq.simple-consumer.filter-expression-type=sql92 -rocketmq.simple-consumer.tag=(category LIKE 'A%') -``` - -### 示例 6:ExtConsumerResetConfiguration 方式 -```java -// 使用 ExtConsumerResetConfiguration 注解配置 SQL92 过滤 -@ExtConsumerResetConfiguration( - topic = "${ext.rocketmq.topic:}", - consumerGroup = "${ext.rocketmq.consumer-group:}", - tag = "${ext.rocketmq.tag:}", - filterExpressionType = "${ext.rocketmq.filter-expression-type:sql92}" -) -public class ExtRocketMQTemplate extends RocketMQClientTemplate { -} -``` - -配置文件: -```properties -ext.rocketmq.topic=sql92Topic -ext.rocketmq.consumer-group=extSql92Group -ext.rocketmq.tag=(status = 'ACTIVE' AND priority > 5) -ext.rocketmq.filter-expression-type=sql92 -``` - -## 发送带属性的消息 - -```java -@SpringBootApplication -public class ProducerApplication implements CommandLineRunner { - - @Resource - private RocketMQClientTemplate rocketMQClientTemplate; - - @Value("${demo.rocketmq.topic}") - private String topic; - - @Override - public void run(String... args) { - // 发送带属性的消息 - Order order= new Order(); - order.setId(1L); - order.setAmount(600); - order.setType("vip"); - order.setRegion("Beijing"); - - Message message = MessageBuilder.withPayload(order) - .setHeader("type", "vip") - .setHeader("amount", 600) - .setHeader("region", "Beijing") - .setHeader("status", "ACTIVE") - .setHeader("priority", 8) - .build(); - - rocketMQClientTemplate.syncSendNormalMessage(topic, message); - } -} -``` - -## 注意事项 - -1. **属性名称限制**: - - 属性名称不能包含空格和特殊字符 - - 建议使用字母、数字和下划线 - -2. **性能考虑**: - - SQL92 过滤比 TAG 过滤更消耗资源 - - 简单的过滤场景优先使用 TAG 过滤 - -3. **表达式长度**: - - SQL92 表达式长度有限制,不宜过长 - -4. **数据类型**: - - 确保发送的消息属性类型与过滤表达式中的类型一致 - -5. **NULL 值处理**: - - 使用 `IS NULL` 或 `IS NOT NULL` 判断空值 - - 不要使用 `= NULL` - -## 完整示例 - -### 消费者 -```java -package org.apache.rocketmq.samples.springboot.consumer; - -import org.apache.rocketmq.client.annotation.RocketMQMessageListener; -import org.apache.rocketmq.client.api.consumer.ConsumeResult; -import org.apache.rocketmq.client.api.message.MessageView; -import org.apache.rocketmq.client.support.RocketMQListener; -import org.springframework.stereotype.Service; - -@Service -@RocketMQMessageListener( - endpoints = "${demo.sql92.rocketmq.endpoints:}", - topic = "${demo.sql92.rocketmq.topic:}", - consumerGroup = "${demo.sql92.rocketmq.consumer-group:}", - tag = "${demo.sql92.rocketmq.tag:}", - filterExpressionType = "${demo.sql92.rocketmq.filter-expression-type:sql92}" -) -public class SQL92FilterConsumer implements RocketMQListener { - - @Override - public ConsumeResult consume(MessageView messageView) { - System.out.printf("收到消息 - ID: %s, 属性:%s%n", - messageView.getMessageId(), - messageView.getProperties()); - - // 业务处理逻辑 - return ConsumeResult.SUCCESS; - } -} -``` - -### application.properties -```properties -# SQL92 过滤配置 -demo.sql92.rocketmq.endpoints=localhost:8081 -demo.sql92.rocketmq.topic=orderTopic -demo.sql92.rocketmq.consumer-group=sql92FilterGroup -demo.sql92.rocketmq.tag=(type = 'vip' AND amount > 500) -demo.sql92.rocketmq.filter-expression-type=sql92 -``` - -### 生产者 -```java -package org.apache.rocketmq.samples.springboot.producer; - -import org.apache.rocketmq.client.core.RocketMQClientTemplate; -import org.springframework.boot.CommandLineRunner; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; - -import javax.annotation.Resource; - -@SpringBootApplication -public class SQL92ProducerApplication implements CommandLineRunner { - - @Resource - private RocketMQClientTemplate rocketMQClientTemplate; - - public static void main(String[] args) { - SpringApplication.run(SQL92ProducerApplication.class, args); - } - - @Override - public void run(String... args) { - sendVipOrder(); - sendNormalOrder(); - } - - private void sendVipOrder() { - Message message = MessageBuilder.withPayload("VIP Order") - .setHeader("type", "vip") - .setHeader("amount", 600) - .setHeader("region", "Beijing") - .build(); - - rocketMQClientTemplate.syncSendNormalMessage("orderTopic", message); - System.out.println("VIP 订单已发送"); - } - - private void sendNormalOrder() { - Message message = MessageBuilder.withPayload("Normal Order") - .setHeader("type", "normal") - .setHeader("amount", 200) - .setHeader("region", "Shanghai") - .build(); - - rocketMQClientTemplate.syncSendNormalMessage("orderTopic", message); - System.out.println("普通订单已发送"); - } -} -``` - -## 总结 - -RocketMQ V5 客户端完全支持 SQL92 消息过滤,通过设置 `filterExpressionType=sql92` 并在 `tag` 参数中编写 SQL92 表达式即可实现复杂的消息过滤逻辑。这为消息订阅提供了更大的灵活性。 diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/SQL92FilterConsumer.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/SQL92FilterConsumer.java index 32ffb07d..4dceb658 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/SQL92FilterConsumer.java +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/SQL92FilterConsumer.java @@ -25,13 +25,13 @@ import org.springframework.stereotype.Service; /** - * SQL92 消息过滤消费者示例 + * SQL92 Message Filter Consumer Example * - * 该消费者使用 SQL92 表达式过滤消息,只消费满足特定条件的消息: - * - type = 'vip':VIP 类型的消息 - * - amount > 500:金额大于 500 + * This consumer uses SQL92 expression to filter messages, consuming only messages that meet specific conditions: + * - type = 'vip': VIP type messages + * - amount > 500: Amount greater than 500 * - * 配置文件 application.properties 中需要设置: + * Configuration properties that need to be set in application.properties: * demo.sql92.rocketmq.endpoints=localhost:8081 * demo.sql92.rocketmq.topic=orderTopic * demo.sql92.rocketmq.consumer-group=sql92VipConsumerGroup @@ -52,16 +52,16 @@ public class SQL92FilterConsumer implements RocketMQListener { @Override public ConsumeResult consume(MessageView messageView) { - log.info("收到 SQL92 过滤消息 - ID: {}, Topic: {}, Tag: {}", + log.info("Received SQL92 filtered message - ID: {}, Topic: {}, Tag: {}", messageView.getMessageId(), messageView.getTopic(), messageView.getTag().orElse("")); - // 打印消息属性 - log.info("消息属性:{}", messageView.getProperties()); + // Print message properties + log.info("Message properties: {}", messageView.getProperties()); - // 这里可以添加业务逻辑处理 - // 例如:解析消息内容,处理订单等 + // Business logic can be added here + // For example: parse message content, process orders, etc. return ConsumeResult.SUCCESS; } diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/producer/SQL92ProducerApplication.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/producer/SQL92ProducerApplication.java index e11070cd..40692cd0 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/producer/SQL92ProducerApplication.java +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/producer/SQL92ProducerApplication.java @@ -29,11 +29,11 @@ import javax.annotation.Resource; /** - * SQL92 消息生产者示例 + * SQL92 Message Producer Example * - * 演示如何发送带有属性的消息,以便消费者可以使用 SQL92 进行过滤 + * Demonstrates how to send messages with attributes so consumers can use SQL92 for filtering * - * 配置文件 application.properties 中需要设置: + * Configuration properties that need to be set in application.properties: * rocketmq.producer.endpoints=localhost:8081 * rocketmq.producer.topic=orderTopic * demo.rocketmq.order-topic=orderTopic @@ -52,24 +52,24 @@ public static void main(String[] args) { @Override public void run(String... args) throws ClientException { - log.info("开始发送 SQL92 测试消息..."); + log.info("Starting to send SQL92 test messages..."); - // 发送 VIP 大额订单(会被 SQL92 过滤器匹配) + // Send VIP high-value orders (will be matched by SQL92 filter) sendVipOrder(1L, 600.0, "Beijing"); sendVipOrder(2L, 800.0, "Shanghai"); - // 发送普通订单(不会被 SQL92 过滤器匹配) + // Send normal orders (will not be matched by SQL92 filter) sendNormalOrder(3L, 200.0, "Guangzhou"); sendNormalOrder(4L, 300.0, "Shenzhen"); - // 发送 VIP 小额订单(不会被 SQL92 过滤器匹配) + // Send VIP low-value orders (will not be matched by SQL92 filter) sendVipOrder(5L, 100.0, "Hangzhou"); - log.info("所有消息已发送完成"); + log.info("All messages have been sent"); } /** - * 发送 VIP 订单 + * Send VIP order */ private void sendVipOrder(Long orderId, Double amount, String region) throws ClientException { Order order = new Order(); @@ -85,11 +85,11 @@ private void sendVipOrder(Long orderId, Double amount, String region) throws Cli .build(); rocketMQClientTemplate.syncSendNormalMessage("orderTopic", message); - log.info("VIP 订单已发送 - ID: {}, 金额:{}, 地区:{}", orderId, amount, region); + log.info("VIP order sent - ID: {}, Amount: {}, Region: {}", orderId, amount, region); } /** - * 发送普通订单 + * Send normal order */ private void sendNormalOrder(Long orderId, Double amount, String region) throws ClientException { Order order = new Order(); @@ -105,11 +105,11 @@ private void sendNormalOrder(Long orderId, Double amount, String region) throws .build(); rocketMQClientTemplate.syncSendNormalMessage("orderTopic", message); - log.info("普通订单已发送 - ID: {}, 金额:{}, 地区:{}", orderId, amount, region); + log.info("Normal order sent - ID: {}, Amount: {}, Region: {}", orderId, amount, region); } /** - * 订单实体类 + * Order entity class */ public static class Order { private Long id; From 7181eb2897b444f75f9c06633fcbd63a03a86102 Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Thu, 12 Mar 2026 14:36:41 +0800 Subject: [PATCH 4/5] delete md --- .../rocketmq/client/autoconfigure/RocketMQProperties.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java index 8f3d4941..38df9324 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java @@ -187,7 +187,7 @@ public static class SimpleConsumer { /** * Tag of consumer. */ - private String tag; + private String tag = "*"; /** * Topic name of consumer. From 3c6486d05991086ae73b0cd409ced7c4f69ac0ea Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Thu, 12 Mar 2026 15:10:50 +0800 Subject: [PATCH 5/5] delete md --- .../README-CN.md | 188 ------------------ 1 file changed, 188 deletions(-) diff --git a/rocketmq-v5-client-spring-boot-samples/README-CN.md b/rocketmq-v5-client-spring-boot-samples/README-CN.md index 8dc1895f..59698fcf 100644 --- a/rocketmq-v5-client-spring-boot-samples/README-CN.md +++ b/rocketmq-v5-client-spring-boot-samples/README-CN.md @@ -567,194 +567,6 @@ rocketmq.simple-consumer.filter-expression-type=tag -# SQL92 消息过滤 - - - -## 概述 - -RocketMQ V5 客户端支持 SQL92 消息过滤功能,允许消费者根据消息属性进行复杂的条件过滤。 - -### 支持的过滤类型 - -1. **TAG 过滤**(默认):基于 Tag 的简单过滤 -2. **SQL92 过滤**:基于消息属性的复杂条件过滤 - - - -## 配置说明 - -### application.properties 配置 - -```properties -# 基本配置 -rocketmq.simple-consumer.endpoints=localhost:8081 -rocketmq.simple-consumer.consumer-group=sql92Group -rocketmq.simple-consumer.topic=sql92Topic - -# TAG 过滤(默认) -rocketmq.simple-consumer.tag=* -rocketmq.simple-consumer.filter-expression-type=tag - -# SQL92 过滤 -# rocketmq.simple-consumer.tag=(type = 'vip' AND amount > 500) -# rocketmq.simple-consumer.filter-expression-type=sql92 -``` - - - -## SQL92 表达式语法 - -### 比较操作符 - -- `=` : 等于 -- `<>` : 不等于 -- `>` : 大于 -- `<` : 小于 -- `>=` : 大于等于 -- `<=` : 小于等于 -- `BETWEEN` : 在某个范围内 -- `IN` : 在集合中 -- `LIKE` : 模糊匹配 - -### 逻辑操作符 - -- `AND` : 与 -- `OR` : 或 -- `NOT` : 非 - - - -## 使用示例 - -### 示例 1:简单等值过滤 - -只消费 type='vip' 的消息: - -```properties -rocketmq.simple-consumer.filter-expression-type=sql92 -rocketmq.simple-consumer.tag=(type = 'vip') -``` - -发送消息时添加属性: - -```java -Message message = MessageBuilder.withPayload(order) - .setHeader("type", "vip") - .build(); -``` - -### 示例 2:数值范围过滤 - -消费金额在 100-1000 之间的订单: - -```properties -rocketmq.simple-consumer.filter-expression-type=sql92 -rocketmq.simple-consumer.tag=(amount >= 100 AND amount <= 1000) -``` - -### 示例 3:多条件组合 - -消费 VIP 用户且金额大于 500 的订单: - -```properties -rocketmq.simple-consumer.filter-expression-type=sql92 -rocketmq.simple-consumer.tag=(type = 'vip' AND amount > 500) -``` - -### 示例 4:IN 操作符 - -消费特定地区的订单: - -```properties -rocketmq.simple-consumer.filter-expression-type=sql92 -rocketmq.simple-consumer.tag=(region IN ('Beijing', 'Shanghai', 'Guangzhou')) -``` - -### 示例 5:LIKE 模糊匹配 - -消费以 A 开头的产品类别: - -```properties -rocketmq.simple-consumer.filter-expression-type=sql92 -rocketmq.simple-consumer.tag=(category LIKE 'A%') -``` - - - -## 消费者代码示例 - -```java -@Service -@RocketMQMessageListener( - endpoints = "${demo.sql92.rocketmq.endpoints:}", - topic = "${demo.sql92.rocketmq.topic:}", - consumerGroup = "${demo.sql92.rocketmq.consumer-group:}", - tag = "${demo.sql92.rocketmq.tag:}", - filterExpressionType = "${demo.sql92.rocketmq.filter-expression-type:sql92}" -) -public class SQL92FilterConsumer implements RocketMQListener { - - @Override - public ConsumeResult consume(MessageView messageView) { - log.info("收到 SQL92 过滤消息 - ID: {}, 属性:{}", - messageView.getMessageId(), - messageView.getProperties()); - return ConsumeResult.SUCCESS; - } -} -``` - - - -## 生产者代码示例 - -```java -@SpringBootApplication -public class SQL92ProducerApplication implements CommandLineRunner { - - @Resource - private RocketMQClientTemplate rocketMQClientTemplate; - - @Override - public void run(String... args) throws ClientException { - // 发送带属性的消息 - Message message = MessageBuilder.withPayload("VIP Order") - .setHeader("type", "vip") - .setHeader("amount", 600) - .setHeader("region", "Beijing") - .build(); - - rocketMQClientTemplate.syncSendNormalMessage("orderTopic", message); - } -} -``` - - - -## 注意事项 - -1. **属性名称限制**: - - 属性名称不能包含空格和特殊字符 - - 建议使用字母、数字和下划线 - -2. **性能考虑**: - - SQL92 过滤比 TAG 过滤更消耗资源 - - 简单的过滤场景优先使用 TAG 过滤 - -3. **表达式长度**: - - SQL92 表达式长度有限制,不宜过长 - -4. **数据类型**: - - 确保发送的消息属性类型与过滤表达式中的类型一致 - - 字符串值需要用单引号包裹 - -5. **NULL 值处理**: - - 使用 `IS NULL` 或 `IS NOT NULL` 判断空值 - - 不要使用 `= NULL` - - - # ACL功能