Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@

String ACCESS_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.accessKey:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.secretKey:}";
String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:}";
String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:*}";
String TOPIC_PLACEHOLDER = "${rocketmq.simple-consumer.topic:}";
String ENDPOINTS_PLACEHOLDER = "${rocketmq.simple-consumer.endpoints:}";
String CONSUMER_GROUP_PLACEHOLDER = "${rocketmq.simple-consumer.consumerGroup:}";
String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:}";
String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:tag}";

/**
* The component name of the Consumer configuration.
Expand All @@ -53,7 +53,9 @@
String secretKey() default SECRET_KEY_PLACEHOLDER;

/**
* Tag of consumer.
* Tag of consumer. Used for message filtering.
* For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags.
* For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10".
*/
String tag() default TAG_PLACEHOLDER;

Expand All @@ -73,7 +75,9 @@
String consumerGroup() default CONSUMER_GROUP_PLACEHOLDER;

/**
* The type of filter expression
* Control how to filter messages.
* For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags.
* For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10".
*/
String filterExpressionType() default FILTER_EXPRESSION_TYPE_PLACEHOLDER;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}";
String ENDPOINTS_PLACEHOLDER = "${rocketmq.push-consumer.endpoints:}";
String TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.topic:}";
String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:}";
String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:*}";

/**
* The property of "access-key".
Expand All @@ -54,7 +54,9 @@
String topic() default TOPIC_PLACEHOLDER;

/**
* Tag of consumer.
* Control which message can be select.
* For TAG type, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags.
* For SQL92 type, use SQL92 expression like "a > 5 AND b < 10".
*/
String tag() default TAG_PLACEHOLDER;

Expand All @@ -64,9 +66,11 @@
boolean sslEnabled() default true;

/**
* The type of filter expression
* Control how to filter messages.
* For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags.
* For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10".
*/
String filterExpressionType() default "tag";
String filterExpressionType() default TAG_PLACEHOLDER;

/**
* The load balancing group for the simple consumer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ private SimpleConsumerInfo createConsumer(
String endPoints = resolvePlaceholders(annotation.endpoints(), simpleConsumer.getEndpoints());
String namespace = resolvePlaceholders(annotation.namespace(), simpleConsumer.getNamespace());
String tag = resolvePlaceholders(annotation.tag(), simpleConsumer.getTag());
String filterExpressionType = resolvePlaceholders(annotation.filterExpressionType(), simpleConsumer.getFilterExpressionType());
// Use filterExpressionType to determine the filter expression type, similar to rocketmq-spring-boot
String filterExpressionType = annotation.filterExpressionType();
Duration requestTimeout = Duration.ofSeconds(annotation.requestTimeout());
int awaitDuration = annotation.awaitDuration();
Boolean sslEnabled = simpleConsumer.isSslEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ private DefaultListenerContainer createRocketMQListenerContainer(String name, Ob
container.setMaxCachedMessageCount(annotation.maxCachedMessageCount());
container.setConsumptionThreadCount(annotation.consumptionThreadCount());
container.setMaxCacheMessageSizeInBytes(annotation.maxCacheMessageSizeInBytes());
// Use selectorType to determine the filter expression type, similar to rocketmq-spring-boot
container.setType(annotation.filterExpressionType());
container.setSslEnabled(annotation.sslEnabled());
return container;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr
RocketMQProperties.SimpleConsumer simpleConsumer = rocketMQProperties.getSimpleConsumer();
final ClientServiceProvider provider = ClientServiceProvider.loadService();
String consumerGroup = simpleConsumer.getConsumerGroup();
FilterExpression filterExpression = RocketMQUtil.createFilterExpression(simpleConsumer.getTag(), simpleConsumer.getFilterExpressionType());
// Use filterExpressionType to determine the filter expression type, similar to rocketmq-spring-boot
String filterExpressionTypeValue = simpleConsumer.getFilterExpressionType() != null ? simpleConsumer.getFilterExpressionType() : "TAG";
String tagExpression = simpleConsumer.getTag() != null ? simpleConsumer.getTag() : "*";
FilterExpression filterExpression = RocketMQUtil.createFilterExpression(tagExpression, filterExpressionTypeValue.toLowerCase());
ClientConfiguration clientConfiguration = RocketMQUtil.createConsumerClientConfiguration(simpleConsumer);
SimpleConsumerBuilder simpleConsumerBuilder = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ public static class SimpleConsumer {
private int awaitDuration = 5;

/**
* Tag of consumer.
* Tag of consumer. Used for message filtering.
* For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags.
* For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10".
*/
private String tag;

Expand All @@ -200,9 +202,11 @@ public static class SimpleConsumer {
private int requestTimeout = 3;

/**
* The type of filter expression
* Control how to filter messages.
* For TAG filterExpressionType, use "*" to subscribe all messages, or use "tagA||tagB" for multiple tags.
* For SQL92 filterExpressionType, use SQL92 expression like "a > 5 AND b < 10".
*/
private String filterExpressionType = "tag";
private String filterExpressionType = "TAG";

/**
* Enable or disable the use of Secure Sockets Layer (SSL) for network transport.
Expand Down Expand Up @@ -308,7 +312,6 @@ public String toString() {
", tag='" + tag + '\'' +
", topic='" + topic + '\'' +
", requestTimeout=" + requestTimeout +
", filterExpressionType='" + filterExpressionType + '\'' +
", sslEnabled=" + sslEnabled +
", namespace='" + namespace + '\'' +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
Expand Down Expand Up @@ -74,9 +73,7 @@ public class DefaultListenerContainer implements InitializingBean,

String topic;

String type;

FilterExpressionType filterExpressionType;
String filterExpressionType;

Duration requestTimeout;

Expand All @@ -90,6 +87,8 @@ public class DefaultListenerContainer implements InitializingBean,

String namespace;

String type;

public String getName() {
return name;
}
Expand Down Expand Up @@ -179,40 +178,40 @@ public void setRequestTimeout(Duration requestTimeout) {
this.requestTimeout = requestTimeout;
}

public FilterExpressionType getFilterExpressionType() {
public String getFilterExpressionType() {
return filterExpressionType;
}

public void setFilterExpressionType(FilterExpressionType filterExpressionType) {
public void setFilterExpressionType(String filterExpressionType) {
this.filterExpressionType = filterExpressionType;
}

public int getMaxCachedMessageCount() {
return maxCachedMessageCount;
public Boolean getSslEnabled() {
return sslEnabled;
}

public void setMaxCachedMessageCount(int maxCachedMessageCount) {
this.maxCachedMessageCount = maxCachedMessageCount;
public void setSslEnabled(Boolean sslEnabled) {
this.sslEnabled = sslEnabled;
}

public int getMaxCacheMessageSizeInBytes() {
return maxCacheMessageSizeInBytes;
public String getNamespace() {
return namespace;
}

public void setMaxCacheMessageSizeInBytes(int maxCacheMessageSizeInBytes) {
this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;
public void setNamespace(String namespace) {
this.namespace = namespace;
}

public int getConsumptionThreadCount() {
return consumptionThreadCount;
public RocketMQListener getMessageListener() {
return rocketMQListener;
}

public void setConsumptionThreadCount(int consumptionThreadCount) {
this.consumptionThreadCount = consumptionThreadCount;
public String getType() {
return type;
}

public RocketMQListener getMessageListener() {
return rocketMQListener;
public void setType(String type) {
this.type = type;
}

public void setMessageListener(RocketMQListener rocketMQListener) {
Expand All @@ -225,30 +224,44 @@ public RocketMQMessageListener getRocketMQMessageListener() {

public void setRocketMQMessageListener(RocketMQMessageListener rocketMQMessageListener) {
this.rocketMQMessageListener = rocketMQMessageListener;

this.accessKey = rocketMQMessageListener.accessKey();
this.secretKey = rocketMQMessageListener.secretKey();
this.endpoints = rocketMQMessageListener.endpoints();
this.topic = rocketMQMessageListener.topic();
this.tag = rocketMQMessageListener.tag();
this.filterExpressionType = rocketMQMessageListener.filterExpressionType();
this.sslEnabled = rocketMQMessageListener.sslEnabled();
this.consumerGroup = rocketMQMessageListener.consumerGroup();
this.requestTimeout = Duration.ofSeconds(rocketMQMessageListener.requestTimeout());
this.maxCachedMessageCount = rocketMQMessageListener.maxCachedMessageCount();
this.maxCacheMessageSizeInBytes = rocketMQMessageListener.maxCacheMessageSizeInBytes();
this.consumptionThreadCount = rocketMQMessageListener.consumptionThreadCount();
this.namespace = rocketMQMessageListener.namespace();
}

public String getType() {
return type;
public int getMaxCachedMessageCount() {
return maxCachedMessageCount;
}

public void setType(String type) {
this.type = type;
public void setMaxCachedMessageCount(int maxCachedMessageCount) {
this.maxCachedMessageCount = maxCachedMessageCount;
}

public Boolean getSslEnabled() {
return sslEnabled;
public int getMaxCacheMessageSizeInBytes() {
return maxCacheMessageSizeInBytes;
}

public void setSslEnabled(Boolean sslEnabled) {
this.sslEnabled = sslEnabled;
public void setMaxCacheMessageSizeInBytes(int maxCacheMessageSizeInBytes) {
this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;
}

public String getNamespace() {
return namespace;
public int getConsumptionThreadCount() {
return consumptionThreadCount;
}

public void setNamespace(String namespace) {
this.namespace = namespace;
public void setConsumptionThreadCount(int consumptionThreadCount) {
this.consumptionThreadCount = consumptionThreadCount;
}

private void initRocketMQPushConsumer() {
Expand All @@ -258,10 +271,14 @@ private void initRocketMQPushConsumer() {
Assert.hasText(consumerGroup, "Property 'consumerGroup' is required");
Assert.hasText(topic, "Property 'topic' is required");
Assert.hasText(tag, "Property 'tag' is required");

// Use filterExpressionType directly instead of converting from
String filterExpressionTypeStr = this.getFilterExpressionType() != null ? this.getFilterExpressionType().toLowerCase() : "tag";

FilterExpression filterExpression = null;
final ClientServiceProvider provider = ClientServiceProvider.loadService();
if (StringUtils.hasLength(this.getTag())) {
filterExpression = RocketMQUtil.createFilterExpression(this.getTag(),this.getType());
filterExpression = RocketMQUtil.createFilterExpression(this.getTag(), filterExpressionTypeStr);
}
ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(this.getAccessKey(), this.getSecretKey(),
this.getEndpoints(), this.getRequestTimeout(), this.sslEnabled, this.namespace);
Expand Down Expand Up @@ -360,8 +377,7 @@ public String toString() {
", consumerGroup='" + consumerGroup + '\'' +
", tag='" + tag + '\'' +
", topic='" + topic + '\'' +
", type='" + type + '\'' +
", filterExpressionType=" + filterExpressionType +
", filterExpressionType='" + filterExpressionType + '\'' +
", requestTimeout=" + requestTimeout +
", maxCachedMessageCount=" + maxCachedMessageCount +
", maxCacheMessageSizeInBytes=" + maxCacheMessageSizeInBytes +
Expand Down
Loading