Conversation
* Add iteratorType option
* Use ktypes enum for iterator type
* Make default latest, solidify safety
* Fix shard consumer tests
// - if shard iterator is TRIM_HORIZON: use the full list of shard IDs, and don't do the bootstrap
// - if shard iterator is LATEST: use only open shard IDs, and do the bootstrap
Should we expand the comment to explain each point further, given that we don't actually check the iterator type in this function?
Actually this comment should've been removed - it was a todo reminder that I missed 🤦
return nil, fmt.Errorf("error loading shard IDs from kinesis: %v", err)
}

if k.config.iteratorType != ktypes.ShardIteratorTypeLatest {
So for iteratorType TRIM_HORIZON and AT_TIMESTAMP we would return all shards?
Is that the correct behaviour? Or is AT_TIMESTAMP no longer a valid option and would be rejected?
Correct the first time: for both TRIM_HORIZON and AT_TIMESTAMP we don't bootstrap. We preserve the previous behaviour, which is to proceed to process all shards.
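The behaviour described above can be sketched roughly as follows. This is a hypothetical simplification, not the actual kinsumer code: the `shard` type, `shardIDsToRegister`, and the string iterator constant are all stand-ins for illustration (the real code uses the `ktypes` enum and the Kinesis shard description).

```go
// Hypothetical sketch: choose which shard IDs to register based on the
// configured iterator type. For LATEST we keep only open shards (no ending
// sequence number) and do the bootstrap; for TRIM_HORIZON and AT_TIMESTAMP
// we keep every shard, preserving the previous behaviour.
package main

import "fmt"

// shard is a simplified stand-in for the Kinesis shard description.
type shard struct {
	ID                string
	EndingSequenceNum string // empty means the shard is still open
}

const iteratorLatest = "LATEST" // stand-in for ktypes.ShardIteratorTypeLatest

// shardIDsToRegister returns the shard IDs a client should register.
func shardIDsToRegister(shards []shard, iteratorType string) []string {
	var ids []string
	for _, s := range shards {
		// For LATEST, skip closed shards: they finished before we started,
		// so there is nothing "latest" to read from them.
		if iteratorType == iteratorLatest && s.EndingSequenceNum != "" {
			continue
		}
		ids = append(ids, s.ID)
	}
	return ids
}

func main() {
	shards := []shard{
		{ID: "shardId-000", EndingSequenceNum: "49590"}, // closed (e.g. a split parent)
		{ID: "shardId-001"},                             // open
		{ID: "shardId-002"},                             // open
	}
	fmt.Println(shardIDsToRegister(shards, iteratorLatest))
	fmt.Println(shardIDsToRegister(shards, "TRIM_HORIZON"))
}
```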
assert.Greater(t, foundBefore2, 0)
assert.Equal(t, foundBefore2, numberOfEventsToTest)
I believe assert.Greater(t, foundBefore2, 0) can go, since the expectation is that foundBefore2 should be equal to numberOfEventsToTest.
// // DEBUG: Print checkpoints table contents after client initialization
// checkpoints, err := loadCheckpoints(d, clients[0].checkpointTableName)
// require.NoError(t, err, "Error loading checkpoints for debugging")
// t.Logf("=== CHECKPOINTS AFTER CLIENT INITIALIZATION ===")
// for shardID, checkpoint := range checkpoints {
// 	seqNum := "nil"
// 	if checkpoint.SequenceNumber != nil {
// 		seqNum = *checkpoint.SequenceNumber
// 	}
// 	finished := "nil"
// 	if checkpoint.Finished != nil {
// 		finished = fmt.Sprintf("%d", *checkpoint.Finished)
// 	}
// 	ownerID := "nil"
// 	if checkpoint.OwnerID != nil {
// 		ownerID = *checkpoint.OwnerID
// 	}
// 	t.Logf("Shard %s: SequenceNumber=%s, Finished=%s, OwnerID=%s",
// 		shardID, seqNum, finished, ownerID)
// }
// t.Logf("=== END CHECKPOINTS DEBUG ===")
I believe we don't need it now.
We don't. But in a few places in this project's tests I've left commented-out debugging code, because when we need to figure out a problem it can be burdensome to come up with ways to explore the mechanics. This is one of those places, but I should've added a comment to say so.
Converting to draft - as discussed, we'll take another approach in the short term and revisit this idea later.
This PR fixes a critical issue with the implementation of configuring the shard iterator type in release/1.7.0.
The problem:
Previously, we handled the configuration of the iterator type in the shard consumer. What we didn't realise is that this simple implementation uses the configured iterator for every new shard - even those created during a scaling action.
So any data that arrived on a new shard before we began consuming it would be lost.
This problem is demonstrated by the test changes in the first three commits (TestSplit and TestShardsMerged): with iterator type TRIM_HORIZON they're fine, but with LATEST they lose data where they shouldn't.
The solution:
The original kinsumer implementation had a mechanism whereby we could use LATEST if we amend the DDB table to have a sequenceNumber of "LATEST". I reverted the consumer logic back to this mechanism, and then implemented a "bootstrap" mechanism.
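A rough sketch of that reverted consumer logic, assuming a simplified model: the shard consumer derives its GetShardIterator type from the checkpoint row, so a bootstrap that writes the literal sequence number "LATEST" into DynamoDB makes that shard start from the tip. The function name and the empty-string convention here are illustrative, not kinsumer's actual API.

```go
// Hypothetical sketch: map a stored checkpoint sequence number to a Kinesis
// iterator type plus an optional starting sequence number.
package main

import "fmt"

// iteratorFor decides how a shard consumer should start reading a shard,
// based on what the checkpoint table holds for that shard.
func iteratorFor(sequenceNumber string) (iteratorType, startAfter string) {
	switch sequenceNumber {
	case "":
		// No checkpoint yet: read the shard from the beginning.
		return "TRIM_HORIZON", ""
	case "LATEST":
		// Sentinel written by the bootstrap: start at the tip of the shard.
		return "LATEST", ""
	default:
		// Resume just after the last processed record.
		return "AFTER_SEQUENCE_NUMBER", sequenceNumber
	}
}

func main() {
	for _, seq := range []string{"", "LATEST", "49590338271490256608559692538361571095921575989136588898"} {
		it, after := iteratorFor(seq)
		fmt.Printf("%q -> %s %s\n", seq, it, after)
	}
}
```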
It works as follows:
Some details/considerations:
Clients use the metadata register to get the list of shards to allocate for consumption. This is normally managed by leader actions - those are now paused until we register shards.
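The pausing described above might look roughly like the following gate pattern. This is a hypothetical sketch, assuming a `client` type with a channel that the bootstrap closes once shards are registered; it is not kinsumer's actual implementation.

```go
// Hypothetical sketch: leader actions wait on a "registered" gate that the
// bootstrap closes after it has registered shards in the metadata table.
package main

import (
	"fmt"
	"sync"
)

type client struct {
	registered chan struct{} // closed once bootstrap has registered shards
	closeOnce  sync.Once
}

func newClient() *client {
	return &client{registered: make(chan struct{})}
}

// bootstrap registers shards, then opens the gate for leader actions.
func (c *client) bootstrap() {
	// ... register shards in the metadata table (elided) ...
	c.closeOnce.Do(func() { close(c.registered) })
}

// runLeaderActions blocks until registration is complete.
func (c *client) runLeaderActions() {
	<-c.registered
	fmt.Println("leader actions running")
}

func main() {
	c := newClient()
	done := make(chan struct{})
	go func() {
		c.runLeaderActions()
		close(done)
	}()
	c.bootstrap()
	<-done
}
```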
Checkpoints are used by the leader to determine when a shard is finished, and by clients to determine where in a shard to start.
I attempted to use batch writes to avoid spamming DDB when there are a lot of shards - but contention cannot be avoided (even by putting all the logic in leader actions), so I reverted to single conditional writes, and added a mechanism to reduce the scope for contention.
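The single conditional writes mentioned above can be sketched as follows, simulated with an in-memory map standing in for DynamoDB. Each checkpoint row is created only if it does not already exist (the effect of a DynamoDB `attribute_not_exists` ConditionExpression), so two clients racing to bootstrap the same shard cannot clobber each other: one write wins and the other is a no-op. The contention-reduction mechanism from the PR is not modelled here, and the type and method names are illustrative.

```go
// Hypothetical sketch: a conditional "create if absent" checkpoint write,
// mimicking DynamoDB's conditional PutItem with an in-memory map.
package main

import (
	"fmt"
	"sync"
)

type checkpointStore struct {
	mu   sync.Mutex
	rows map[string]string // shardID -> sequenceNumber
}

// putIfAbsent writes only when no row exists for the shard, and reports
// whether the write happened - the losing racer simply sees false.
func (s *checkpointStore) putIfAbsent(shardID, sequenceNumber string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.rows[shardID]; exists {
		return false // condition failed: another client got there first
	}
	s.rows[shardID] = sequenceNumber
	return true
}

func main() {
	store := &checkpointStore{rows: map[string]string{}}
	fmt.Println(store.putIfAbsent("shardId-000", "LATEST")) // first writer wins
	fmt.Println(store.putIfAbsent("shardId-000", "LATEST")) // condition fails
}
```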