From c92a9f543f115b176dfc10f5f11564acc92229e0 Mon Sep 17 00:00:00 2001 From: Michele Azzolari Date: Thu, 3 Jul 2025 10:54:10 +0200 Subject: [PATCH 1/2] feat: expose on/off methods to consumer/publisher --- src/interface.ts | 13 ++++++++++++- src/kafka/kafka-consumer.ts | 28 +++++++++++++++++++++++++++- src/kafka/kafka-producer.ts | 26 +++++++++++++++++++++++++- 3 files changed, 64 insertions(+), 3 deletions(-) diff --git a/src/interface.ts b/src/interface.ts index 7492997..7849c79 100644 --- a/src/interface.ts +++ b/src/interface.ts @@ -1,14 +1,25 @@ -import { EachMessageHandler, ProducerRecord } from 'kafkajs'; +import { + ConsumerEvents, + EachMessageHandler, + InstrumentationEvent, + ProducerEvents, + ProducerRecord, + ValueOf +} from 'kafkajs'; export interface Consumer { connect(): Promise; setCallback(func: EachMessageHandler): void; disconnect(): Promise; subscribe(topics: string[], autoCommit: boolean, fromBeginning: boolean): Promise; + on(event: ValueOf, listener: (event: InstrumentationEvent) => void): void; + off(event: ValueOf): void; } export interface Producer { send(message: ProducerRecord): Promise; connect(): Promise; disconnect(): Promise; + on(event: ValueOf, listener: (event: InstrumentationEvent) => void): void; + off(event: ValueOf): void; } diff --git a/src/kafka/kafka-consumer.ts b/src/kafka/kafka-consumer.ts index 943d6e1..267bafd 100644 --- a/src/kafka/kafka-consumer.ts +++ b/src/kafka/kafka-consumer.ts @@ -1,4 +1,13 @@ -import { Consumer, EachMessageHandler, EachMessagePayload, TopicPartitionOffsetAndMetadata } from 'kafkajs'; +import { + Consumer, + ConsumerEvents, + EachMessageHandler, + EachMessagePayload, + InstrumentationEvent, + RemoveInstrumentationEventListener, + TopicPartitionOffsetAndMetadata, + ValueOf +} from 'kafkajs'; import { Consumer as GenericConsumer } from '../interface'; import { getLogger } from '@fluidware-it/saddlebag'; @@ -9,6 +18,8 @@ export class KafkaConsumer implements GenericConsumer { private connected = false; + private eventsListener: Record>[]> = {}; + private callback: EachMessageHandler = () => { throw new Error('EachMessagePayload must be override before connect() method call'); }; @@ -49,6 +60,21 @@ export class KafkaConsumer implements GenericConsumer { this.logger.info('connected'); } + public on(event: ValueOf, listener: (event: InstrumentationEvent) => void): void { + const off = this.consumer.on(event, listener); + if (!this.eventsListener[event]) { + this.eventsListener[event] = []; + } + this.eventsListener[event].push(off); + } + + public off(event: ValueOf) { + if (this.eventsListener[event]) { + this.eventsListener[event].forEach(off => off()); + delete this.eventsListener[event]; + } + } + public async disconnect(): Promise { if (!this.connected) return true; try { diff --git a/src/kafka/kafka-producer.ts b/src/kafka/kafka-producer.ts index 6862419..e262f47 100644 --- a/src/kafka/kafka-producer.ts +++ b/src/kafka/kafka-producer.ts @@ -1,4 +1,11 @@ -import { Producer, ProducerRecord } from 'kafkajs'; +import { + InstrumentationEvent, + Producer, + ProducerEvents, + ProducerRecord, + RemoveInstrumentationEventListener, + ValueOf +} from 'kafkajs'; import { Producer as GenericProducer } from '../interface'; import { getLogger } from '@fluidware-it/saddlebag'; @@ -9,6 +16,8 @@ export class KafkaProducer implements GenericProducer { private connected = false; + private eventsListener: Record>[]> = {}; + constructor(producer: Producer) { this.producer = producer; } @@ -33,6 +42,21 @@ export class KafkaProducer implements GenericProducer { } } + public on(event: ValueOf, listener: (event: InstrumentationEvent) => void): void { + const off = this.producer.on(event, listener); + if (!this.eventsListener[event]) { + this.eventsListener[event] = []; + } + this.eventsListener[event].push(off); + } + + public off(event: ValueOf): void { + if (this.eventsListener[event]) { + this.eventsListener[event].forEach(off => off()); + delete this.eventsListener[event]; + } + } + public async connect(): Promise { if (!this.connected) { await this.producer.connect(); From da86f09a1267c4f17583e6d8b0c2385bd40d31a3 Mon Sep 17 00:00:00 2001 From: Michele Azzolari Date: Thu, 3 Jul 2025 12:20:33 +0200 Subject: [PATCH 2/2] chore(docs): update ENVIRONMENT.md --- ENVIRONMENT.md | 1 + 1 file changed, 1 insertion(+) diff --git a/ENVIRONMENT.md b/ENVIRONMENT.md index a9b93ef..8e7fa35 100644 --- a/ENVIRONMENT.md +++ b/ENVIRONMENT.md @@ -5,6 +5,7 @@ | -------------------------------------- | -------- | -------------- | -------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | KAFKA_${prefix}BROKERS | string[] | localhost:9092 | | "prefix" is not required, can be used to have multiple kafka configurations: i.e: KAFKA_INSTANCE_A_BROKERS=kafka-a-01:9092,kafka-a-02:9092, KAFKA_INSTANCE_B_BROKERS=kafka-b-01:9092,kafka-b-02:9092 | | KAFKA_${prefix}CLIENT_ID | string | _function_ | | default to `hostname()` | +| KAFKA_${prefix}LOG_LEVEL | string | NOTHING | | possible values: 'NOTHING', 'ERROR', 'WARN', 'INFO', 'DEBUG' | | KAFKA_${prefix}SSL_CA_PATH | string | | | | | KAFKA_${prefix}SSL_CERT_PEM_PATH | string | | | | | KAFKA_${prefix}SSL_KEY_PEM_PATH | string | | | |