diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java index f0b942f5..139acf78 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java @@ -31,11 +31,11 @@ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.accessKey:}"; String SECRET_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.secretKey:}"; - String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:}"; + String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:*}"; String TOPIC_PLACEHOLDER = "${rocketmq.simple-consumer.topic:}"; String ENDPOINTS_PLACEHOLDER = "${rocketmq.simple-consumer.endpoints:}"; String CONSUMER_GROUP_PLACEHOLDER = "${rocketmq.simple-consumer.consumerGroup:}"; - String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:}"; + String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:tag}"; /** * The component name of the Consumer configuration. @@ -53,7 +53,9 @@ String secretKey() default SECRET_KEY_PLACEHOLDER; /** - * Tag of consumer. + * Tag of consumer. Used for message filtering. + * For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10". */ String tag() default TAG_PLACEHOLDER; @@ -73,7 +75,9 @@ String consumerGroup() default CONSUMER_GROUP_PLACEHOLDER; /** - * The type of filter expression + * Control how to filter messages. + * For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10". */ String filterExpressionType() default FILTER_EXPRESSION_TYPE_PLACEHOLDER; diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java index 6f0f836d..4450c0cb 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java @@ -31,7 +31,7 @@ String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}"; String ENDPOINTS_PLACEHOLDER = "${rocketmq.push-consumer.endpoints:}"; String TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.topic:}"; - String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:}"; + String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:*}"; /** * The property of "access-key". @@ -54,7 +54,9 @@ String topic() default TOPIC_PLACEHOLDER; /** - * Tag of consumer. + * Control which message can be select. + * For TAG type, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 type, use SQL92 expression like "a > 5 AND b < 10". */ String tag() default TAG_PLACEHOLDER; @@ -64,9 +66,11 @@ boolean sslEnabled() default true; /** - * The type of filter expression + * Control how to filter messages. + * For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10". */ - String filterExpressionType() default "tag"; + String filterExpressionType() default TAG_PLACEHOLDER; /** * The load balancing group for the simple consumer. diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java index a501e64e..8b8b5413 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java @@ -116,7 +116,8 @@ private SimpleConsumerInfo createConsumer( String endPoints = resolvePlaceholders(annotation.endpoints(), simpleConsumer.getEndpoints()); String namespace = resolvePlaceholders(annotation.namespace(), simpleConsumer.getNamespace()); String tag = resolvePlaceholders(annotation.tag(), simpleConsumer.getTag()); - String filterExpressionType = resolvePlaceholders(annotation.filterExpressionType(), simpleConsumer.getFilterExpressionType()); + // Use filterExpressionType to determine the filter expression type, similar to rocketmq-spring-boot + String filterExpressionType = annotation.filterExpressionType(); Duration requestTimeout = Duration.ofSeconds(annotation.requestTimeout()); int awaitDuration = annotation.awaitDuration(); Boolean sslEnabled = simpleConsumer.isSslEnabled(); diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java index 1975dccf..e39f26bf 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java @@ -107,6 +107,7 @@ private DefaultListenerContainer createRocketMQListenerContainer(String name, Ob container.setMaxCachedMessageCount(annotation.maxCachedMessageCount()); container.setConsumptionThreadCount(annotation.consumptionThreadCount()); container.setMaxCacheMessageSizeInBytes(annotation.maxCacheMessageSizeInBytes()); + // Use selectorType to determine the filter expression type, similar to rocketmq-spring-boot container.setType(annotation.filterExpressionType()); container.setSslEnabled(annotation.sslEnabled()); return container; diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java index 346311f2..45c12434 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java @@ -105,7 +105,10 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr RocketMQProperties.SimpleConsumer simpleConsumer = rocketMQProperties.getSimpleConsumer(); final ClientServiceProvider provider = ClientServiceProvider.loadService(); String consumerGroup = simpleConsumer.getConsumerGroup(); - FilterExpression filterExpression = RocketMQUtil.createFilterExpression(simpleConsumer.getTag(), simpleConsumer.getFilterExpressionType()); + // Use filterExpressionType to determine the filter expression type, similar to rocketmq-spring-boot + String filterExpressionTypeValue = simpleConsumer.getFilterExpressionType() != null ? simpleConsumer.getFilterExpressionType() : "TAG"; + String tagExpression = simpleConsumer.getTag() != null ? simpleConsumer.getTag() : "*"; + FilterExpression filterExpression = RocketMQUtil.createFilterExpression(tagExpression, filterExpressionTypeValue.toLowerCase()); ClientConfiguration clientConfiguration = RocketMQUtil.createConsumerClientConfiguration(simpleConsumer); SimpleConsumerBuilder simpleConsumerBuilder = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration); diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java index 8f3d4941..9aa27ce6 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java @@ -185,7 +185,9 @@ public static class SimpleConsumer { private int awaitDuration = 5; /** - * Tag of consumer. + * Tag of consumer. Used for message filtering. + * For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10". */ private String tag; @@ -200,9 +202,11 @@ public static class SimpleConsumer { private int requestTimeout = 3; /** - * The type of filter expression + * Control how to filter messages. + * For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags. + * For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10". */ - private String filterExpressionType = "tag"; + private String filterExpressionType = "TAG"; /** * Enable or disable the use of Secure Sockets Layer (SSL) for network transport. @@ -308,7 +312,6 @@ public String toString() { ", tag='" + tag + '\'' + ", topic='" + topic + '\'' + ", requestTimeout=" + requestTimeout + - ", filterExpressionType='" + filterExpressionType + '\'' + ", sslEnabled=" + sslEnabled + ", namespace='" + namespace + '\'' + '}'; diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java index 4d0925d0..9c12ea96 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java @@ -19,7 +19,6 @@ import org.apache.rocketmq.client.annotation.RocketMQMessageListener; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientServiceProvider; -import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder; import org.apache.rocketmq.client.apis.consumer.FilterExpression; @@ -74,9 +73,7 @@ public class DefaultListenerContainer implements InitializingBean, String topic; - String type; - - FilterExpressionType filterExpressionType; + String filterExpressionType; Duration requestTimeout; @@ -90,6 +87,8 @@ public class DefaultListenerContainer implements InitializingBean, String namespace; + String type; + public String getName() { return name; } @@ -179,40 +178,40 @@ public void setRequestTimeout(Duration requestTimeout) { this.requestTimeout = requestTimeout; } - public FilterExpressionType getFilterExpressionType() { + public String getFilterExpressionType() { return filterExpressionType; } - public void setFilterExpressionType(FilterExpressionType filterExpressionType) { + public void setFilterExpressionType(String filterExpressionType) { this.filterExpressionType = filterExpressionType; } - public int getMaxCachedMessageCount() { - return maxCachedMessageCount; + public Boolean getSslEnabled() { + return sslEnabled; } - public void setMaxCachedMessageCount(int maxCachedMessageCount) { - this.maxCachedMessageCount = maxCachedMessageCount; + public void setSslEnabled(Boolean sslEnabled) { + this.sslEnabled = sslEnabled; } - public int getMaxCacheMessageSizeInBytes() { - return maxCacheMessageSizeInBytes; + public String getNamespace() { + return namespace; } - public void setMaxCacheMessageSizeInBytes(int maxCacheMessageSizeInBytes) { - this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes; + public void setNamespace(String namespace) { + this.namespace = namespace; } - public int getConsumptionThreadCount() { - return consumptionThreadCount; + public RocketMQListener getMessageListener() { + return rocketMQListener; } - public void setConsumptionThreadCount(int consumptionThreadCount) { - this.consumptionThreadCount = consumptionThreadCount; + public String getType() { + return type; } - public RocketMQListener getMessageListener() { - return rocketMQListener; + public void setType(String type) { + this.type = type; } public void setMessageListener(RocketMQListener rocketMQListener) { @@ -225,30 +224,44 @@ public RocketMQMessageListener getRocketMQMessageListener() { public void setRocketMQMessageListener(RocketMQMessageListener rocketMQMessageListener) { this.rocketMQMessageListener = rocketMQMessageListener; + + this.accessKey = rocketMQMessageListener.accessKey(); + this.secretKey = rocketMQMessageListener.secretKey(); + this.endpoints = rocketMQMessageListener.endpoints(); + this.topic = rocketMQMessageListener.topic(); + this.tag = rocketMQMessageListener.tag(); + this.filterExpressionType = rocketMQMessageListener.filterExpressionType(); + this.sslEnabled = rocketMQMessageListener.sslEnabled(); + this.consumerGroup = rocketMQMessageListener.consumerGroup(); + this.requestTimeout = Duration.ofSeconds(rocketMQMessageListener.requestTimeout()); + this.maxCachedMessageCount = rocketMQMessageListener.maxCachedMessageCount(); + this.maxCacheMessageSizeInBytes = rocketMQMessageListener.maxCacheMessageSizeInBytes(); + this.consumptionThreadCount = rocketMQMessageListener.consumptionThreadCount(); + this.namespace = rocketMQMessageListener.namespace(); } - public String getType() { - return type; + public int getMaxCachedMessageCount() { + return maxCachedMessageCount; } - public void setType(String type) { - this.type = type; + public void setMaxCachedMessageCount(int maxCachedMessageCount) { + this.maxCachedMessageCount = maxCachedMessageCount; } - public Boolean getSslEnabled() { - return sslEnabled; + public int getMaxCacheMessageSizeInBytes() { + return maxCacheMessageSizeInBytes; } - public void setSslEnabled(Boolean sslEnabled) { - this.sslEnabled = sslEnabled; + public void setMaxCacheMessageSizeInBytes(int maxCacheMessageSizeInBytes) { + this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes; } - public String getNamespace() { - return namespace; + public int getConsumptionThreadCount() { + return consumptionThreadCount; } - public void setNamespace(String namespace) { - this.namespace = namespace; + public void setConsumptionThreadCount(int consumptionThreadCount) { + this.consumptionThreadCount = consumptionThreadCount; } private void initRocketMQPushConsumer() { @@ -258,10 +271,14 @@ private void initRocketMQPushConsumer() { Assert.hasText(consumerGroup, "Property 'consumerGroup' is required"); Assert.hasText(topic, "Property 'topic' is required"); Assert.hasText(tag, "Property 'tag' is required"); + + // Use filterExpressionType directly instead of converting from + String filterExpressionTypeStr = this.getFilterExpressionType() != null ? this.getFilterExpressionType().toLowerCase() : "tag"; + FilterExpression filterExpression = null; final ClientServiceProvider provider = ClientServiceProvider.loadService(); if (StringUtils.hasLength(this.getTag())) { - filterExpression = RocketMQUtil.createFilterExpression(this.getTag(),this.getType()); + filterExpression = RocketMQUtil.createFilterExpression(this.getTag(), filterExpressionTypeStr); } ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(this.getAccessKey(), this.getSecretKey(), this.getEndpoints(), this.getRequestTimeout(), this.sslEnabled, this.namespace); @@ -360,8 +377,7 @@ public String toString() { ", consumerGroup='" + consumerGroup + '\'' + ", tag='" + tag + '\'' + ", topic='" + topic + '\'' + - ", type='" + type + '\'' + - ", filterExpressionType=" + filterExpressionType + + ", filterExpressionType='" + filterExpressionType + '\'' + ", requestTimeout=" + requestTimeout + ", maxCachedMessageCount=" + maxCachedMessageCount + ", maxCacheMessageSizeInBytes=" + maxCacheMessageSizeInBytes +