Merged
* Initial metrics implementation for tracking record in memory
* Buffer metrics before sending
* Fix metrics to account for pre-buffer in-memory data
* Add a metric which measures memory backlog
* Use int64 everywhere for recordsInMemory metric
* Use channel instead of atomics to avoid possible contention
* Log warnings if we drop metrics
* Combine the updates to reduce channel traffic overhead
* Reduce channel activity/overhead with a batching mechanic
* Use a smaller channel and log immediately if blocked
* Buffer metrics every second
* Add test of new metrics implementation
* Add config to filter metrics
* Make metrics config cleaner (#35)
* Make metrics config cleaner
* go fmt
Piotr Poniedziałek (pondzix)
approved these changes
Sep 11, 2025
This PR introduces features for managing the amount of data we pull into memory via kinsumer:
Configurable limit on getRecords requests
Self-explanatory.
maxConcurrentShards setting
Uses a semaphore to limit the number of shards we pull data from at once. Before this change, when there was a bottleneck in delivering data to the client, we would keep pulling data from the stream, causing excessive memory consumption.
This feature limits the number of shards one pod pulls from. Where there is no bottleneck, we cycle through shards continuously; where there is one, we cap the number of shards we handle at a time, and therefore the memory we consume.
Buffered data metric
Adds metrics recording the number of records in memory after pulling from Kinesis, and the approximate in-memory size of those records.
Additionally, allows the emitted metrics to be configured for cost control, since we don't necessarily need every metric that kinsumer outputs.
The other commits just factor things better and add tests; they sit later in the chain to avoid the need for a rebase, since much of this work was done concurrently.