From bc67306bb59a6720ded4a5a6432120ece04f0908 Mon Sep 17 00:00:00 2001 From: Quan Date: Sun, 28 Sep 2025 14:03:14 +0800 Subject: [PATCH 1/6] support lite topic --- apache/rocketmq/v2/definition.proto | 31 +++++++++++++++++++++++++++++ apache/rocketmq/v2/service.proto | 27 +++++++++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/apache/rocketmq/v2/definition.proto b/apache/rocketmq/v2/definition.proto index 468c410..56ab393 100644 --- a/apache/rocketmq/v2/definition.proto +++ b/apache/rocketmq/v2/definition.proto @@ -146,6 +146,9 @@ enum MessageType { // Messages that are transactional. Only committed messages are delivered to // subscribers. TRANSACTION = 4; + + // lite topic + LITE = 5; } enum DigestType { @@ -186,6 +189,7 @@ enum ClientType { PUSH_CONSUMER = 2; SIMPLE_CONSUMER = 3; PULL_CONSUMER = 4; + LITE_PUSH_CONSUMER = 5; } enum Encoding { @@ -270,6 +274,9 @@ message SystemProperties { // Information to identify whether this message is from dead letter queue. optional DeadLetterQueue dead_letter_queue = 20; + + // lite topic + optional string lite_topic = 21; } message DeadLetterQueue { @@ -348,6 +355,8 @@ enum Code { ILLEGAL_POLLING_TIME = 40018; // Offset is illegal. ILLEGAL_OFFSET = 40019; + // Format of lite topic is illegal. + ILLEGAL_LITE_TOPIC = 40020; // Generic code indicates that the client request lacks valid authentication // credentials for the requested resource. @@ -389,6 +398,10 @@ enum Code { // Requests are throttled. TOO_MANY_REQUESTS = 42900; + // LiteTopic related quota exceeded + LITE_TOPIC_QUOTA_EXCEEDED = 42901; + LITE_SUBSCRIPTION_QUOTA_EXCEEDED = 42902; + // Generic code for the case that the server is unwilling to process the request because its header fields are too large. // The request may be resubmitted after reducing the size of the request header fields. REQUEST_HEADER_FIELDS_TOO_LARGE = 43100; @@ -548,6 +561,24 @@ message Subscription { // Long-polling timeout for `ReceiveMessageRequest`, which is essential for // push consumer. optional google.protobuf.Duration long_polling_timeout = 5; + + // Only lite push consumer + // client-side lite subscription quota limit + optional int32 lite_subscription_quota = 6; + // Only lite push consumer + // Maximum length limit for lite topic + optional int32 max_lite_topic_size = 7; +} + +enum LiteSubscriptionAction { + // incremental add + INCREMENTAL_ADD = 0; + // incremental remove + INCREMENTAL_REMOVE = 1; + // all add + ALL_ADD = 3; + // add remove + ALL_REMOVE = 4; } message Metric { diff --git a/apache/rocketmq/v2/service.proto b/apache/rocketmq/v2/service.proto index 18db185..7f8186e 100644 --- a/apache/rocketmq/v2/service.proto +++ b/apache/rocketmq/v2/service.proto @@ -114,6 +114,7 @@ message ReceiveMessageResponse { message AckMessageEntry { string message_id = 1; string receipt_handle = 2; + optional string lite_topic = 3; } message AckMessageRequest { @@ -148,6 +149,7 @@ message ForwardMessageToDeadLetterQueueRequest { string message_id = 4; int32 delivery_attempt = 5; int32 max_delivery_attempts = 6; + optional string lite_topic = 7; } message ForwardMessageToDeadLetterQueueResponse { Status status = 1; } @@ -193,6 +195,10 @@ message RecoverOrphanedTransactionCommand { string transaction_id = 2; } +message NotifyUnsubscribeLiteCommand { + string liteTopic = 1; +} + message TelemetryCommand { optional Status status = 1; @@ -221,6 +227,8 @@ message TelemetryCommand { // Request client to reconnect server use the latest endpoints. ReconnectEndpointsCommand reconnect_endpoints_command = 8; + + NotifyUnsubscribeLiteCommand notify_unsubscribe_lite_command = 9; } } @@ -311,6 +319,21 @@ message RecallMessageResponse { string message_id = 2; } +message SyncLiteSubscriptionRequest { + LiteSubscriptionAction action = 1; + // bindTopic for lite push consumer + Resource topic = 2; + // consumer group + Resource group = 3; + // lite subscription set of lite topics + repeated string liteTopicSet = 4; + optional int64 version = 5; +} + +message SyncLiteSubscriptionResponse { + Status status = 1; +} + // For all the RPCs in MessagingService, the following error handling policies // apply: // @@ -440,4 +463,8 @@ service MessagingService { // for normal message, not supported for now. rpc RecallMessage(RecallMessageRequest) returns (RecallMessageResponse) { } + + // Sync lite subscription info, lite push consumer only + rpc SyncLiteSubscription(SyncLiteSubscriptionRequest) returns (SyncLiteSubscriptionResponse) {} + } \ No newline at end of file From 8a77c8fbb7a8268be4d348054850480d4766a091 Mon Sep 17 00:00:00 2001 From: Quan Date: Sun, 28 Sep 2025 14:08:13 +0800 Subject: [PATCH 2/6] update version to 2.1.0 --- java/VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/VERSION b/java/VERSION index b9d2bdf..50aea0e 100644 --- a/java/VERSION +++ b/java/VERSION @@ -1 +1 @@ -2.0.5 \ No newline at end of file +2.1.0 \ No newline at end of file From 3da74ad1ecbc18eb70628093ced99917caf31eb6 Mon Sep 17 00:00:00 2001 From: Quan Date: Sun, 28 Sep 2025 14:51:57 +0800 Subject: [PATCH 3/6] add comments --- apache/rocketmq/v2/service.proto | 1 + 1 file changed, 1 insertion(+) diff --git a/apache/rocketmq/v2/service.proto b/apache/rocketmq/v2/service.proto index 7f8186e..7ab1f75 100644 --- a/apache/rocketmq/v2/service.proto +++ b/apache/rocketmq/v2/service.proto @@ -228,6 +228,7 @@ message TelemetryCommand { // Request client to reconnect server use the latest endpoints. ReconnectEndpointsCommand reconnect_endpoints_command = 8; + // Request client to unsubscribe lite topic. NotifyUnsubscribeLiteCommand notify_unsubscribe_lite_command = 9; } } From fe81a6227405b4742864ea21e39c12b3b877fa00 Mon Sep 17 00:00:00 2001 From: Quan Date: Fri, 10 Oct 2025 09:57:11 +0800 Subject: [PATCH 4/6] add ClientType LITE_SIMPLE_CONSUMER Change-Id: I7d6abc466ad153d5fe3a9c0fa40d40886ba7a203 --- apache/rocketmq/v2/definition.proto | 1 + 1 file changed, 1 insertion(+) diff --git a/apache/rocketmq/v2/definition.proto b/apache/rocketmq/v2/definition.proto index 56ab393..b7ff127 100644 --- a/apache/rocketmq/v2/definition.proto +++ b/apache/rocketmq/v2/definition.proto @@ -190,6 +190,7 @@ enum ClientType { SIMPLE_CONSUMER = 3; PULL_CONSUMER = 4; LITE_PUSH_CONSUMER = 5; + LITE_SIMPLE_CONSUMER = 6; } enum Encoding { From 33cc7be43f997668ae16f47130e29ab516d96ae4 Mon Sep 17 00:00:00 2001 From: Quan Date: Fri, 10 Oct 2025 10:40:18 +0800 Subject: [PATCH 5/6] Rename field names to comply with conventions. Change-Id: Ide23812ed2df0b7a7c51605b05faf0a7efd332c2 --- apache/rocketmq/v2/definition.proto | 9 +++++---- apache/rocketmq/v2/service.proto | 4 ++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/apache/rocketmq/v2/definition.proto b/apache/rocketmq/v2/definition.proto index b7ff127..757d97f 100644 --- a/apache/rocketmq/v2/definition.proto +++ b/apache/rocketmq/v2/definition.proto @@ -566,6 +566,7 @@ message Subscription { // Only lite push consumer // client-side lite subscription quota limit optional int32 lite_subscription_quota = 6; + // Only lite push consumer // Maximum length limit for lite topic optional int32 max_lite_topic_size = 7; @@ -573,13 +574,13 @@ message Subscription { enum LiteSubscriptionAction { // incremental add - INCREMENTAL_ADD = 0; + PARTIAL_ADD = 0; // incremental remove - INCREMENTAL_REMOVE = 1; + PARTIAL_REMOVE = 1; // all add - ALL_ADD = 3; + COMPLETE_ADD = 2; // add remove - ALL_REMOVE = 4; + COMPLETE_REMOVE = 3; } message Metric { diff --git a/apache/rocketmq/v2/service.proto b/apache/rocketmq/v2/service.proto index 7ab1f75..136b3ee 100644 --- a/apache/rocketmq/v2/service.proto +++ b/apache/rocketmq/v2/service.proto @@ -196,7 +196,7 @@ message RecoverOrphanedTransactionCommand { } message NotifyUnsubscribeLiteCommand { - string liteTopic = 1; + string lite_topic = 1; } message TelemetryCommand { @@ -327,7 +327,7 @@ message SyncLiteSubscriptionRequest { // consumer group Resource group = 3; // lite subscription set of lite topics - repeated string liteTopicSet = 4; + repeated string lite_topic_set = 4; optional int64 version = 5; } From a1a955f766804dcaefbe9ef59a30e102c08c174d Mon Sep 17 00:00:00 2001 From: Quan Date: Mon, 13 Oct 2025 16:35:51 +0800 Subject: [PATCH 6/6] remove comments Change-Id: I4df0503e361102c87f7f70afe4e5d88741d84e39 --- apache/rocketmq/v2/definition.proto | 4 ---- 1 file changed, 4 deletions(-) diff --git a/apache/rocketmq/v2/definition.proto b/apache/rocketmq/v2/definition.proto index 757d97f..2513bac 100644 --- a/apache/rocketmq/v2/definition.proto +++ b/apache/rocketmq/v2/definition.proto @@ -573,13 +573,9 @@ message Subscription { } enum LiteSubscriptionAction { - // incremental add PARTIAL_ADD = 0; - // incremental remove PARTIAL_REMOVE = 1; - // all add COMPLETE_ADD = 2; - // add remove COMPLETE_REMOVE = 3; }