From 756df0a7cc2d6f3ab4e1eee072438c1b1ae98fe8 Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Thu, 5 Mar 2026 21:33:46 +0800 Subject: [PATCH 01/11] update tag --- .../consumer/Sql92FilterConsumer.java | 49 +++++++++++++++++++ .../consumer/TagFilterConsumer.java | 48 ++++++++++++++++++ .../ExtConsumerResetConfiguration.java | 15 +++++- .../annotation/RocketMQMessageListener.java | 15 +++++- .../client/annotation/SelectorType.java | 33 +++++++++++++ .../ExtConsumerResetConfiguration.java | 3 +- .../ListenerContainerConfiguration.java | 4 +- .../RocketMQAutoConfiguration.java | 5 +- .../autoconfigure/RocketMQProperties.java | 20 +++++--- 9 files changed, 177 insertions(+), 15 deletions(-) create mode 100644 rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/Sql92FilterConsumer.java create mode 100644 rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/TagFilterConsumer.java create mode 100644 rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/SelectorType.java diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/Sql92FilterConsumer.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/Sql92FilterConsumer.java new file mode 100644 index 00000000..a9b82de0 --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/Sql92FilterConsumer.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.samples.springboot.consumer; + +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.message.MessageView; +import org.apache.rocketmq.client.annotation.RocketMQMessageListener; +import org.apache.rocketmq.client.annotation.SelectorType; +import org.apache.rocketmq.client.core.RocketMQListener; +import org.springframework.stereotype.Service; + +/** + * Example consumer using SQL92-based message filtering with selectorType. + * This demonstrates SQL92 expression filtering capability. + */ +@Service +@RocketMQMessageListener( + endpoints = "${demo.sql92-filter.rocketmq.endpoints:}", + topic = "${demo.sql92-filter.rocketmq.topic:}", + consumerGroup = "${demo.sql92-filter.rocketmq.consumer-group:}", + selectorType = SelectorType.SQL92, + tag = "a > 5 AND b < 10" +) +public class Sql92FilterConsumer implements RocketMQListener { + + @Override + public ConsumeResult consume(MessageView messageView) { + System.out.println("Received message with SQL92 filter: " + messageView); + // Get message properties + messageView.getProperties().forEach((key, value) -> + System.out.println("Property: " + key + " = " + value) + ); + return ConsumeResult.SUCCESS; + } +} diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/TagFilterConsumer.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/TagFilterConsumer.java new file mode 100644 index 00000000..e9c6e6d9 --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/TagFilterConsumer.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.samples.springboot.consumer; + +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.message.MessageView; +import org.apache.rocketmq.client.annotation.RocketMQMessageListener; +import org.apache.rocketmq.client.annotation.SelectorType; +import org.apache.rocketmq.client.core.RocketMQListener; +import org.springframework.stereotype.Service; + +/** + * Example consumer using tag-based message filtering with selectorType. + * This demonstrates the new way to configure message filtering. + */ +@Service +@RocketMQMessageListener( + endpoints = "${demo.tag-filter.rocketmq.endpoints:}", + topic = "${demo.tag-filter.rocketmq.topic:}", + consumerGroup = "${demo.tag-filter.rocketmq.consumer-group:}", + selectorType = SelectorType.TAG, + tag = "tagA || tagB" +) +public class TagFilterConsumer implements RocketMQListener { + + @Override + public ConsumeResult consume(MessageView messageView) { + System.out.println("Received message with tag filter: " + messageView); + // Get message tag from properties + String tag = messageView.getTag().orElse(null); + System.out.println("Message tag: " + tag); + return ConsumeResult.SUCCESS; + } +} diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java index f0b942f5..ea340d16 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java @@ -53,7 +53,9 @@ String secretKey() default SECRET_KEY_PLACEHOLDER; /** - * Tag of consumer. + * Tag of consumer. Used for message filtering. + * For TAG selectorType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 selectorType, use SQL92 expression like "a > 5 AND b < 10". */ String tag() default TAG_PLACEHOLDER; @@ -73,8 +75,17 @@ String consumerGroup() default CONSUMER_GROUP_PLACEHOLDER; /** - * The type of filter expression + * Control how to selector message. + * + * @see SelectorType */ + SelectorType selectorType() default SelectorType.TAG; + + /** + * @deprecated Use {@link #tag()} and {@link #selectorType()} instead. + * This field will be removed in a future version. + */ + @Deprecated String filterExpressionType() default FILTER_EXPRESSION_TYPE_PLACEHOLDER; /** diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java index 6f0f836d..a62a3b81 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java @@ -54,7 +54,16 @@ String topic() default TOPIC_PLACEHOLDER; /** - * Tag of consumer. + * Control how to selector message. + * + * @see SelectorType + */ + SelectorType selectorType() default SelectorType.TAG; + + /** + * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92} + * For TAG type, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 type, use SQL92 expression like "a > 5 AND b < 10". */ String tag() default TAG_PLACEHOLDER; @@ -64,8 +73,10 @@ boolean sslEnabled() default true; /** - * The type of filter expression + * @deprecated Use {@link #tag()} and {@link #selectorType()} instead. + * This field will be removed in a future version. */ + @Deprecated String filterExpressionType() default "tag"; /** diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/SelectorType.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/SelectorType.java new file mode 100644 index 00000000..beb4c068 --- /dev/null +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/SelectorType.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.annotation; + +/** + * Message selector type. + */ +public enum SelectorType { + + /** + * Tag expression. + */ + TAG, + + /** + * SQL92 expression. + */ + SQL92 +} \ No newline at end of file diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java index a501e64e..e9e78358 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java @@ -116,7 +116,8 @@ private SimpleConsumerInfo createConsumer( String endPoints = resolvePlaceholders(annotation.endpoints(), simpleConsumer.getEndpoints()); String namespace = resolvePlaceholders(annotation.namespace(), simpleConsumer.getNamespace()); String tag = resolvePlaceholders(annotation.tag(), simpleConsumer.getTag()); - String filterExpressionType = resolvePlaceholders(annotation.filterExpressionType(), simpleConsumer.getFilterExpressionType()); + // Use selectorType to determine the filter expression type, similar to rocketmq-spring-boot + String filterExpressionType = annotation.selectorType() == org.apache.rocketmq.client.annotation.SelectorType.TAG ? "tag" : "sql92"; Duration requestTimeout = Duration.ofSeconds(annotation.requestTimeout()); int awaitDuration = annotation.awaitDuration(); Boolean sslEnabled = simpleConsumer.isSslEnabled(); diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java index 1975dccf..e8b040d2 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.autoconfigure; import org.apache.rocketmq.client.annotation.RocketMQMessageListener; +import org.apache.rocketmq.client.annotation.SelectorType; import org.apache.rocketmq.client.core.RocketMQListener; import org.apache.rocketmq.client.support.DefaultListenerContainer; import org.apache.rocketmq.client.support.RocketMQMessageConverter; @@ -107,7 +108,8 @@ private DefaultListenerContainer createRocketMQListenerContainer(String name, Ob container.setMaxCachedMessageCount(annotation.maxCachedMessageCount()); container.setConsumptionThreadCount(annotation.consumptionThreadCount()); container.setMaxCacheMessageSizeInBytes(annotation.maxCacheMessageSizeInBytes()); - container.setType(annotation.filterExpressionType()); + // Use selectorType to determine the filter expression type, similar to rocketmq-spring-boot + container.setType(annotation.selectorType() == SelectorType.TAG ? "tag" : "sql92"); container.setSslEnabled(annotation.sslEnabled()); return container; } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java index 346311f2..871df933 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java @@ -105,7 +105,10 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr RocketMQProperties.SimpleConsumer simpleConsumer = rocketMQProperties.getSimpleConsumer(); final ClientServiceProvider provider = ClientServiceProvider.loadService(); String consumerGroup = simpleConsumer.getConsumerGroup(); - FilterExpression filterExpression = RocketMQUtil.createFilterExpression(simpleConsumer.getTag(), simpleConsumer.getFilterExpressionType()); + // Use selectorType to determine the filter expression type, similar to rocketmq-spring-boot + String filterExpressionTypeValue = simpleConsumer.getSelectorType() != null ? simpleConsumer.getSelectorType() : "TAG"; + String tagExpression = simpleConsumer.getTag() != null ? simpleConsumer.getTag() : "*"; + FilterExpression filterExpression = RocketMQUtil.createFilterExpression(tagExpression, filterExpressionTypeValue.toLowerCase()); ClientConfiguration clientConfiguration = RocketMQUtil.createConsumerClientConfiguration(simpleConsumer); SimpleConsumerBuilder simpleConsumerBuilder = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration); diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java index 8f3d4941..0003c196 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java @@ -185,7 +185,9 @@ public static class SimpleConsumer { private int awaitDuration = 5; /** - * Tag of consumer. + * Tag of consumer. Used for message filtering. + * For TAG selectorType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 selectorType, use SQL92 expression like "a > 5 AND b < 10". */ private String tag; @@ -200,9 +202,11 @@ public static class SimpleConsumer { private int requestTimeout = 3; /** - * The type of filter expression + * Control how to selector message. + * + * @see org.apache.rocketmq.client.annotation.SelectorType */ - private String filterExpressionType = "tag"; + private String selectorType = "TAG"; /** * Enable or disable the use of Secure Sockets Layer (SSL) for network transport. @@ -283,12 +287,12 @@ public void setSslEnabled(boolean sslEnabled) { this.sslEnabled = sslEnabled; } - public String getFilterExpressionType() { - return filterExpressionType; + public String getSelectorType() { + return selectorType; } - public void setFilterExpressionType(String filterExpressionType) { - this.filterExpressionType = filterExpressionType; + public void setSelectorType(String selectorType) { + this.selectorType = selectorType; } public String getNamespace() { @@ -308,7 +312,7 @@ public String toString() { ", tag='" + tag + '\'' + ", topic='" + topic + '\'' + ", requestTimeout=" + requestTimeout + - ", filterExpressionType='" + filterExpressionType + '\'' + + ", selectorType='" + selectorType + '\'' + ", sslEnabled=" + sslEnabled + ", namespace='" + namespace + '\'' + '}'; From 436321d2c4bb963f233a51a604a0070f0ea8dac1 Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Sun, 8 Mar 2026 15:40:25 +0800 Subject: [PATCH 02/11] update tag default --- .../rocketmq/client/annotation/RocketMQMessageListener.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java index a62a3b81..0b5e5b2b 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java @@ -31,7 +31,6 @@ String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}"; String ENDPOINTS_PLACEHOLDER = "${rocketmq.push-consumer.endpoints:}"; String TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.topic:}"; - String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:}"; /** * The property of "access-key". @@ -65,7 +64,7 @@ * For TAG type, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. * For SQL92 type, use SQL92 expression like "a > 5 AND b < 10". */ - String tag() default TAG_PLACEHOLDER; + String tag() default "*"; /** * Enable or disable the use of Secure Sockets Layer (SSL) for network transport. From 80546697f28e37e8c8ff04d8643b895096afd450 Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Sun, 8 Mar 2026 15:43:41 +0800 Subject: [PATCH 03/11] update tag default --- .../client/annotation/ExtConsumerResetConfiguration.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java index ea340d16..8102654d 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java @@ -31,11 +31,9 @@ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.accessKey:}"; String SECRET_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.secretKey:}"; - String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:}"; String TOPIC_PLACEHOLDER = "${rocketmq.simple-consumer.topic:}"; String ENDPOINTS_PLACEHOLDER = "${rocketmq.simple-consumer.endpoints:}"; String CONSUMER_GROUP_PLACEHOLDER = "${rocketmq.simple-consumer.consumerGroup:}"; - String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:}"; /** * The component name of the Consumer configuration. @@ -57,7 +55,7 @@ * For TAG selectorType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. * For SQL92 selectorType, use SQL92 expression like "a > 5 AND b < 10". */ - String tag() default TAG_PLACEHOLDER; + String tag() default "*"; /** * Topic name of consumer. @@ -86,7 +84,7 @@ * This field will be removed in a future version. */ @Deprecated - String filterExpressionType() default FILTER_EXPRESSION_TYPE_PLACEHOLDER; + String filterExpressionType() default "tag"; /** * The requestTimeout of client,it is 3s by default. From 31a3fce1e85a970d8a339d038ac208840336bb29 Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Sun, 8 Mar 2026 15:55:12 +0800 Subject: [PATCH 04/11] v5 add sql92 --- .../support/DefaultListenerContainer.java | 86 ++++++++++--------- 1 file changed, 47 insertions(+), 39 deletions(-) diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java index 4d0925d0..1539e3fe 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java @@ -17,9 +17,9 @@ package org.apache.rocketmq.client.support; import org.apache.rocketmq.client.annotation.RocketMQMessageListener; +import org.apache.rocketmq.client.annotation.SelectorType; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientServiceProvider; -import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder; import org.apache.rocketmq.client.apis.consumer.FilterExpression; @@ -74,9 +74,7 @@ public class DefaultListenerContainer implements InitializingBean, String topic; - String type; - - FilterExpressionType filterExpressionType; + SelectorType selectorType; Duration requestTimeout; @@ -179,36 +177,28 @@ public void setRequestTimeout(Duration requestTimeout) { this.requestTimeout = requestTimeout; } - public FilterExpressionType getFilterExpressionType() { - return filterExpressionType; - } - - public void setFilterExpressionType(FilterExpressionType filterExpressionType) { - this.filterExpressionType = filterExpressionType; - } - - public int getMaxCachedMessageCount() { - return maxCachedMessageCount; + public SelectorType getSelectorType() { + return selectorType; } - public void setMaxCachedMessageCount(int maxCachedMessageCount) { - this.maxCachedMessageCount = maxCachedMessageCount; + public void setSelectorType(SelectorType selectorType) { + this.selectorType = selectorType; } - public int getMaxCacheMessageSizeInBytes() { - return maxCacheMessageSizeInBytes; + public Boolean getSslEnabled() { + return sslEnabled; } - public void setMaxCacheMessageSizeInBytes(int maxCacheMessageSizeInBytes) { - this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes; + public void setSslEnabled(Boolean sslEnabled) { + this.sslEnabled = sslEnabled; } - public int getConsumptionThreadCount() { - return consumptionThreadCount; + public String getNamespace() { + return namespace; } - public void setConsumptionThreadCount(int consumptionThreadCount) { - this.consumptionThreadCount = consumptionThreadCount; + public void setNamespace(String namespace) { + this.namespace = namespace; } public RocketMQListener getMessageListener() { @@ -225,30 +215,44 @@ public RocketMQMessageListener getRocketMQMessageListener() { public void setRocketMQMessageListener(RocketMQMessageListener rocketMQMessageListener) { this.rocketMQMessageListener = rocketMQMessageListener; + + this.accessKey = rocketMQMessageListener.accessKey(); + this.secretKey = rocketMQMessageListener.secretKey(); + this.endpoints = rocketMQMessageListener.endpoints(); + this.topic = rocketMQMessageListener.topic(); + this.tag = rocketMQMessageListener.tag(); + this.selectorType = rocketMQMessageListener.selectorType(); + this.sslEnabled = rocketMQMessageListener.sslEnabled(); + this.consumerGroup = rocketMQMessageListener.consumerGroup(); + this.requestTimeout = Duration.ofSeconds(rocketMQMessageListener.requestTimeout()); + this.maxCachedMessageCount = rocketMQMessageListener.maxCachedMessageCount(); + this.maxCacheMessageSizeInBytes = rocketMQMessageListener.maxCacheMessageSizeInBytes(); + this.consumptionThreadCount = rocketMQMessageListener.consumptionThreadCount(); + this.namespace = rocketMQMessageListener.namespace(); } - public String getType() { - return type; + public int getMaxCachedMessageCount() { + return maxCachedMessageCount; } - public void setType(String type) { - this.type = type; + public void setMaxCachedMessageCount(int maxCachedMessageCount) { + this.maxCachedMessageCount = maxCachedMessageCount; } - public Boolean getSslEnabled() { - return sslEnabled; + public int getMaxCacheMessageSizeInBytes() { + return maxCacheMessageSizeInBytes; } - public void setSslEnabled(Boolean sslEnabled) { - this.sslEnabled = sslEnabled; + public void setMaxCacheMessageSizeInBytes(int maxCacheMessageSizeInBytes) { + this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes; } - public String getNamespace() { - return namespace; + public int getConsumptionThreadCount() { + return consumptionThreadCount; } - public void setNamespace(String namespace) { - this.namespace = namespace; + public void setConsumptionThreadCount(int consumptionThreadCount) { + this.consumptionThreadCount = consumptionThreadCount; } private void initRocketMQPushConsumer() { @@ -258,10 +262,15 @@ private void initRocketMQPushConsumer() { Assert.hasText(consumerGroup, "Property 'consumerGroup' is required"); Assert.hasText(topic, "Property 'topic' is required"); Assert.hasText(tag, "Property 'tag' is required"); + + // Convert SelectorType to FilterExpressionType string for createFilterExpression + String filterExpressionTypeStr = (selectorType != null && selectorType == SelectorType.SQL92) + ? "sql92" : "tag"; + FilterExpression filterExpression = null; final ClientServiceProvider provider = ClientServiceProvider.loadService(); if (StringUtils.hasLength(this.getTag())) { - filterExpression = RocketMQUtil.createFilterExpression(this.getTag(),this.getType()); + filterExpression = RocketMQUtil.createFilterExpression(this.getTag(), filterExpressionTypeStr); } ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(this.getAccessKey(), this.getSecretKey(), this.getEndpoints(), this.getRequestTimeout(), this.sslEnabled, this.namespace); @@ -360,8 +369,7 @@ public String toString() { ", consumerGroup='" + consumerGroup + '\'' + ", tag='" + tag + '\'' + ", topic='" + topic + '\'' + - ", type='" + type + '\'' + - ", filterExpressionType=" + filterExpressionType + + ", selectorType=" + selectorType + ", requestTimeout=" + requestTimeout + ", maxCachedMessageCount=" + maxCachedMessageCount + ", maxCacheMessageSizeInBytes=" + maxCacheMessageSizeInBytes + From 700b3aabf1e3c89e9c293abcbb9e81ffdcd18acc Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Sun, 8 Mar 2026 16:04:12 +0800 Subject: [PATCH 05/11] v5 add sql92 --- .../client/autoconfigure/ExtConsumerResetConfiguration.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java index e9e78358..547d3b22 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.autoconfigure; +import org.apache.rocketmq.client.annotation.SelectorType; import org.apache.rocketmq.client.support.RocketMQMessageConverter; import org.apache.rocketmq.client.support.RocketMQUtil; import org.apache.rocketmq.client.apis.ClientConfiguration; @@ -117,7 +118,7 @@ private SimpleConsumerInfo createConsumer( String namespace = resolvePlaceholders(annotation.namespace(), simpleConsumer.getNamespace()); String tag = resolvePlaceholders(annotation.tag(), simpleConsumer.getTag()); // Use selectorType to determine the filter expression type, similar to rocketmq-spring-boot - String filterExpressionType = annotation.selectorType() == org.apache.rocketmq.client.annotation.SelectorType.TAG ? "tag" : "sql92"; + String filterExpressionType = annotation.selectorType() == SelectorType.TAG ? "tag" : "sql92"; Duration requestTimeout = Duration.ofSeconds(annotation.requestTimeout()); int awaitDuration = annotation.awaitDuration(); Boolean sslEnabled = simpleConsumer.isSslEnabled(); From 69231819a789471f44939c225384befade2a7445 Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Sun, 8 Mar 2026 16:10:55 +0800 Subject: [PATCH 06/11] v5 add sql92 --- .../autoconfigure/ListenerContainerConfiguration.java | 1 + .../client/support/DefaultListenerContainer.java | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java index e8b040d2..f17f45f1 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java @@ -110,6 +110,7 @@ private DefaultListenerContainer createRocketMQListenerContainer(String name, Ob container.setMaxCacheMessageSizeInBytes(annotation.maxCacheMessageSizeInBytes()); // Use selectorType to determine the filter expression type, similar to rocketmq-spring-boot container.setType(annotation.selectorType() == SelectorType.TAG ? "tag" : "sql92"); + container.setSelectorType(annotation.selectorType()); container.setSslEnabled(annotation.sslEnabled()); return container; } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java index 1539e3fe..459ece56 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java @@ -88,6 +88,8 @@ public class DefaultListenerContainer implements InitializingBean, String namespace; + String type; + public String getName() { return name; } @@ -205,6 +207,14 @@ public RocketMQListener getMessageListener() { return rocketMQListener; } + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + public void setMessageListener(RocketMQListener rocketMQListener) { this.rocketMQListener = rocketMQListener; } From 7fc0833c12c3c2a58c8d9d1f2575c4d7a135691f Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Tue, 10 Mar 2026 14:19:39 +0800 Subject: [PATCH 07/11] update tag --- .../client/annotation/ExtConsumerResetConfiguration.java | 6 ++++-- .../rocketmq/client/annotation/RocketMQMessageListener.java | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java index 8102654d..7e125368 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java @@ -31,9 +31,11 @@ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.accessKey:}"; String SECRET_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.secretKey:}"; + String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:*}"; String TOPIC_PLACEHOLDER = "${rocketmq.simple-consumer.topic:}"; String ENDPOINTS_PLACEHOLDER = "${rocketmq.simple-consumer.endpoints:}"; String CONSUMER_GROUP_PLACEHOLDER = "${rocketmq.simple-consumer.consumerGroup:}"; + String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:tag}"; /** * The component name of the Consumer configuration. @@ -55,7 +57,7 @@ * For TAG selectorType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. * For SQL92 selectorType, use SQL92 expression like "a > 5 AND b < 10". */ - String tag() default "*"; + String tag() default TAG_PLACEHOLDER; /** * Topic name of consumer. @@ -84,7 +86,7 @@ * This field will be removed in a future version. */ @Deprecated - String filterExpressionType() default "tag"; + String filterExpressionType() default FILTER_EXPRESSION_TYPE_PLACEHOLDER; /** * The requestTimeout of client,it is 3s by default. diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java index 0b5e5b2b..c91060bb 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java @@ -31,6 +31,7 @@ String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}"; String ENDPOINTS_PLACEHOLDER = "${rocketmq.push-consumer.endpoints:}"; String TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.topic:}"; + String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:*}"; /** * The property of "access-key". @@ -64,7 +65,7 @@ * For TAG type, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. * For SQL92 type, use SQL92 expression like "a > 5 AND b < 10". */ - String tag() default "*"; + String tag() default TAG_PLACEHOLDER; /** * Enable or disable the use of Secure Sockets Layer (SSL) for network transport. From 7660171159ffe2dc93a62ef57aeeebcfc65da335 Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Tue, 10 Mar 2026 14:44:42 +0800 Subject: [PATCH 08/11] update tag --- .../ExtConsumerResetConfiguration.java | 17 +++------- .../annotation/RocketMQMessageListener.java | 17 +++------- .../client/annotation/SelectorType.java | 33 ------------------- .../ExtConsumerResetConfiguration.java | 5 ++- .../ListenerContainerConfiguration.java | 4 +-- .../RocketMQAutoConfiguration.java | 4 +-- .../autoconfigure/RocketMQProperties.java | 17 +++++----- .../support/DefaultListenerContainer.java | 20 +++++------ 8 files changed, 32 insertions(+), 85 deletions(-) delete mode 100644 rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/SelectorType.java diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java index 7e125368..139acf78 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java @@ -54,8 +54,8 @@ /** * Tag of consumer. Used for message filtering. - * For TAG selectorType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. - * For SQL92 selectorType, use SQL92 expression like "a > 5 AND b < 10". + * For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10". */ String tag() default TAG_PLACEHOLDER; @@ -75,17 +75,10 @@ String consumerGroup() default CONSUMER_GROUP_PLACEHOLDER; /** - * Control how to selector message. - * - * @see SelectorType + * Control how to filter messages. + * For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10". */ - SelectorType selectorType() default SelectorType.TAG; - - /** - * @deprecated Use {@link #tag()} and {@link #selectorType()} instead. - * This field will be removed in a future version. - */ - @Deprecated String filterExpressionType() default FILTER_EXPRESSION_TYPE_PLACEHOLDER; /** diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java index c91060bb..4450c0cb 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java @@ -54,14 +54,7 @@ String topic() default TOPIC_PLACEHOLDER; /** - * Control how to selector message. - * - * @see SelectorType - */ - SelectorType selectorType() default SelectorType.TAG; - - /** - * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92} + * Control which message can be select. * For TAG type, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. * For SQL92 type, use SQL92 expression like "a > 5 AND b < 10". */ @@ -73,11 +66,11 @@ boolean sslEnabled() default true; /** - * @deprecated Use {@link #tag()} and {@link #selectorType()} instead. - * This field will be removed in a future version. + * Control how to filter messages. + * For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10". */ - @Deprecated - String filterExpressionType() default "tag"; + String filterExpressionType() default TAG_PLACEHOLDER; /** * The load balancing group for the simple consumer. diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/SelectorType.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/SelectorType.java deleted file mode 100644 index beb4c068..00000000 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/SelectorType.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.client.annotation; - -/** - * Message selector type. - */ -public enum SelectorType { - - /** - * Tag expression. - */ - TAG, - - /** - * SQL92 expression. - */ - SQL92 -} \ No newline at end of file diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java index 547d3b22..8b8b5413 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.client.autoconfigure; -import org.apache.rocketmq.client.annotation.SelectorType; import org.apache.rocketmq.client.support.RocketMQMessageConverter; import org.apache.rocketmq.client.support.RocketMQUtil; import org.apache.rocketmq.client.apis.ClientConfiguration; @@ -117,8 +116,8 @@ private SimpleConsumerInfo createConsumer( String endPoints = resolvePlaceholders(annotation.endpoints(), simpleConsumer.getEndpoints()); String namespace = resolvePlaceholders(annotation.namespace(), simpleConsumer.getNamespace()); String tag = resolvePlaceholders(annotation.tag(), simpleConsumer.getTag()); - // Use selectorType to determine the filter expression type, similar to rocketmq-spring-boot - String filterExpressionType = annotation.selectorType() == SelectorType.TAG ? "tag" : "sql92"; + // Use filterExpressionType to determine the filter expression type, similar to rocketmq-spring-boot + String filterExpressionType = annotation.filterExpressionType(); Duration requestTimeout = Duration.ofSeconds(annotation.requestTimeout()); int awaitDuration = annotation.awaitDuration(); Boolean sslEnabled = simpleConsumer.isSslEnabled(); diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java index f17f45f1..e39f26bf 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.client.autoconfigure; import org.apache.rocketmq.client.annotation.RocketMQMessageListener; -import org.apache.rocketmq.client.annotation.SelectorType; import org.apache.rocketmq.client.core.RocketMQListener; import org.apache.rocketmq.client.support.DefaultListenerContainer; import org.apache.rocketmq.client.support.RocketMQMessageConverter; @@ -109,8 +108,7 @@ private DefaultListenerContainer createRocketMQListenerContainer(String name, Ob container.setConsumptionThreadCount(annotation.consumptionThreadCount()); container.setMaxCacheMessageSizeInBytes(annotation.maxCacheMessageSizeInBytes()); // Use selectorType to determine the filter expression type, similar to rocketmq-spring-boot - container.setType(annotation.selectorType() == SelectorType.TAG ? "tag" : "sql92"); - container.setSelectorType(annotation.selectorType()); + container.setType(annotation.filterExpressionType()); container.setSslEnabled(annotation.sslEnabled()); return container; } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java index 871df933..45c12434 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java @@ -105,8 +105,8 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr RocketMQProperties.SimpleConsumer simpleConsumer = rocketMQProperties.getSimpleConsumer(); final ClientServiceProvider provider = ClientServiceProvider.loadService(); String consumerGroup = simpleConsumer.getConsumerGroup(); - // Use selectorType to determine the filter expression type, similar to rocketmq-spring-boot - String filterExpressionTypeValue = simpleConsumer.getSelectorType() != null ? simpleConsumer.getSelectorType() : "TAG"; + // Use filterExpressionType to determine the filter expression type, similar to rocketmq-spring-boot + String filterExpressionTypeValue = simpleConsumer.getFilterExpressionType() != null ? simpleConsumer.getFilterExpressionType() : "TAG"; String tagExpression = simpleConsumer.getTag() != null ? simpleConsumer.getTag() : "*"; FilterExpression filterExpression = RocketMQUtil.createFilterExpression(tagExpression, filterExpressionTypeValue.toLowerCase()); ClientConfiguration clientConfiguration = RocketMQUtil.createConsumerClientConfiguration(simpleConsumer); diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java index 0003c196..c92ddbad 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java @@ -202,11 +202,11 @@ public static class SimpleConsumer { private int requestTimeout = 3; /** - * Control how to selector message. - * - * @see org.apache.rocketmq.client.annotation.SelectorType + * Control how to filter messages. + * For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10". */ - private String selectorType = "TAG"; + private String filterExpressionType = "TAG"; /** * Enable or disable the use of Secure Sockets Layer (SSL) for network transport. @@ -287,12 +287,12 @@ public void setSslEnabled(boolean sslEnabled) { this.sslEnabled = sslEnabled; } - public String getSelectorType() { - return selectorType; + public String getFilterExpressionType() { + return filterExpressionType; } - public void setSelectorType(String selectorType) { - this.selectorType = selectorType; + public void setFilterExpressionType(String filterExpressionType) { + this.filterExpressionType = filterExpressionType; } public String getNamespace() { @@ -312,7 +312,6 @@ public String toString() { ", tag='" + tag + '\'' + ", topic='" + topic + '\'' + ", requestTimeout=" + requestTimeout + - ", selectorType='" + selectorType + '\'' + ", sslEnabled=" + sslEnabled + ", namespace='" + namespace + '\'' + '}'; diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java index 459ece56..4fafeb48 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.client.support; import org.apache.rocketmq.client.annotation.RocketMQMessageListener; -import org.apache.rocketmq.client.annotation.SelectorType; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.consumer.PushConsumer; @@ -74,7 +73,7 @@ public class DefaultListenerContainer implements InitializingBean, String topic; - SelectorType selectorType; + String filterExpressionType; Duration requestTimeout; @@ -179,12 +178,12 @@ public void setRequestTimeout(Duration requestTimeout) { this.requestTimeout = requestTimeout; } - public SelectorType getSelectorType() { - return selectorType; + public String getFilterExpressionType() { + return filterExpressionType; } - public void setSelectorType(SelectorType selectorType) { - this.selectorType = selectorType; + public void setFilterExpressionType(String filterExpressionType) { + this.filterExpressionType = filterExpressionType; } public Boolean getSslEnabled() { @@ -231,7 +230,7 @@ public void setRocketMQMessageListener(RocketMQMessageListener rocketMQMessageLi this.endpoints = rocketMQMessageListener.endpoints(); this.topic = rocketMQMessageListener.topic(); this.tag = rocketMQMessageListener.tag(); - this.selectorType = rocketMQMessageListener.selectorType(); + this.filterExpressionType = rocketMQMessageListener.filterExpressionType(); this.sslEnabled = rocketMQMessageListener.sslEnabled(); this.consumerGroup = rocketMQMessageListener.consumerGroup(); this.requestTimeout = Duration.ofSeconds(rocketMQMessageListener.requestTimeout()); @@ -273,9 +272,8 @@ private void initRocketMQPushConsumer() { Assert.hasText(topic, "Property 'topic' is required"); Assert.hasText(tag, "Property 'tag' is required"); - // Convert SelectorType to FilterExpressionType string for createFilterExpression - String filterExpressionTypeStr = (selectorType != null && selectorType == SelectorType.SQL92) - ? "sql92" : "tag"; + // Use filterExpressionType directly instead of converting from SelectorType + String filterExpressionTypeStr = this.getFilterExpressionType() != null ? this.getFilterExpressionType().toLowerCase() : "tag"; FilterExpression filterExpression = null; final ClientServiceProvider provider = ClientServiceProvider.loadService(); @@ -379,7 +377,7 @@ public String toString() { ", consumerGroup='" + consumerGroup + '\'' + ", tag='" + tag + '\'' + ", topic='" + topic + '\'' + - ", selectorType=" + selectorType + + ", filterExpressionType='" + filterExpressionType + '\'' + ", requestTimeout=" + requestTimeout + ", maxCachedMessageCount=" + maxCachedMessageCount + ", maxCacheMessageSizeInBytes=" + maxCacheMessageSizeInBytes + From bd24c312e25e47fc5fd98a5f274131515261970f Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Tue, 10 Mar 2026 14:52:29 +0800 Subject: [PATCH 09/11] update tag --- .../rocketmq/client/autoconfigure/RocketMQProperties.java | 4 ++-- .../rocketmq/client/support/DefaultListenerContainer.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java index c92ddbad..9aa27ce6 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java @@ -186,8 +186,8 @@ public static class SimpleConsumer { /** * Tag of consumer. Used for message filtering. - * For TAG selectorType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. - * For SQL92 selectorType, use SQL92 expression like "a > 5 AND b < 10". + * For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10". */ private String tag; diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java index 4fafeb48..9c12ea96 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java @@ -272,7 +272,7 @@ private void initRocketMQPushConsumer() { Assert.hasText(topic, "Property 'topic' is required"); Assert.hasText(tag, "Property 'tag' is required"); - // Use filterExpressionType directly instead of converting from SelectorType + // Use filterExpressionType directly instead of converting from String filterExpressionTypeStr = this.getFilterExpressionType() != null ? this.getFilterExpressionType().toLowerCase() : "tag"; FilterExpression filterExpression = null; From 9d7ff2d0e70019b2034e71fa94f4292afd71836c Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Tue, 10 Mar 2026 15:05:22 +0800 Subject: [PATCH 10/11] Revert "update tag" This reverts commit 756df0a7cc2d6f3ab4e1eee072438c1b1ae98fe8. # Conflicts: # rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java # rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java # rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java # rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java # rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java # rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java --- .../consumer/Sql92FilterConsumer.java | 49 ------------------- .../consumer/TagFilterConsumer.java | 48 ------------------ 2 files changed, 97 deletions(-) delete mode 100644 rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/Sql92FilterConsumer.java delete mode 100644 rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/TagFilterConsumer.java diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/Sql92FilterConsumer.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/Sql92FilterConsumer.java deleted file mode 100644 index a9b82de0..00000000 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/Sql92FilterConsumer.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.samples.springboot.consumer; - -import org.apache.rocketmq.client.apis.consumer.ConsumeResult; -import org.apache.rocketmq.client.apis.message.MessageView; -import org.apache.rocketmq.client.annotation.RocketMQMessageListener; -import org.apache.rocketmq.client.annotation.SelectorType; -import org.apache.rocketmq.client.core.RocketMQListener; -import org.springframework.stereotype.Service; - -/** - * Example consumer using SQL92-based message filtering with selectorType. - * This demonstrates SQL92 expression filtering capability. - */ -@Service -@RocketMQMessageListener( - endpoints = "${demo.sql92-filter.rocketmq.endpoints:}", - topic = "${demo.sql92-filter.rocketmq.topic:}", - consumerGroup = "${demo.sql92-filter.rocketmq.consumer-group:}", - selectorType = SelectorType.SQL92, - tag = "a > 5 AND b < 10" -) -public class Sql92FilterConsumer implements RocketMQListener { - - @Override - public ConsumeResult consume(MessageView messageView) { - System.out.println("Received message with SQL92 filter: " + messageView); - // Get message properties - messageView.getProperties().forEach((key, value) -> - System.out.println("Property: " + key + " = " + value) - ); - return ConsumeResult.SUCCESS; - } -} diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/TagFilterConsumer.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/TagFilterConsumer.java deleted file mode 100644 index e9c6e6d9..00000000 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/TagFilterConsumer.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.samples.springboot.consumer; - -import org.apache.rocketmq.client.apis.consumer.ConsumeResult; -import org.apache.rocketmq.client.apis.message.MessageView; -import org.apache.rocketmq.client.annotation.RocketMQMessageListener; -import org.apache.rocketmq.client.annotation.SelectorType; -import org.apache.rocketmq.client.core.RocketMQListener; -import org.springframework.stereotype.Service; - -/** - * Example consumer using tag-based message filtering with selectorType. - * This demonstrates the new way to configure message filtering. - */ -@Service -@RocketMQMessageListener( - endpoints = "${demo.tag-filter.rocketmq.endpoints:}", - topic = "${demo.tag-filter.rocketmq.topic:}", - consumerGroup = "${demo.tag-filter.rocketmq.consumer-group:}", - selectorType = SelectorType.TAG, - tag = "tagA || tagB" -) -public class TagFilterConsumer implements RocketMQListener { - - @Override - public ConsumeResult consume(MessageView messageView) { - System.out.println("Received message with tag filter: " + messageView); - // Get message tag from properties - String tag = messageView.getTag().orElse(null); - System.out.println("Message tag: " + tag); - return ConsumeResult.SUCCESS; - } -} From 0d97ab2efcb0d3af2f0c7f26df58f48112e4c785 Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Tue, 10 Mar 2026 15:08:21 +0800 Subject: [PATCH 11/11] update log --- .../ExtConsumerResetConfiguration.java | 21 ++--- .../annotation/RocketMQMessageListener.java | 19 ++-- .../ExtConsumerResetConfiguration.java | 4 +- .../ListenerContainerConfiguration.java | 3 +- .../RocketMQAutoConfiguration.java | 4 +- .../autoconfigure/RocketMQProperties.java | 21 +++-- .../support/DefaultListenerContainer.java | 86 +++++++++++-------- 7 files changed, 79 insertions(+), 79 deletions(-) diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java index ea340d16..139acf78 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java @@ -31,11 +31,11 @@ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.accessKey:}"; String SECRET_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.secretKey:}"; - String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:}"; + String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:*}"; String TOPIC_PLACEHOLDER = "${rocketmq.simple-consumer.topic:}"; String ENDPOINTS_PLACEHOLDER = "${rocketmq.simple-consumer.endpoints:}"; String CONSUMER_GROUP_PLACEHOLDER = "${rocketmq.simple-consumer.consumerGroup:}"; - String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:}"; + String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:tag}"; /** * The component name of the Consumer configuration. @@ -54,8 +54,8 @@ /** * Tag of consumer. Used for message filtering. - * For TAG selectorType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. - * For SQL92 selectorType, use SQL92 expression like "a > 5 AND b < 10". + * For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10". */ String tag() default TAG_PLACEHOLDER; @@ -75,17 +75,10 @@ String consumerGroup() default CONSUMER_GROUP_PLACEHOLDER; /** - * Control how to selector message. - * - * @see SelectorType + * Control how to filter messages. + * For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10". */ - SelectorType selectorType() default SelectorType.TAG; - - /** - * @deprecated Use {@link #tag()} and {@link #selectorType()} instead. - * This field will be removed in a future version. - */ - @Deprecated String filterExpressionType() default FILTER_EXPRESSION_TYPE_PLACEHOLDER; /** diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java index a62a3b81..4450c0cb 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java @@ -31,7 +31,7 @@ String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}"; String ENDPOINTS_PLACEHOLDER = "${rocketmq.push-consumer.endpoints:}"; String TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.topic:}"; - String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:}"; + String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:*}"; /** * The property of "access-key". @@ -54,14 +54,7 @@ String topic() default TOPIC_PLACEHOLDER; /** - * Control how to selector message. - * - * @see SelectorType - */ - SelectorType selectorType() default SelectorType.TAG; - - /** - * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92} + * Control which message can be select. * For TAG type, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. * For SQL92 type, use SQL92 expression like "a > 5 AND b < 10". */ @@ -73,11 +66,11 @@ boolean sslEnabled() default true; /** - * @deprecated Use {@link #tag()} and {@link #selectorType()} instead. - * This field will be removed in a future version. + * Control how to filter messages. + * For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10". */ - @Deprecated - String filterExpressionType() default "tag"; + String filterExpressionType() default TAG_PLACEHOLDER; /** * The load balancing group for the simple consumer. diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java index e9e78358..8b8b5413 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java @@ -116,8 +116,8 @@ private SimpleConsumerInfo createConsumer( String endPoints = resolvePlaceholders(annotation.endpoints(), simpleConsumer.getEndpoints()); String namespace = resolvePlaceholders(annotation.namespace(), simpleConsumer.getNamespace()); String tag = resolvePlaceholders(annotation.tag(), simpleConsumer.getTag()); - // Use selectorType to determine the filter expression type, similar to rocketmq-spring-boot - String filterExpressionType = annotation.selectorType() == org.apache.rocketmq.client.annotation.SelectorType.TAG ? "tag" : "sql92"; + // Use filterExpressionType to determine the filter expression type, similar to rocketmq-spring-boot + String filterExpressionType = annotation.filterExpressionType(); Duration requestTimeout = Duration.ofSeconds(annotation.requestTimeout()); int awaitDuration = annotation.awaitDuration(); Boolean sslEnabled = simpleConsumer.isSslEnabled(); diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java index e8b040d2..e39f26bf 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.client.autoconfigure; import org.apache.rocketmq.client.annotation.RocketMQMessageListener; -import org.apache.rocketmq.client.annotation.SelectorType; import org.apache.rocketmq.client.core.RocketMQListener; import org.apache.rocketmq.client.support.DefaultListenerContainer; import org.apache.rocketmq.client.support.RocketMQMessageConverter; @@ -109,7 +108,7 @@ private DefaultListenerContainer createRocketMQListenerContainer(String name, Ob container.setConsumptionThreadCount(annotation.consumptionThreadCount()); container.setMaxCacheMessageSizeInBytes(annotation.maxCacheMessageSizeInBytes()); // Use selectorType to determine the filter expression type, similar to rocketmq-spring-boot - container.setType(annotation.selectorType() == SelectorType.TAG ? "tag" : "sql92"); + container.setType(annotation.filterExpressionType()); container.setSslEnabled(annotation.sslEnabled()); return container; } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java index 871df933..45c12434 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java @@ -105,8 +105,8 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr RocketMQProperties.SimpleConsumer simpleConsumer = rocketMQProperties.getSimpleConsumer(); final ClientServiceProvider provider = ClientServiceProvider.loadService(); String consumerGroup = simpleConsumer.getConsumerGroup(); - // Use selectorType to determine the filter expression type, similar to rocketmq-spring-boot - String filterExpressionTypeValue = simpleConsumer.getSelectorType() != null ? simpleConsumer.getSelectorType() : "TAG"; + // Use filterExpressionType to determine the filter expression type, similar to rocketmq-spring-boot + String filterExpressionTypeValue = simpleConsumer.getFilterExpressionType() != null ? simpleConsumer.getFilterExpressionType() : "TAG"; String tagExpression = simpleConsumer.getTag() != null ? simpleConsumer.getTag() : "*"; FilterExpression filterExpression = RocketMQUtil.createFilterExpression(tagExpression, filterExpressionTypeValue.toLowerCase()); ClientConfiguration clientConfiguration = RocketMQUtil.createConsumerClientConfiguration(simpleConsumer); diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java index 0003c196..9aa27ce6 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java @@ -186,8 +186,8 @@ public static class SimpleConsumer { /** * Tag of consumer. Used for message filtering. - * For TAG selectorType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. - * For SQL92 selectorType, use SQL92 expression like "a > 5 AND b < 10". + * For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10". */ private String tag; @@ -202,11 +202,11 @@ public static class SimpleConsumer { private int requestTimeout = 3; /** - * Control how to selector message. - * - * @see org.apache.rocketmq.client.annotation.SelectorType + * Control how to filter messages. + * For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10". */ - private String selectorType = "TAG"; + private String filterExpressionType = "TAG"; /** * Enable or disable the use of Secure Sockets Layer (SSL) for network transport. @@ -287,12 +287,12 @@ public void setSslEnabled(boolean sslEnabled) { this.sslEnabled = sslEnabled; } - public String getSelectorType() { - return selectorType; + public String getFilterExpressionType() { + return filterExpressionType; } - public void setSelectorType(String selectorType) { - this.selectorType = selectorType; + public void setFilterExpressionType(String filterExpressionType) { + this.filterExpressionType = filterExpressionType; } public String getNamespace() { @@ -312,7 +312,6 @@ public String toString() { ", tag='" + tag + '\'' + ", topic='" + topic + '\'' + ", requestTimeout=" + requestTimeout + - ", selectorType='" + selectorType + '\'' + ", sslEnabled=" + sslEnabled + ", namespace='" + namespace + '\'' + '}'; diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java index 4d0925d0..9c12ea96 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java @@ -19,7 +19,6 @@ import org.apache.rocketmq.client.annotation.RocketMQMessageListener; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientServiceProvider; -import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder; import org.apache.rocketmq.client.apis.consumer.FilterExpression; @@ -74,9 +73,7 @@ public class DefaultListenerContainer implements InitializingBean, String topic; - String type; - - FilterExpressionType filterExpressionType; + String filterExpressionType; Duration requestTimeout; @@ -90,6 +87,8 @@ public class DefaultListenerContainer implements InitializingBean, String namespace; + String type; + public String getName() { return name; } @@ -179,40 +178,40 @@ public void setRequestTimeout(Duration requestTimeout) { this.requestTimeout = requestTimeout; } - public FilterExpressionType getFilterExpressionType() { + public String getFilterExpressionType() { return filterExpressionType; } - public void setFilterExpressionType(FilterExpressionType filterExpressionType) { + public void setFilterExpressionType(String filterExpressionType) { this.filterExpressionType = filterExpressionType; } - public int getMaxCachedMessageCount() { - return maxCachedMessageCount; + public Boolean getSslEnabled() { + return sslEnabled; } - public void setMaxCachedMessageCount(int maxCachedMessageCount) { - this.maxCachedMessageCount = maxCachedMessageCount; + public void setSslEnabled(Boolean sslEnabled) { + this.sslEnabled = sslEnabled; } - public int getMaxCacheMessageSizeInBytes() { - return maxCacheMessageSizeInBytes; + public String getNamespace() { + return namespace; } - public void setMaxCacheMessageSizeInBytes(int maxCacheMessageSizeInBytes) { - this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes; + public void setNamespace(String namespace) { + this.namespace = namespace; } - public int getConsumptionThreadCount() { - return consumptionThreadCount; + public RocketMQListener getMessageListener() { + return rocketMQListener; } - public void setConsumptionThreadCount(int consumptionThreadCount) { - this.consumptionThreadCount = consumptionThreadCount; + public String getType() { + return type; } - public RocketMQListener getMessageListener() { - return rocketMQListener; + public void setType(String type) { + this.type = type; } public void setMessageListener(RocketMQListener rocketMQListener) { @@ -225,30 +224,44 @@ public RocketMQMessageListener getRocketMQMessageListener() { public void setRocketMQMessageListener(RocketMQMessageListener rocketMQMessageListener) { this.rocketMQMessageListener = rocketMQMessageListener; + + this.accessKey = rocketMQMessageListener.accessKey(); + this.secretKey = rocketMQMessageListener.secretKey(); + this.endpoints = rocketMQMessageListener.endpoints(); + this.topic = rocketMQMessageListener.topic(); + this.tag = rocketMQMessageListener.tag(); + this.filterExpressionType = rocketMQMessageListener.filterExpressionType(); + this.sslEnabled = rocketMQMessageListener.sslEnabled(); + this.consumerGroup = rocketMQMessageListener.consumerGroup(); + this.requestTimeout = Duration.ofSeconds(rocketMQMessageListener.requestTimeout()); + this.maxCachedMessageCount = rocketMQMessageListener.maxCachedMessageCount(); + this.maxCacheMessageSizeInBytes = rocketMQMessageListener.maxCacheMessageSizeInBytes(); + this.consumptionThreadCount = rocketMQMessageListener.consumptionThreadCount(); + this.namespace = rocketMQMessageListener.namespace(); } - public String getType() { - return type; + public int getMaxCachedMessageCount() { + return maxCachedMessageCount; } - public void setType(String type) { - this.type = type; + public void setMaxCachedMessageCount(int maxCachedMessageCount) { + this.maxCachedMessageCount = maxCachedMessageCount; } - public Boolean getSslEnabled() { - return sslEnabled; + public int getMaxCacheMessageSizeInBytes() { + return maxCacheMessageSizeInBytes; } - public void setSslEnabled(Boolean sslEnabled) { - this.sslEnabled = sslEnabled; + public void setMaxCacheMessageSizeInBytes(int maxCacheMessageSizeInBytes) { + this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes; } - public String getNamespace() { - return namespace; + public int getConsumptionThreadCount() { + return consumptionThreadCount; } - public void setNamespace(String namespace) { - this.namespace = namespace; + public void setConsumptionThreadCount(int consumptionThreadCount) { + this.consumptionThreadCount = consumptionThreadCount; } private void initRocketMQPushConsumer() { @@ -258,10 +271,14 @@ private void initRocketMQPushConsumer() { Assert.hasText(consumerGroup, "Property 'consumerGroup' is required"); Assert.hasText(topic, "Property 'topic' is required"); Assert.hasText(tag, "Property 'tag' is required"); + + // Use filterExpressionType directly instead of converting from + String filterExpressionTypeStr = this.getFilterExpressionType() != null ? this.getFilterExpressionType().toLowerCase() : "tag"; + FilterExpression filterExpression = null; final ClientServiceProvider provider = ClientServiceProvider.loadService(); if (StringUtils.hasLength(this.getTag())) { - filterExpression = RocketMQUtil.createFilterExpression(this.getTag(),this.getType()); + filterExpression = RocketMQUtil.createFilterExpression(this.getTag(), filterExpressionTypeStr); } ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(this.getAccessKey(), this.getSecretKey(), this.getEndpoints(), this.getRequestTimeout(), this.sslEnabled, this.namespace); @@ -360,8 +377,7 @@ public String toString() { ", consumerGroup='" + consumerGroup + '\'' + ", tag='" + tag + '\'' + ", topic='" + topic + '\'' + - ", type='" + type + '\'' + - ", filterExpressionType=" + filterExpressionType + + ", filterExpressionType='" + filterExpressionType + '\'' + ", requestTimeout=" + requestTimeout + ", maxCachedMessageCount=" + maxCachedMessageCount + ", maxCacheMessageSizeInBytes=" + maxCacheMessageSizeInBytes +