KAFKA-20066: Implement KIP-1251: Assignment epochs for consumer groups [3/N] #21594

Open

lucliu1108 wants to merge 22 commits into apache:trunk from lucliu1108:KAFKA-20066-3
Conversation

lucliu1108 (Contributor) commented Feb 26, 2026

Summary

This PR changes the offset commit fencing logic, as a follow-up to
#21558 change on assignment
structure to include epoch information.

It introduces per-partition assignment epochs to relax the strict member
epoch validation for consumer group offset commits. When receiving an
offset commit request that includes the client-side member epoch and a
member ID, we previously required

clientEpoch == brokerEpoch

for a valid offset commit, which could lead to false fencing.

We now perform a relaxed offset commit check using an assignment epoch for
each assigned partition of each member:

assignmentEpoch <= clientEpoch <= brokerEpoch

This prevents false rejections of legitimate offset commits when a
member's epoch is bumped but the client hasn't received the update yet.
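The relaxed rule can be sketched as a small predicate. This is a minimal illustration of the inequality above; the class and method names are hypothetical and not part of the actual coordinator API:

```java
// Minimal sketch of the relaxed fencing rule described above. The names
// assignmentEpoch/clientEpoch/brokerEpoch mirror the PR description; this
// class is illustrative, not Kafka code.
public class EpochFencingSketch {
    // A commit for one partition is accepted when the client's epoch is at
    // least the epoch at which the partition was assigned to the member, and
    // no newer than the member epoch the broker currently holds.
    static boolean isCommitAllowed(int assignmentEpoch, int clientEpoch, int brokerEpoch) {
        return assignmentEpoch <= clientEpoch && clientEpoch <= brokerEpoch;
    }

    public static void main(String[] args) {
        // Broker bumped the member epoch to 6, client still at 5, but the
        // partition was assigned at epoch 4: previously fenced, now accepted.
        System.out.println(isCommitAllowed(4, 5, 6)); // true
        // Commit from an epoch older than the assignment (zombie): still fenced.
        System.out.println(isCommitAllowed(4, 3, 6)); // false
    }
}
```

Under the old strict check, the first case (clientEpoch 5 != brokerEpoch 6) would have been rejected even though the client legitimately owns the partition.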

Reviewers: Sean Quah squah@confluent.io, Lucas Brutschy
lbrutschy@confluent.io

Copilot AI left a comment


Pull request overview

This PR extends consumer-group assignment state to carry per-partition assignment epochs and updates offset-commit fencing to use a relaxed, per-partition epoch check, reducing false fencing when the member epoch advances before the client observes it.

Changes:

  • Add assignment-epoch information to consumer-group assignment records and in-memory member assignment structures.
  • Update consumer-group offset-commit validation to accept commits when assignmentEpoch <= clientEpoch <= brokerEpoch, with per-partition validation.
  • Update tests and benchmarks to use the epoch-annotated assignment structures and new helpers.

Reviewed changes

Copilot reviewed 21 out of 22 changed files in this pull request and generated 3 comments.

Summary per file:

  • jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java: Update benchmark to use the new consumer-group spec builder helper.
  • jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java: Switch benchmark to share-group specific spec builder helper.
  • jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: Update benchmark to use the new consumer-group spec builder helper.
  • jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/CurrentAssignmentBuilderBenchmark.java: Update benchmark assignments to include per-partition epochs.
  • jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java: Split group spec creation into consumer/share variants and adapt to epoch-annotated assignments.
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java: Update assertions and expected assignments to include assignment epochs.
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java: Add/adjust tests for assignment-epoch-based offset commit validation.
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java: Update member tests for epoch-annotated assignments and add epoch accessor coverage.
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java: Update classic-group conversion tests to use epoch-annotated assignments.
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: Update extensive coordinator tests for epoch-annotated assignments and record helpers.
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java: Update record helper tests to validate assignment epoch arrays in records.
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java: Add helpers to convert between plain assignments and epoch-annotated assignments.
  • group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json: Add tagged AssignmentEpochs to TopicPartitions in the coordinator record schema.
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java: Re-home assignedPartitions into ShareGroupMember after base-class removal and add change-detection helper.
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java: Preserve/propagate per-partition assignment epochs through reconciliation and state transitions.
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java: Change assignment representation to topicId -> partitionId -> epoch, add epoch getters and reset-to-zero helper.
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java: Implement relaxed per-partition assignment-epoch validation for offset commits.
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroupMember.java: Remove assignedPartitions from the base class; subclasses own their assignment representation.
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: Add parsing for epoch-annotated assignment records and a string formatter for epoch assignments.
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: Update assignment-diffing and logging to use epoch-annotated assignments; adjust serialization to consumer protocol.
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java: Write assignment epochs into current-assignment records (sorted partitions + aligned epoch arrays).
  • clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java: Adjust assignment creation helper to accept epoch-annotated maps while emitting partitions only.


Comment on lines 669 to 674
// If the commit is not transactional and the member uses the new consumer protocol (KIP-848),
// the member should be using the OffsetCommit API version >= 9.
if (!isTransactional && !member.useClassicProtocol() && apiVersion < 9) {
throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " +
"by members using the modern group protocol");
}

Copilot AI Feb 27, 2026


The modern consumer-group offset-commit path is allowed for apiVersion >= 9, but OffsetCommitRequest only carries TopicId starting in version 10 (v0-9 use topic name). Since the per-partition assignment-epoch validator keys assignments by topicId, version 9 requests will typically have ZERO_UUID here and get incorrectly fenced as “not assigned”. Consider requiring apiVersion >= 10 for non-transactional commits from modern (non-classic) members, or add a fallback that resolves topicId from topicName before looking up assignment epochs.
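The topic-name fallback this comment suggests could look roughly like the following. This is a hypothetical sketch, not the coordinator's real API: the name-to-ID map and the zero-UUID sentinel stand in for the coordinator's actual metadata types (Kafka uses `org.apache.kafka.common.Uuid`; plain `java.util.UUID` is used here to keep the sketch self-contained).

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the suggested fallback: when an OffsetCommit request
// predates topic IDs (v9 and below), resolve the ID from the topic name
// before the per-partition assignment-epoch lookup.
public class TopicIdFallbackSketch {
    // Stand-in for Uuid.ZERO_UUID: the sentinel a pre-v10 request carries.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    static UUID resolveTopicId(UUID requestTopicId, String topicName, Map<String, UUID> topicIdsByName) {
        if (!ZERO_UUID.equals(requestTopicId)) {
            return requestTopicId; // v10+ request already carries the topic ID.
        }
        // v9 request: fall back to the name lookup. This may still return the
        // zero sentinel if the topic is unknown, in which case the commit
        // would be fenced as "not assigned".
        return topicIdsByName.getOrDefault(topicName, ZERO_UUID);
    }
}
```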

Comment on lines +2184 to +2193
// client epoch = broker epoch
if (version >= 9) {
CommitPartitionValidator validator = group.validateOffsetCommit(
"member-id", "", memberEpoch, isTransactional, version
);
assertDoesNotThrow(() -> validator.validate(topicName, topicId, partitionId));
} else {
assertThrows(UnsupportedVersionException.class, () ->
group.validateOffsetCommit("member-id", "", memberEpoch, isTransactional, version));
}

Copilot AI Feb 27, 2026


These new offset-commit validation tests treat OffsetCommit versions >= 9 as having a usable topicId. However, request v9 still uses topic names (TopicId is only present from v10+), so production code will likely see ZERO_UUID for v9. To avoid masking the real behavior, update the version gate/expectations (e.g., only run assignment-epoch validation assertions for v10+, and expect UnsupportedVersionException for v9 when using the modern protocol, or explicitly validate the ZERO_UUID behavior if you add a topic-name fallback).

"about": "The partition Ids." },
{ "name": "AssignmentEpochs", "versions": "0+", "nullableVersions": "0+",
"taggedVersions": "0+", "tag": 0, "type": "[]int32", "default": null,
"about": "The epoch at which the partition was assigned to the member. Used to fence zombie commits requests. Of the same length as Partitions." }

Copilot AI Feb 27, 2026


Minor grammar: “zombie commits requests” should be “zombie commit requests”.

Suggested change
"about": "The epoch at which the partition was assigned to the member. Used to fence zombie commits requests. Of the same length as Partitions." }
"about": "The epoch at which the partition was assigned to the member. Used to fence zombie commit requests. Of the same length as Partitions." }

lucasbru (Member) commented:

@lucliu1108 When stacking your PRs, I think you can actually stack them by making the base not apache/trunk but your previous branch (KAFKA-20066-2).

lucasbru (Member) left a comment


Thanks for the PR, Lucy!

I left two comments, but in this form it's a bit difficult to review since it's hard to tell where the old PRs end and the new PR starts.

One option is to choose the previous PR as the base instead of apache:trunk.

Another is to squash the new PR into a single commit, so that I can just review the last commit.

) {
return (topicName, topicId, partitionId) -> {
// Search for the partition in the assigned partitions, then in partitions pending revocation.
Integer assignmentEpoch = member.assignmentEpoch(topicId, partitionId);


Do we have to handle ZERO_UUID here -- if the request uses a version without a topic ID?

List<Integer> partitions = tp.partitions();
List<Integer> epochs = tp.assignmentEpochs();
for (int i = 0; i < partitions.size(); i++) {
int epoch = (epochs != null && epochs.size() > i) ? epochs.get(i) : adjustedDefaultEpoch;


I wonder if we shouldn't throw an IllegalStateException if epochs.size() is incorrect. That would just be a broken record at that point.
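The stricter behavior this comment proposes could be sketched as below: a record whose AssignmentEpochs length disagrees with Partitions is treated as corrupt instead of silently padded with a default epoch. The method and class are hypothetical; only the field names follow the quoted snippet.

```java
import java.util.List;

// Hypothetical sketch of the stricter parsing suggested in the review
// comment: a length mismatch between Partitions and AssignmentEpochs is a
// broken record, so fail loudly rather than pad with a default epoch.
public class EpochRecordParsingSketch {
    static int epochAt(List<Integer> partitions, List<Integer> epochs, int i, int defaultEpoch) {
        if (epochs == null) {
            // Record written before epochs existed (tagged field absent):
            // fall back to the default epoch.
            return defaultEpoch;
        }
        if (epochs.size() != partitions.size()) {
            throw new IllegalStateException("AssignmentEpochs length " + epochs.size()
                + " does not match Partitions length " + partitions.size());
        }
        return epochs.get(i);
    }
}
```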
