Signed-off-by: Darkheir <raphael.cohen@sekoia.io>
Pull request overview
This PR sets the Kafka consumer client.id based on the running node’s ID to improve observability when correlating partition assignments/consumer activity in Kafka tooling.
Changes:
- Thread the node id into the Kafka consumer creation path.
- Set Kafka client.id on the rdkafka ClientConfig using the node id.
    .set("group.id", &group_id)
    .set("client.id", node_id.as_str())
    .set_log_level(log_level)
client.id is being set unconditionally, which will override any client.id provided by users via params.client_params. That’s a behavioral change that can break existing Kafka quota/monitoring setups. Consider only setting a default client.id when it is not already present in client_params, and leaving the user-specified value intact when provided.
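The defaulting suggested above could be sketched roughly as follows. This is a minimal illustration, not the PR's code: it assumes client_params can be viewed as a string map, and resolve_client_id is a hypothetical helper name. With rdkafka itself, the same check could presumably be done by reading the key back from the ClientConfig before setting it.

```rust
use std::collections::HashMap;

/// Hypothetical helper: pick the `client.id` to use for a consumer.
///
/// `client_params` stands in for the user-supplied Kafka client parameters
/// (assumed here to be a plain string map). A user-provided `client.id`
/// wins; the node id is only used as a fallback default.
fn resolve_client_id(client_params: &HashMap<String, String>, node_id: &str) -> String {
    client_params
        .get("client.id")
        .cloned()
        .unwrap_or_else(|| node_id.to_string())
}
```

With this shape, existing quota or monitoring setups keyed on a user-chosen client.id keep working, and only unconfigured sources pick up the node id.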
    .set("group.id", &group_id)
    .set("client.id", node_id.as_str())
    .set_log_level(log_level)
Using only node_id for client.id is not sufficient to uniquely identify a consumer instance when multiple indexing pipelines/sources run on the same node (e.g., multiple pipelines per (index_uid, source_id) are allowed). This can still make it hard to associate partitions/metrics to a specific consumer. Consider incorporating additional identifiers (such as source_id and/or pipeline_uid/group_id) into the default client.id value so each consumer instance can be distinguished.
We want to be able to associate a partition with its indexer, not its pipeline.
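For comparison, the two options discussed in this thread could be sketched as below. This is only an illustration under assumptions: default_client_id is a hypothetical helper, and the "/" separator is an arbitrary choice. Passing None yields the node-only id the author prefers; passing a source id yields the composite form suggested in the review.

```rust
/// Hypothetical helper contrasting the two `client.id` schemes discussed.
///
/// - `None`: node-only id, which ties a partition to its indexer.
/// - `Some(source_id)`: composite id, which distinguishes consumers that
///   run on the same node. The "/" separator is an assumption.
fn default_client_id(node_id: &str, source_id: Option<&str>) -> String {
    match source_id {
        Some(source_id) => format!("{node_id}/{source_id}"),
        None => node_id.to_string(),
    }
}
```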
    .set("group.id", &group_id)
    .set("client.id", node_id.as_str())
    .set_log_level(log_level)
There’s no test covering the new defaulting behavior for client.id. Since create_consumer returns a ClientConfig, consider adding a unit test that asserts the produced native config includes the expected client.id (and that an explicit client.id in client_params is preserved if that is the intended behavior).
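A test along these lines might look like the sketch below. It does not use the real create_consumer, whose signature is not shown in this thread. Instead, build_native_config is a hypothetical stand-in that produces the final key/value settings as a map, and it assumes the "preserve an explicit client.id" behavior is the intended one.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the config-building part of `create_consumer`.
/// Starts from the user's params, forces `group.id`, and fills in
/// `client.id` from the node id only when the user did not set one.
fn build_native_config(
    client_params: &HashMap<String, String>,
    group_id: &str,
    node_id: &str,
) -> HashMap<String, String> {
    let mut config = client_params.clone();
    config.insert("group.id".to_string(), group_id.to_string());
    config
        .entry("client.id".to_string())
        .or_insert_with(|| node_id.to_string());
    config
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn client_id_defaults_to_node_id() {
        let config = build_native_config(&HashMap::new(), "my-group", "node-1");
        assert_eq!(config.get("client.id").map(String::as_str), Some("node-1"));
    }

    #[test]
    fn explicit_client_id_is_preserved() {
        let mut params = HashMap::new();
        params.insert("client.id".to_string(), "custom-client".to_string());
        let config = build_native_config(&params, "my-group", "node-1");
        assert_eq!(config.get("client.id").map(String::as_str), Some("custom-client"));
    }
}
```

Against the real code, the same two assertions would be made on the ClientConfig returned by create_consumer rather than on a map.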
Description
Set the Kafka client.id attribute with the node id to make it easier to associate partitions with their consumer.