diff --git a/README.md b/README.md index 0a3adbb..25ab169 100644 --- a/README.md +++ b/README.md @@ -148,6 +148,9 @@ KPipe uses **Java Virtual Threads** (Project Loom) for high-concurrency message - **Efficient Resource Reuse**: Heavy objects like `Schema.Parser`, `ByteArrayOutputStream`, and Avro encoders are cached per virtual thread using `ScopedValue`, which is significantly more lightweight than `ThreadLocal`. + - **Optimization**: `ScopedValue` allows KPipe to share these heavy resources across all transformations in a single + pipeline without the memory leak risks or scalability bottlenecks of `ThreadLocal` in a virtual-thread-per-record + model. - **Thread-Per-Record**: Each message is processed in its own virtual thread, allowing KPipe to scale to millions of concurrent operations without the overhead of complex thread pools. @@ -165,7 +168,18 @@ KPipe implements a **Lowest Pending Offset** strategy to ensure reliability even result in some records being re-processed (standard "at-least-once" behavior), it guarantees no message is ever skipped. -### 4. External Offset Management +### 4. Parallel vs. Sequential Processing + +KPipe supports two modes of execution depending on your ordering and throughput requirements: + +- **Parallel Mode (Default)**: Best for stateless transformations (enrichment, masking). High throughput via virtual + threads. Offsets are committed based on the **lowest pending offset** to ensure no gaps. +- **Sequential Mode (`.withSequentialProcessing(true)`)**: Best for stateful transformations where order per partition + is critical (e.g., balance updates, sequence-dependent events). In this mode, only one message per partition is + processed at a time. Backpressure is supported and operates by monitoring the **consumer lag** (the difference between + the partition end-offset and the consumer's current position). + +### 5. 
External Offset Management While Kafka-based offset storage is the default, KPipe supports external storage (e.g., PostgreSQL) for **exactly-once processing** or specific architectural needs. @@ -216,19 +230,57 @@ KPipe provides a robust, multi-layered error handling mechanism: ### 6. Backpressure When a downstream sink (database, HTTP API, another Kafka topic) is slow, KPipe can automatically pause Kafka polling to -prevent unbounded in-flight message growth. +prevent unbounded resource consumption or excessive lag. Backpressure uses **two configurable watermarks** (hysteresis) to avoid rapid pause/resume oscillation: -- **High watermark** — pause Kafka polling when in-flight messages reach this count -- **Low watermark** — resume Kafka polling when in-flight drops to or below this count (hysteresis) +- **High watermark** — pause Kafka polling when the monitored metric reaches this value. +- **Low watermark** — resume Kafka polling when the metric drops to or below this value. + +#### Backpressure Strategies + +KPipe automatically selects the optimal backpressure strategy based on your processing mode: + +| Mode | Strategy | Metric Monitored | Use Case | +| :--------------------- | :--------------- | :----------------------------- | :------------------------------------------------------------- | +| **Parallel** (Default) | **In-Flight** | Total active virtual threads | Prevent memory exhaustion from too many concurrent tasks. | +| **Sequential** | **Consumer Lag** | Total unread messages in Kafka | Prevent the consumer from falling too far behind the producer. | + +##### 1. In-Flight Strategy (Parallel Mode) + +In parallel mode, multiple messages are processed concurrently using Virtual Threads. The backpressure controller +monitors the number of messages currently "in-flight" (started but not yet finished). + +- **High Watermark Default**: 10,000 +- **Low Watermark Default**: 7,000 + +##### 2. 
Consumer Lag Strategy (Sequential Mode) + +In sequential mode, messages are processed one by one to maintain strict ordering. Since only one message is ever +in-flight, KPipe instead monitors the **total consumer lag** across all assigned partitions. + +The lag is calculated using the formula: + +``` +lag = Σ (endOffset - position) +``` + +Where: + +- `endOffset`: The highest available offset in a partition. +- `position`: The offset of the next record to be fetched by this consumer. + +- **High Watermark Default**: 10,000 +- **Low Watermark Default**: 7,000 + +#### Configuration ```java final var consumer = KPipeConsumer.builder() .withProperties(kafkaProps) .withTopic("events") .withProcessor(pipeline) - // Pause at 10,000 in-flight; resume at 7,000 (defaults) + // Enable backpressure with default watermarks (10k / 7k) .withBackpressure() // Or configure explicit watermarks: // .withBackpressure(5_000, 3_000) @@ -237,10 +289,6 @@ final var consumer = KPipeConsumer.builder() **Backpressure is disabled by default** and opt-in via `.withBackpressure()`. -**Constraints:** - -- Not compatible with sequential processing mode (in-flight is always ≤ 1 in that mode) - **Observability:** backpressure events are logged (WARNING on pause, INFO on resume) and tracked via two dedicated metrics: `backpressurePauseCount` and `backpressureTimeMs`. 
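The two-watermark hysteresis decision and the lag formula described in this hunk can be sketched as plain Java, independent of Kafka types. The class and method names below are illustrative only, not part of KPipe's API; KPipe's actual `BackpressureController` operates on the live `Consumer` (see the source diff further down).

```java
// Minimal sketch of watermark hysteresis and the lag formula
// lag = Σ max(0, endOffset - position). Names are hypothetical.
import java.util.Map;

public class BackpressureSketch {

  public enum Action { PAUSE, RESUME, NONE }

  // Pause once the metric reaches the high watermark; resume only after it
  // drains to the low watermark. The gap between the two thresholds is what
  // prevents rapid pause/resume oscillation.
  public static Action check(final long metric, final boolean paused, final long high, final long low) {
    if (!paused && metric >= high) return Action.PAUSE;
    if (paused && metric <= low) return Action.RESUME;
    return Action.NONE;
  }

  // Per-partition offsets: value[0] = position, value[1] = endOffset.
  // Each term is clamped at 0 so a stale endOffset cannot yield negative lag.
  public static long totalLag(final Map<String, long[]> partitions) {
    long lag = 0L;
    for (final long[] offsets : partitions.values()) {
      lag += Math.max(0L, offsets[1] - offsets[0]);
    }
    return lag;
  }
}
```

Note that between the watermarks the decision is `NONE` in both directions: a running consumer at 9,000 stays running, while a paused consumer at 9,000 stays paused until the metric reaches the low watermark.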
@@ -475,7 +523,7 @@ You can create custom sinks using lambda expressions:

```java
// Create a custom sink that writes to a database
-MessageSink databaseSink = (record, processedValue) -> {
+final MessageSink databaseSink = (record, processedValue) -> {
  try {
    // Parse the processed value
    final var data = new String(processedValue, StandardCharsets.UTF_8);
@@ -489,9 +537,6 @@ MessageSink databaseSink = (record, processedValue) -> {
    log.log(Level.ERROR, "Failed to write message to database", e);
  }
};
-
-// Use the custom sink with a consumer
-final var consumer = KPipeConsumer.builder().withMessageSink(databaseSink).build();
```

### Message Sink Registry

@@ -558,8 +603,7 @@ graceful shutdown:

```java
// Create a consumer runner with default settings
-ConsumerRunner> runner = ConsumerRunner.builder(consumer)
-  .build();
+final var runner = ConsumerRunner.builder(consumer).build();

// Start the consumer
runner.start();

@@ -620,7 +664,7 @@ The `ConsumerRunner` integrates with metrics reporting:

```java
// Add multiple metrics reporters
-ConsumerRunner> runner = ConsumerRunner.builder(consumer)
+final var runner = ConsumerRunner.builder(consumer)
  .withMetricsReporters(
    List.of(
      ConsumerMetricsReporter.forConsumer(consumer::getMetrics),
@@ -636,7 +680,7 @@ ConsumerRunner> runner = ConsumerRunner.builder(co

The `ConsumerRunner` implements `AutoCloseable` for use with try-with-resources:

```java
-try (ConsumerRunner> runner = ConsumerRunner.builder(consumer).build()) {
+try (final var runner = ConsumerRunner.builder(consumer).build()) {
  runner.start();
  // Application logic here
  // Runner will be automatically closed when exiting the try block
@@ -746,7 +790,8 @@ export SHUTDOWN_TIMEOUT_SEC=5

## Requirements

-- Java 24+
+- **Java 24+** (Note: `ScopedValue` is still a preview API in JDK 24, so compile and run with
+  `--enable-preview`).
- Gradle (for building the project) - [kcat](https://github.com/edenhill/kcat) (for testing) - Docker (for local Kafka setup) @@ -755,7 +800,8 @@ export SHUTDOWN_TIMEOUT_SEC=5 ## Testing -Follow these steps to test the KPipe Kafka Consumer: +Follow these steps to test the KPipe Kafka Consumer. KPipe includes a pre-configured `docker-compose.yaml` in the root +directory that starts a full local environment including Kafka, Zookeeper, and Confluent Schema Registry. ### Build and Run @@ -854,10 +900,10 @@ public enum StandardProcessors implements UnaryOperator> { SOURCE(JsonMessageProcessor.addFieldOperator("src", "app")); private final UnaryOperator> op; - StandardProcessors(UnaryOperator> op) { this.op = op; } + StandardProcessors(final UnaryOperator> op) { this.op = op; } @Override - public Map apply(Map t) { return op.apply(t); } + public Map apply(final Map t) { return op.apply(t); } } // Bulk register all enum constants @@ -873,7 +919,7 @@ The library provides a built-in `when()` method for conditional processing: ```java // Create a predicate that checks message type -Predicate isOrderMessage = (bytes) -> { +final Predicate isOrderMessage = (bytes) -> { // Logic to check if it's an order return true; }; @@ -886,6 +932,33 @@ Function conditionalPipeline = MessageProcessorRegistry.when( ); ``` +### Filtering Messages + +To skip a message in a pipeline, return `null` in your processor. KPipe will treat `null` as a signal to stop processing +the current record and will not send it to the sink. 
+ +```java +registry.registerOperator(RegistryKey.json("filter"), map -> { + if ("internal".equals(map.get("type"))) { + return null; // Skip this message + } + return map; +}); +``` + +### Header Propagation + +You can access `ConsumerRecord` headers within a custom sink to propagate tracing or metadata: + +```java +MessageSink tracingSink = (record, processedValue) -> { + final var traceId = record.headers().lastHeader("X-Trace-Id"); + if (traceId != null) { + // Use traceId.value() for logging or downstream calls + } +}; +``` + ### Thread-Safety and Resource Management - Message processors should be stateless and thread-safe. diff --git a/lib/src/main/java/org/kpipe/consumer/BackpressureController.java b/lib/src/main/java/org/kpipe/consumer/BackpressureController.java index 7f1c5fb..c09e345 100644 --- a/lib/src/main/java/org/kpipe/consumer/BackpressureController.java +++ b/lib/src/main/java/org/kpipe/consumer/BackpressureController.java @@ -1,44 +1,97 @@ package org.kpipe.consumer; -/// Controls backpressure for a Kafka consumer by monitoring in-flight message counts and -/// deciding when to pause or resume consumption. +import java.util.Optional; +import java.util.function.LongSupplier; +import org.apache.kafka.clients.consumer.Consumer; + +/// Controls backpressure for a Kafka consumer by monitoring a metric (in-flight or lag) +/// and deciding when to pause or resume consumption. /// /// This is a pure decision module — it has no side effects and holds no mutable state. 
It /// implements hysteresis via two configurable thresholds: /// -/// * **High watermark** — when in-flight count reaches this value, the consumer should pause -/// * **Low watermark** — when in-flight count drops to this value (while paused), the consumer +/// * **High watermark** — when the metric reaches this value, the consumer should pause +/// * **Low watermark** — when the metric drops to this value (while paused), the consumer /// should resume /// -/// Example usage: -/// -/// ```java -/// final var controller = new BackpressureController(10_000, 7_000); -/// switch (controller.check(inFlightCount, isPaused)) { -/// case PAUSE -> consumer.pause(); -/// case RESUME -> consumer.resume(); -/// case NONE -> {} -/// } -/// ``` -/// -/// @param highWatermark the in-flight count at or above which the consumer should pause -/// @param lowWatermark the in-flight count at or below which the consumer should resume (must +/// @param highWatermark the metric value at or above which the consumer should pause +/// @param lowWatermark the metric value at or below which the consumer should resume (must /// be less than highWatermark) -public record BackpressureController(long highWatermark, long lowWatermark) { - /// The action that the consumer should take based on the current in-flight count. +/// @param strategy the strategy to use for calculating the metric +public record BackpressureController(long highWatermark, long lowWatermark, Strategy strategy) { + /// Creates a new BackpressureController with the same watermarks but a different strategy. + /// + /// @param strategy the new strategy to use + /// @return a new {@link BackpressureController} instance + public BackpressureController withStrategy(final Strategy strategy) { + return new BackpressureController(highWatermark, lowWatermark, strategy); + } + + /// The action that the consumer should take based on the current metric value. 
public enum Action { - /// Indicates the consumer should pause fetching new messages due to high in-flight count. + /// Indicates the consumer should pause fetching new messages. PAUSE, - /// Indicates the consumer should resume fetching messages as in-flight count is low. + /// Indicates the consumer should resume fetching messages. RESUME, - /// Indicates no action is needed; the consumer should maintain its current state. + /// Indicates no action is needed. NONE, } + /// Strategy for measuring the backpressure metric. + public interface Strategy { + /// Returns the current value of the metric. + /// + /// @param consumer the Kafka consumer to monitor + /// @return the current value of the metric + long getMetric(final Consumer consumer); + + /// Returns a human-readable name for the metric (e.g., "in-flight", "lag"). + /// + /// @return the name of the metric + String getName(); + } + + /// Creates a strategy that monitors Kafka consumer lag. + /// + /// @return a new {@link Strategy} instance + public static Strategy lagStrategy() { + return new Strategy() { + @Override + public long getMetric(final Consumer consumer) { + return calculateTotalLag(consumer); + } + + @Override + public String getName() { + return "lag"; + } + }; + } + + /// Creates a strategy that monitors an in-flight message count. + /// + /// @param inFlightSupplier the supplier of the current in-flight count + /// @return a new {@link Strategy} instance + public static Strategy inFlightStrategy(final LongSupplier inFlightSupplier) { + if (inFlightSupplier == null) throw new IllegalArgumentException("inFlightSupplier cannot be null"); + return new Strategy() { + @Override + public long getMetric(final Consumer consumer) { + return inFlightSupplier.getAsLong(); + } + + @Override + public String getName() { + return "in-flight"; + } + }; + } + /// Constructs a BackpressureController with the specified high and low watermarks. 
/// - /// @param highWatermark the in-flight count at or above which the consumer should pause - /// @param lowWatermark the in-flight count at or below which the consumer should resume + /// @param highWatermark the metric value at or above which the consumer should pause + /// @param lowWatermark the metric value at or below which the consumer should resume + /// @param strategy the strategy to use for calculating the metric public BackpressureController { if (highWatermark <= 0) throw new IllegalArgumentException("highWatermark must be positive"); if (lowWatermark < 0) throw new IllegalArgumentException("lowWatermark cannot be negative"); @@ -47,15 +100,65 @@ public enum Action { ); } - /// Determines the action to take based on the current in-flight count and pause state. + /// Determines the action to take based on the current state of the consumer. /// - /// @param inFlightCount the current number of in-flight messages + /// @param consumer the Kafka consumer to monitor /// @param currentlyPaused whether the consumer is currently paused /// @return the action to take: {@link Action#PAUSE}, {@link Action#RESUME}, or /// {@link Action#NONE} - public Action check(final long inFlightCount, final boolean currentlyPaused) { - if (!currentlyPaused && inFlightCount >= highWatermark) return Action.PAUSE; - if (currentlyPaused && inFlightCount <= lowWatermark) return Action.RESUME; + public Action check(final Consumer consumer, final boolean currentlyPaused) { + if (strategy == null) return Action.NONE; + final long currentValue = strategy.getMetric(consumer); + if (!currentlyPaused && currentValue >= highWatermark) return Action.PAUSE; + if (currentlyPaused && currentValue <= lowWatermark) return Action.RESUME; return Action.NONE; } + + /// Returns the current metric value. + /// + /// @param consumer the Kafka consumer to monitor + /// @return the current metric value + public long getMetric(final Consumer consumer) { + return strategy != null ? 
strategy.getMetric(consumer) : 0L; + } + + /// Returns the name of the metric being monitored. + /// + /// @return the name of the metric being monitored + public String getMetricName() { + return strategy != null ? strategy.getName() : "none"; + } + + /// Calculates the total lag across all assigned partitions for the given consumer. + /// + /// The formula for total lag is: + /// ``` + /// lag = Σ (endOffset - position) + /// ``` + /// where `endOffset` is the highest available offset in the partition and `position` is the + /// offset of the next record to be fetched by the consumer. + /// + /// @param consumer the Kafka consumer to calculate lag for + /// @return the total lag, or 0 if no partitions are assigned or an error occurs + public static long calculateTotalLag(final Consumer consumer) { + return Optional.ofNullable(consumer) + .map(c -> { + try { + final var assignment = c.assignment(); + if (assignment.isEmpty()) return 0L; + + final var endOffsets = c.endOffsets(assignment); + long totalLag = 0L; + for (final var tp : assignment) { + final long position = c.position(tp); + final long endOffset = endOffsets.getOrDefault(tp, position); + totalLag += Math.max(0, endOffset - position); + } + return totalLag; + } catch (final Exception e) { + return 0L; + } + }) + .orElse(0L); + } } diff --git a/lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java b/lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java index 1756db7..b781a99 100644 --- a/lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java +++ b/lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java @@ -329,7 +329,7 @@ public Builder withBackpressure() { /// less than highWatermark) /// @return This builder instance for method chaining public Builder withBackpressure(final long highWatermark, final long lowWatermark) { - this.backpressureController = new BackpressureController(highWatermark, lowWatermark); + this.backpressureController = new BackpressureController(highWatermark, lowWatermark, 
null); return this; } @@ -345,9 +345,12 @@ public KPipeConsumer build() { "Poll timeout must be positive" ); if (offsetManager != null || offsetManagerProvider != null) kafkaProps.setProperty("enable.auto.commit", "false"); - if (backpressureController != null && sequentialProcessing) throw new IllegalStateException( - "Backpressure is not compatible with sequential processing: in sequential mode in-flight count is always ≤ 1" - ); + if (backpressureController != null && sequentialProcessing) { + LOGGER.log( + System.Logger.Level.INFO, + "Sequential processing enabled with backpressure: switching to lag-based monitoring." + ); + } return new KPipeConsumer<>(this); } } @@ -381,12 +384,16 @@ public KPipeConsumer(final Builder builder) { : null; this.commandQueue = builder.commandQueue != null ? builder.commandQueue : new ConcurrentLinkedQueue<>(); this.rebalanceListener = builder.rebalanceListener != null ? builder.rebalanceListener : null; - this.backpressureController = builder.backpressureController; - initializeMetrics(); - } + this.backpressureController = + builder.backpressureController != null + ? builder.backpressureController.withStrategy( + this.sequentialProcessing + ? BackpressureController.lagStrategy() + : BackpressureController.inFlightStrategy(this.inFlightCount::get) + ) + : null; - private void initializeMetrics() { if (enableMetrics) { metrics.put(METRIC_MESSAGES_RECEIVED, new AtomicLong(0)); metrics.put(METRIC_MESSAGES_PROCESSED, new AtomicLong(0)); @@ -650,23 +657,68 @@ public void close() { return; // Already closed or closing } - // Create a tracker first to avoid missing in-flight messages - final var tracker = createTrackerIfEnabled(waitForMessagesTimeout.toMillis()); + final var waitForMessagesMs = waitForMessagesTimeout.toMillis(); + final var tracker = (waitForMessagesMs > 0 && enableMetrics) ? 
createMessageTracker() : null; // Signal shutdown - signalShutdown(); + pause(); + commandQueue.offer(new ConsumerCommand.Close()); // Wait for in-flight messages - waitForInFlightMessages(tracker, waitForMessagesTimeout.toMillis()); + Optional.ofNullable(tracker).ifPresent(t -> { + try { + final var inFlight = t.getInFlightMessageCount(); + if (inFlight > 0) { + LOGGER.log(Level.INFO, "Waiting for %d in-flight messages to complete".formatted(inFlight)); + t.waitForCompletion(waitForMessagesMs); + } + } catch (final Exception e) { + LOGGER.log(Level.WARNING, "Error waiting for in-flight messages", e); + } + }); // Wake up consumer and wait for thread termination - wakeupAndWaitForConsumerThread(threadTerminationTimeout.toMillis()); + Optional.ofNullable(kafkaConsumer).ifPresent(consumer -> { + try { + consumer.wakeup(); + } catch (final Exception e) { + LOGGER.log(Level.WARNING, "Error during consumer wakeup", e); + } + }); + + Optional.ofNullable(consumerThread.get()) + .filter(Thread::isAlive) + .ifPresent(thread -> { + try { + thread.join(threadTerminationTimeout.toMillis()); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.log(Level.WARNING, "Interrupted while waiting for consumer thread"); + } + }); // Shutdown executor - shutdownExecutor(executorTerminationTimeout.toMillis()); + try { + virtualThreadExecutor.shutdown(); + if (!virtualThreadExecutor.awaitTermination(executorTerminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) { + LOGGER.log(Level.WARNING, "Not all processing tasks completed during shutdown"); + final var pending = virtualThreadExecutor.shutdownNow(); + LOGGER.log(Level.WARNING, "%d tasks were not processed".formatted(pending.size())); + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + virtualThreadExecutor.shutdownNow(); + LOGGER.log(Level.WARNING, "Interrupted while waiting for executor termination"); + } // Shutdown offset manager if enabled - 
closeOffsetManager(); + if (offsetManager != null) { + try { + offsetManager.close(); + } catch (final Exception e) { + LOGGER.log(Level.WARNING, "Error closing offset manager", e); + } + } // Ensure the state is set to CLOSED state.set(ConsumerState.CLOSED); @@ -726,84 +778,74 @@ protected void processRecord(final ConsumerRecord record) { if (enableMetrics) metrics.get(METRIC_MESSAGES_RECEIVED).incrementAndGet(); try { - for (int attempt = 0; attempt <= maxRetries; attempt++) { - if (attempt > 0) { - if (enableMetrics) metrics.get(METRIC_RETRIES).incrementAndGet(); - LOGGER.log( - Level.INFO, - "Retrying message at offset %d (attempt %d of %d)".formatted(record.offset(), attempt, maxRetries) - ); - - try { - Thread.sleep(retryBackoff.toMillis()); - } catch (final InterruptedException ie) { - Thread.currentThread().interrupt(); - return; // Interrupts must not mark the record as processed. - } - } - - try { - final var processedValue = processor.apply(record.value()); - messageSink.send(record, processedValue); - if (enableMetrics) metrics.get(METRIC_MESSAGES_PROCESSED).incrementAndGet(); - if (offsetManager != null) commandQueue.offer(new ConsumerCommand.MarkOffsetProcessed(record)); - return; - } catch (final Exception e) { - if (isInterruptionRelated(e)) { - Thread.currentThread().interrupt(); - return; // Interrupt-like failures should be retried after restart/rebalance. 
- } - - if (attempt == maxRetries) { - if (enableMetrics) metrics.get(METRIC_PROCESSING_ERRORS).incrementAndGet(); - LOGGER.log( - Level.WARNING, - "Failed to process message at offset %d after %d attempts: %s".formatted( - record.offset(), - maxRetries + 1, - e.getMessage() - ), - e - ); - errorHandler.accept(new ProcessingError<>(record, e, maxRetries)); - if (offsetManager != null) commandQueue.offer(new ConsumerCommand.MarkOffsetProcessed(record)); - return; - } - } + final var result = tryProcessRecord(record); + if (result) { + if (enableMetrics) metrics.get(METRIC_MESSAGES_PROCESSED).incrementAndGet(); } } finally { inFlightCount.decrementAndGet(); } } - private static final long BACKPRESSURE_WARNING_THRESHOLD = 10_000; - private volatile boolean backpressureWarningLogged; - - private void checkBackpressure() { - if (backpressureController == null) { - if (!backpressureWarningLogged && inFlightCount.get() > BACKPRESSURE_WARNING_THRESHOLD) { + private boolean tryProcessRecord(final ConsumerRecord record) { + for (int attempt = 0; attempt <= maxRetries; attempt++) { + if (attempt > 0) { + if (enableMetrics) metrics.get(METRIC_RETRIES).incrementAndGet(); LOGGER.log( - Level.WARNING, - "In-flight count(%d) exceeds %d but backpressure is disabled for topic %s. ".formatted( - inFlightCount.get(), - BACKPRESSURE_WARNING_THRESHOLD, - topic - ) + - "Consider enabling backpressure with withBackpressure to prevent memory exhaustion." 
+ Level.INFO, + "Retrying message at offset %d (attempt %d of %d)".formatted(record.offset(), attempt, maxRetries) ); - backpressureWarningLogged = true; + + try { + Thread.sleep(retryBackoff.toMillis()); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + return false; + } + } + + try { + final var processedValue = processor.apply(record.value()); + messageSink.send(record, processedValue); + if (offsetManager != null) commandQueue.offer(new ConsumerCommand.MarkOffsetProcessed(record)); + return true; + } catch (final Exception e) { + if (isInterruptionRelated(e)) { + Thread.currentThread().interrupt(); + return false; + } + + if (attempt == maxRetries) { + if (enableMetrics) metrics.get(METRIC_PROCESSING_ERRORS).incrementAndGet(); + LOGGER.log( + Level.WARNING, + "Failed to process message at offset %d after %d attempts: %s".formatted( + record.offset(), + maxRetries + 1, + e.getMessage() + ), + e + ); + errorHandler.accept(new ProcessingError<>(record, e, maxRetries)); + if (offsetManager != null) commandQueue.offer(new ConsumerCommand.MarkOffsetProcessed(record)); + return false; + } } - return; } + return false; + } - final long inFlight = inFlightCount.get(); + private void checkBackpressure() { + if (backpressureController == null) return; - switch (backpressureController.check(inFlight, isPaused())) { + switch (backpressureController.check(kafkaConsumer, isPaused())) { case PAUSE -> { + final long currentValue = backpressureController.getMetric(kafkaConsumer); LOGGER.log( Level.WARNING, - "Backpressure triggered: pausing consumer (in-flight=%d, highWatermark=%d) for topic %s".formatted( - inFlight, + "Backpressure triggered: pausing consumer (%s=%d, highWatermark=%d) for topic %s".formatted( + backpressureController.getMetricName(), + currentValue, backpressureController.highWatermark(), topic ) @@ -813,13 +855,15 @@ private void checkBackpressure() { pause(); } case RESUME -> { + final long currentValue = 
backpressureController.getMetric(kafkaConsumer); final long pauseDurationMs = System.currentTimeMillis() - backpressurePauseStartTime; if (enableMetrics) metrics.get(METRIC_BACKPRESSURE_TIME_MS).addAndGet(pauseDurationMs); LOGGER.log( Level.INFO, - "Backpressure resolved: resuming consumer (paused for %d ms, in-flight=%d) for topic %s".formatted( + "Backpressure resolved: resuming consumer (paused for %d ms, %s=%d) for topic %s".formatted( pauseDurationMs, - inFlight, + backpressureController.getMetricName(), + currentValue, topic ) ); @@ -831,10 +875,8 @@ private void checkBackpressure() { } private static boolean isInterruptionRelated(final Throwable error) { - Throwable current = error; - while (current != null) { + for (Throwable current = error; current != null; current = current.getCause()) { if (current instanceof InterruptedException || current instanceof ClosedByInterruptException) return true; - current = current.getCause(); } return false; } @@ -859,75 +901,4 @@ private ConsumerRecords pollRecords() { }) .orElse(null); } - - private MessageTracker createTrackerIfEnabled(final long waitForMessagesMs) { - return (waitForMessagesMs > 0 && enableMetrics) ? 
createMessageTracker() : null; - } - - private void signalShutdown() { - pause(); - commandQueue.offer(new ConsumerCommand.Close()); - } - - private void waitForInFlightMessages(final MessageTracker tracker, final long waitForMessagesMs) { - Optional.ofNullable(tracker).ifPresent(t -> { - try { - long inFlightCount = t.getInFlightMessageCount(); - if (inFlightCount > 0) { - LOGGER.log(Level.INFO, "Waiting for %d in-flight messages to complete".formatted(inFlightCount)); - t.waitForCompletion(waitForMessagesMs); - } - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Error waiting for in-flight messages", e); - } - }); - } - - private void wakeupAndWaitForConsumerThread(final long threadTerminationMs) { - // Safe wakeup of the consumer - Optional.ofNullable(kafkaConsumer).ifPresent(consumer -> { - try { - consumer.wakeup(); - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Error during consumer wakeup", e); - } - }); - - // Wait for thread termination - Optional.ofNullable(consumerThread.get()) - .filter(Thread::isAlive) - .ifPresent(thread -> { - try { - thread.join(threadTerminationMs); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.log(Level.WARNING, "Interrupted while waiting for consumer thread"); - } - }); - } - - private void shutdownExecutor(final long executorTerminationMs) { - try { - virtualThreadExecutor.shutdown(); - if (!virtualThreadExecutor.awaitTermination(executorTerminationMs, TimeUnit.MILLISECONDS)) { - LOGGER.log(Level.WARNING, "Not all processing tasks completed during shutdown"); - final var pending = virtualThreadExecutor.shutdownNow(); - LOGGER.log(Level.WARNING, "%d tasks were not processed".formatted(pending.size())); - } - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - virtualThreadExecutor.shutdownNow(); - LOGGER.log(Level.WARNING, "Interrupted while waiting for executor termination"); - } - } - - private void closeOffsetManager() { - if (offsetManager 
!= null) { - try { - offsetManager.close(); - } catch (final Exception e) { - LOGGER.log(Level.WARNING, "Error closing offset manager", e); - } - } - } } diff --git a/lib/src/main/java/org/kpipe/health/HealthConfig.java b/lib/src/main/java/org/kpipe/health/HealthConfig.java index 3823d7e..484b5c5 100644 --- a/lib/src/main/java/org/kpipe/health/HealthConfig.java +++ b/lib/src/main/java/org/kpipe/health/HealthConfig.java @@ -28,17 +28,23 @@ private HealthConfig() {} /// Default HTTP path for the health endpoint. public static final String DEFAULT_PATH = "/health"; - /// Get the enabled status from environment. + /// Checks if the health configuration is enabled. + /// + /// @return true if enabled, false otherwise public static boolean isEnabled() { return !"false".equalsIgnoreCase(AppConfig.getEnvOrDefault(ENV_ENABLED, "true")); } - /// Get the host from environment. + /// Retrieves the host from the environment. + /// + /// @return the host as a string public static String getHost() { return AppConfig.getEnvOrDefault(ENV_HOST, DEFAULT_HOST); } - /// Get the port from environment. + /// Retrieves the port from the environment. + /// + /// @return the port as an integer public static int getPort() { try { final var value = AppConfig.getEnvOrDefault(ENV_PORT, Integer.toString(DEFAULT_PORT)); @@ -50,7 +56,9 @@ public static int getPort() { } } - /// Get the path from environment. + /// Retrieves the path from the environment. + /// + /// @return the path as a string public static String getPath() { return AppConfig.getEnvOrDefault(ENV_PATH, DEFAULT_PATH); } diff --git a/lib/src/main/java/org/kpipe/health/HttpHealthServer.java b/lib/src/main/java/org/kpipe/health/HttpHealthServer.java index 0209c4d..65c7252 100644 --- a/lib/src/main/java/org/kpipe/health/HttpHealthServer.java +++ b/lib/src/main/java/org/kpipe/health/HttpHealthServer.java @@ -16,8 +16,6 @@ /// Lightweight HTTP health check server using the JDK built-in HttpServer. 
 public final class HttpHealthServer implements AutoCloseable {
-  public static final String DEFAULT_PATH = "/health";
-  private static final Logger LOGGER = System.getLogger(HttpHealthServer.class.getName());
   private final HttpServer server;
diff --git a/lib/src/main/java/org/kpipe/metrics/ConsumerMetricsReporter.java b/lib/src/main/java/org/kpipe/metrics/ConsumerMetricsReporter.java
index a97f59b..990f22a 100644
--- a/lib/src/main/java/org/kpipe/metrics/ConsumerMetricsReporter.java
+++ b/lib/src/main/java/org/kpipe/metrics/ConsumerMetricsReporter.java
@@ -6,7 +6,7 @@
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-/// Functional service for reporting Kafka consumer metrics.
+/// Service for reporting Kafka consumer metrics.
 ///
 /// **Example 1:** Basic usage with default logging:
 ///
diff --git a/lib/src/main/java/org/kpipe/registry/MessageProcessorRegistry.java b/lib/src/main/java/org/kpipe/registry/MessageProcessorRegistry.java
index f105a8b..ff7c047 100644
--- a/lib/src/main/java/org/kpipe/registry/MessageProcessorRegistry.java
+++ b/lib/src/main/java/org/kpipe/registry/MessageProcessorRegistry.java
@@ -139,9 +139,7 @@ public JsonPipelineBuilder add(final RegistryKey> key) {
   ///
   /// @return the composed pipeline function
   public Function build() {
-    UnaryOperator> combined = obj -> obj;
-    for (final var op : operators) combined = compose(combined, op);
-    final var finalCombined = combined;
+    final var finalCombined = operators.stream().reduce(obj -> obj, (acc, op) -> t -> op.apply(acc.apply(t)));
     return bytes -> JsonMessageProcessor.inScopedCaches(() -> JsonMessageProcessor.processJson(bytes, finalCombined));
   }
 }
@@ -196,11 +194,7 @@ public Function build() {
     final var schema = AvroMessageProcessor.getSchema(schemaKey);
     if (schema == null) throw new IllegalArgumentException("Schema not found: " + schemaKey);
 
-    UnaryOperator combined = record -> record;
-    for (final var op : operators) {
-      combined = compose(combined, op);
-    }
-    final var finalCombined = combined;
+    final var finalCombined = operators.stream().reduce(record -> record, (acc, op) -> t -> op.apply(acc.apply(t)));
     return bytes -> AvroMessageProcessor.inScopedCaches(() ->
       AvroMessageProcessor.processAvro(bytes, offset, schema, finalCombined)
@@ -246,22 +240,16 @@ public PojoPipelineBuilder add(final RegistryKey key) {
   ///
   /// @return the composed pipeline function
   public Function build() {
-    final MessageFormat format = MessageFormat.pojo(clazz);
-
-    UnaryOperator combined = obj -> obj;
-    for (final var op : operators) {
-      combined = compose(combined, op);
-    }
-    final var finalCombined = combined;
+    final var format = MessageFormat.pojo(clazz);
+    final var finalCombined = operators.stream().reduce(obj -> obj, (acc, op) -> t -> op.apply(acc.apply(t)));
     return bytes -> {
       try {
         if (bytes == null || bytes.length == 0) return bytes;
-        final T deserialized = format.deserialize(bytes);
+        final var deserialized = format.deserialize(bytes);
         if (deserialized == null) return bytes;
-        final T processed = finalCombined.apply(deserialized);
-        if (processed == null) return bytes;
-        return format.serialize(processed);
+        final var processed = finalCombined.apply(deserialized);
+        return (processed == null) ? bytes : format.serialize(processed);
       } catch (final Exception e) {
         LOGGER.log(Level.WARNING, "Error in POJO pipeline execution", e);
         return defaultErrorValue;
@@ -270,10 +258,6 @@ public Function build() {
     }
   }
 
-  private UnaryOperator compose(UnaryOperator first, UnaryOperator second) {
-    return t -> second.apply(first.apply(t));
-  }
-
   /// Registers a typed operator using a type-safe RegistryKey.
   ///
   /// @param The type of data the operator processes
diff --git a/lib/src/test/java/org/kpipe/consumer/BackpressureControllerTest.java b/lib/src/test/java/org/kpipe/consumer/BackpressureControllerTest.java
index 028a3b0..7304ee3 100644
--- a/lib/src/test/java/org/kpipe/consumer/BackpressureControllerTest.java
+++ b/lib/src/test/java/org/kpipe/consumer/BackpressureControllerTest.java
@@ -1,29 +1,49 @@
 package org.kpipe.consumer;
 
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.kpipe.consumer.BackpressureController.Action;
+import org.mockito.Mockito;
 
 class BackpressureControllerTest {
 
+  private static final BackpressureController.Strategy DUMMY_STRATEGY = new BackpressureController.Strategy() {
+    @Override
+    public long getMetric(Consumer consumer) {
+      return 0;
+    }
+
+    @Override
+    public String getName() {
+      return "dummy";
+    }
+  };
+
   @Test
   void shouldRejectInvalidWatermarks() {
-    assertThrows(IllegalArgumentException.class, () -> new BackpressureController(0, 0));
-    assertThrows(IllegalArgumentException.class, () -> new BackpressureController(1000, -1));
-    assertThrows(IllegalArgumentException.class, () -> new BackpressureController(1000, 1000));
-    assertThrows(IllegalArgumentException.class, () -> new BackpressureController(500, 1000));
+    assertThrows(IllegalArgumentException.class, () -> new BackpressureController(0, 0, DUMMY_STRATEGY));
+    assertThrows(IllegalArgumentException.class, () -> new BackpressureController(1000, -1, DUMMY_STRATEGY));
+    assertThrows(IllegalArgumentException.class, () -> new BackpressureController(1000, 1000, DUMMY_STRATEGY));
+    assertThrows(IllegalArgumentException.class, () -> new BackpressureController(500, 1000, DUMMY_STRATEGY));
   }
 
-  @ParameterizedTest(name = "in-flight={0}, paused={1} → {2}")
+  @ParameterizedTest(name = "value={0}, paused={1} → {2}")
   @CsvSource(
     {
-      // Pause: not paused, in-flight reaches or exceeds high watermark
+      // Pause: not paused, value reaches or exceeds high watermark
       "1000, false, PAUSE",
       "1500, false, PAUSE",
-      // Resume: paused, in-flight at or below low watermark
+      // Resume: paused, value at or below low watermark
       "700, true, RESUME",
       "0, true, RESUME",
       // Hold: below high watermark when not paused
@@ -31,12 +51,79 @@ void shouldRejectInvalidWatermarks() {
       // Hold: between watermarks when paused
       "701, true, NONE",
       "850, true, NONE",
-      // Hold: already paused, in-flight still above high watermark (no double-pause)
+      // Hold: already paused, value still above high watermark (no double-pause)
       "1500, true, NONE",
     }
   )
-  void checkReturnsCorrectAction(final long inFlight, final boolean paused, final Action expected) {
-    final var controller = new BackpressureController(1000, 700);
-    assertEquals(expected, controller.check(inFlight, paused));
+  void checkReturnsCorrectAction(final long metricValue, final boolean paused, final Action expected) {
+    final var strategy = Mockito.mock(BackpressureController.Strategy.class);
+    final var consumer = Mockito.mock(Consumer.class);
+    when(strategy.getMetric(consumer)).thenReturn(metricValue);
+
+    final var controller = new BackpressureController(1000, 700, strategy);
+    assertEquals(expected, controller.check(consumer, paused));
+  }
+
+  @Test
+  void calculateTotalLagShouldReturnZeroWhenNoAssignment() {
+    final var consumer = Mockito.mock(Consumer.class);
+    when(consumer.assignment()).thenReturn(Collections.emptySet());
+
+    assertEquals(0, BackpressureController.calculateTotalLag(consumer));
+  }
+
+  @Test
+  void calculateTotalLagShouldReturnCorrectLag() {
+    final var consumer = Mockito.mock(Consumer.class);
+    final var tp1 = new TopicPartition("test", 0);
+    final var tp2 = new TopicPartition("test", 1);
+    final var assignment = Set.of(tp1, tp2);
+
+    when(consumer.assignment()).thenReturn(assignment);
+    when(consumer.endOffsets(assignment)).thenReturn(Map.of(tp1, 100L, tp2, 200L));
+    when(consumer.position(tp1)).thenReturn(90L); // lag 10
+    when(consumer.position(tp2)).thenReturn(150L); // lag 50
+
+    assertEquals(60, BackpressureController.calculateTotalLag(consumer));
+  }
+
+  @Test
+  void calculateTotalLagShouldHandleErrors() {
+    final var consumer = Mockito.mock(Consumer.class);
+    when(consumer.assignment()).thenThrow(new RuntimeException("Kafka error"));
+
+    assertEquals(0, BackpressureController.calculateTotalLag(consumer));
+  }
+
+  @Test
+  void lagStrategyShouldReturnCorrectNameAndMetric() {
+    final var strategy = BackpressureController.lagStrategy();
+    assertEquals("lag", strategy.getName());
+
+    final var consumer = Mockito.mock(Consumer.class);
+    final var tp = new TopicPartition("test", 0);
+    final var assignment = Set.of(tp);
+
+    when(consumer.assignment()).thenReturn(assignment);
+    when(consumer.endOffsets(assignment)).thenReturn(Map.of(tp, 100L));
+    when(consumer.position(tp)).thenReturn(80L);
+
+    assertEquals(20, strategy.getMetric(consumer));
+  }
+
+  @Test
+  void inFlightStrategyShouldReturnCorrectNameAndMetric() {
+    final var inFlightValue = new java.util.concurrent.atomic.AtomicLong(42);
+    final var strategy = BackpressureController.inFlightStrategy(inFlightValue::get);
+    assertEquals("in-flight", strategy.getName());
+
+    assertEquals(42, strategy.getMetric(null));
+    inFlightValue.set(100);
+    assertEquals(100, strategy.getMetric(null));
+  }
+
+  @Test
+  void inFlightStrategyShouldRejectNullSupplier() {
+    assertThrows(IllegalArgumentException.class, () -> BackpressureController.inFlightStrategy(null));
   }
 }
diff --git a/lib/src/test/java/org/kpipe/consumer/ExternalOffsetIntegrationTest.java b/lib/src/test/java/org/kpipe/consumer/ExternalOffsetIntegrationTest.java
index d984da1..79440f9 100644
--- a/lib/src/test/java/org/kpipe/consumer/ExternalOffsetIntegrationTest.java
+++ b/lib/src/test/java/org/kpipe/consumer/ExternalOffsetIntegrationTest.java
@@ -234,28 +234,30 @@ void shouldResumeFromPostgresOffset() throws InterruptedException, SQLException
     mc.rebalance(List.of(PARTITION));
 
     // 3. Populate Kafka with messages 0-9
-    for (int i = 0; i < 10; i++) mc.addRecord(new ConsumerRecord<>(TOPIC, 0, i, "k" + i, "v" + i));
+    for (int i = 0; i < 10; i++) {
+      mc.addRecord(new ConsumerRecord<>(TOPIC, 0, i, "k" + i, "v" + i));
+    }
 
     // 6. Verify processing starts from offset 5
     assertTrue(
-      latch.await(10, TimeUnit.SECONDS),
+      latch.await(20, TimeUnit.SECONDS),
       "Consumer should process 5 remaining messages. Processed: %s".formatted(processedOffsets)
     );
 
-    // Wait a bit more for the last offset to be marked as processed in the DB
-    Thread.sleep(200);
+    // Wait for the last offset to be marked as processed in the DB (polling for stability)
+    long lastDbOffset = -1;
+    for (int i = 0; i < 20; i++) {
+      final var offset = dbManager.getOffsetFromDb(PARTITION);
+      if (offset != null && offset == 9L) {
+        lastDbOffset = offset;
+        break;
+      }
+      Thread.sleep(100);
+    }
 
-    assertEquals(
-      5,
-      processedOffsets.size(),
-      "Should have processed 5 messages, but got: %s".formatted(processedOffsets)
-    );
+    assertEquals(5, processedOffsets.size(), "Should have processed 5 messages");
     assertTrue(processedOffsets.stream().allMatch(o -> o >= 5), "Should only process offsets >= 5");
-    assertEquals(
-      9L,
-      dbManager.getOffsetFromDb(PARTITION),
-      "DB should be updated to offset 9. Actual: %d".formatted(dbManager.getOffsetFromDb(PARTITION))
-    );
+    assertEquals(9L, lastDbOffset, "DB should be updated to offset 9. Actual: %d".formatted(lastDbOffset));
 
     consumer.close();
   }
diff --git a/lib/src/test/java/org/kpipe/consumer/KPipeBackpressureIntegrationTest.java b/lib/src/test/java/org/kpipe/consumer/KPipeBackpressureIntegrationTest.java
index d4c03f7..da18e1d 100644
--- a/lib/src/test/java/org/kpipe/consumer/KPipeBackpressureIntegrationTest.java
+++ b/lib/src/test/java/org/kpipe/consumer/KPipeBackpressureIntegrationTest.java
@@ -13,6 +13,7 @@
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
 /// Integration tests that exercise the full consumer loop to verify backpressure fires
@@ -35,264 +36,331 @@ void setUp() {
     properties.put("enable.auto.commit", "true");
   }
 
-  @Test
-  void shouldPauseKafkaConsumerWhenInFlightExceedsHighWatermark() throws InterruptedException {
-    // Arrange: 5 records, slow sink, highWatermark=3 → in-flight will exceed it
-    final var mockConsumer = buildMockConsumer(5);
-    final var sinkStarted = new CountDownLatch(5);
-    final var sinkRelease = new CountDownLatch(1);
-
-    final var consumer = KPipeConsumer
-      .builder()
-      .withProperties(properties)
-      .withTopic(TOPIC)
-      .withProcessor(v -> v)
-      .withMessageSink((record, value) -> {
-        sinkStarted.countDown();
-        try {
-          sinkRelease.await(5, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-      })
-      .withBackpressure(3, 1)
-      .withConsumer(() -> mockConsumer)
-      .build();
-
-    // Act: start the real consumer loop
-    consumer.start();
-
-    // Wait for all 5 sink calls to be in-flight (in-flight=5 >= highWatermark=3)
-    assertTrue(sinkStarted.await(3, TimeUnit.SECONDS), "All 5 records should be in-flight");
-
-    // Give the consumer loop time to call checkBackpressure()
-    awaitCondition(() -> !mockConsumer.paused().isEmpty(), 2000);
-
-    // Assert: Kafka consumer is paused
-    assertTrue(mockConsumer.paused().contains(PARTITION), "Consumer should be paused due to backpressure");
-    assertEquals(1L, consumer.getMetrics().get(KPipeConsumer.METRIC_BACKPRESSURE_PAUSE_COUNT));
-
-    // Release the sink → in-flight drops to 0, below lowWatermark=1
-    sinkRelease.countDown();
-
-    // Assert: consumer resumes once in-flight ≤ lowWatermark
-    awaitCondition(() -> mockConsumer.paused().isEmpty(), 3000);
-    assertTrue(mockConsumer.paused().isEmpty(), "Consumer should have resumed");
-
-    consumer.close();
-  }
+  @Nested
+  class ParallelMode {
+
+    @Test
+    void shouldPauseKafkaConsumerWhenInFlightExceedsHighWatermark() throws InterruptedException {
+      // Arrange: 5 records, slow sink, highWatermark=3 → in-flight will exceed it
+      final var mockConsumer = buildMockConsumer(5);
+      final var sinkStarted = new CountDownLatch(5);
+      final var sinkRelease = new CountDownLatch(1);
+
+      final var consumer = KPipeConsumer.builder()
+        .withProperties(properties)
+        .withTopic(TOPIC)
+        .withProcessor(v -> v)
+        .withMessageSink((record, value) -> {
+          sinkStarted.countDown();
+          try {
+            sinkRelease.await(5, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        })
+        .withBackpressure(3, 1)
+        .withConsumer(() -> mockConsumer)
+        .build();
+
+      // Act: start the real consumer loop
+      consumer.start();
+
+      // Wait for all 5 sink calls to be in-flight (in-flight=5 >= highWatermark=3)
+      assertTrue(sinkStarted.await(3, TimeUnit.SECONDS), "All 5 records should be in-flight");
+
+      // Give the consumer loop time to call checkBackpressure()
+      awaitCondition(() -> !mockConsumer.paused().isEmpty(), 2000);
+
+      // Assert: Kafka consumer is paused
+      assertTrue(mockConsumer.paused().contains(PARTITION), "Consumer should be paused due to backpressure");
+      assertEquals(1L, consumer.getMetrics().get(KPipeConsumer.METRIC_BACKPRESSURE_PAUSE_COUNT));
+
+      // Release the sink → in-flight drops to 0, below lowWatermark=1
+      sinkRelease.countDown();
+
+      // Assert: consumer resumes once in-flight ≤ lowWatermark
+      awaitCondition(() -> mockConsumer.paused().isEmpty(), 3000);
+      assertTrue(mockConsumer.paused().isEmpty(), "Consumer should have resumed");
+
+      consumer.close();
+    }
 
-  @Test
-  void shouldAccumulateBackpressureTimeMsAfterResume() throws InterruptedException {
-    // Arrange
-    final var mockConsumer = buildMockConsumer(4);
-    final var sinkStarted = new CountDownLatch(4);
-    final var sinkRelease = new CountDownLatch(1);
-
-    final var consumer = KPipeConsumer
-      .builder()
-      .withProperties(properties)
-      .withTopic(TOPIC)
-      .withProcessor(v -> v)
-      .withMessageSink((record, value) -> {
-        sinkStarted.countDown();
-        try {
-          sinkRelease.await(5, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-      })
-      .withBackpressure(2, 1)
-      .withConsumer(() -> mockConsumer)
-      .build();
-
-    consumer.start();
-    assertTrue(sinkStarted.await(3, TimeUnit.SECONDS));
-
-    // Wait for pause
-    awaitCondition(() -> !mockConsumer.paused().isEmpty(), 2000);
-
-    // Release and wait for resume
-    sinkRelease.countDown();
-    awaitCondition(() -> mockConsumer.paused().isEmpty(), 3000);
-
-    // backpressureTimeMs must be > 0
-    assertTrue(
-      consumer.getMetrics().get(KPipeConsumer.METRIC_BACKPRESSURE_TIME_MS) > 0,
-      "backpressureTimeMs should be positive after a backpressure pause"
-    );
-
-    consumer.close();
-  }
+    @Test
+    void shouldAccumulateBackpressureTimeMsAfterResume() throws InterruptedException {
+      // Arrange
+      final var mockConsumer = buildMockConsumer(4);
+      final var sinkStarted = new CountDownLatch(4);
+      final var sinkRelease = new CountDownLatch(1);
+
+      final var consumer = KPipeConsumer.builder()
+        .withProperties(properties)
+        .withTopic(TOPIC)
+        .withProcessor(v -> v)
+        .withMessageSink((record, value) -> {
+          sinkStarted.countDown();
+          try {
+            sinkRelease.await(5, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        })
+        .withBackpressure(2, 1)
+        .withConsumer(() -> mockConsumer)
+        .build();
+
+      consumer.start();
+      assertTrue(sinkStarted.await(3, TimeUnit.SECONDS));
+
+      // Wait for pause
+      awaitCondition(() -> !mockConsumer.paused().isEmpty(), 2000);
+
+      // Release and wait for resume
+      sinkRelease.countDown();
+      awaitCondition(() -> mockConsumer.paused().isEmpty(), 3000);
+
+      // backpressureTimeMs must be > 0
+      assertTrue(
+        (long) consumer.getMetrics().get(KPipeConsumer.METRIC_BACKPRESSURE_TIME_MS) > 0,
+        "backpressureTimeMs should be positive after a backpressure pause"
+      );
+
+      consumer.close();
+    }
 
-  @Test
-  void shouldNotPauseWhenInFlightStaysBelowHighWatermark() throws InterruptedException {
-    // Arrange: 2 fast records, highWatermark=10 → no pause expected
-    final var mockConsumer = buildMockConsumer(2);
-    final var sinkDone = new CountDownLatch(2);
+    @Test
+    void shouldNotPauseWhenInFlightStaysBelowHighWatermark() throws InterruptedException {
+      // Arrange: 2 fast records, highWatermark=10 → no pause expected
+      final var mockConsumer = buildMockConsumer(2);
+      final var sinkDone = new CountDownLatch(2);
 
-    final var consumer = KPipeConsumer
-      .builder()
-      .withProperties(properties)
-      .withTopic(TOPIC)
-      .withProcessor(v -> v)
-      .withMessageSink((record, value) -> sinkDone.countDown())
-      .withBackpressure(10, 5)
-      .withConsumer(() -> mockConsumer)
-      .build();
+      final var consumer = KPipeConsumer.builder()
+        .withProperties(properties)
+        .withTopic(TOPIC)
+        .withProcessor(v -> v)
+        .withMessageSink((record, value) -> sinkDone.countDown())
+        .withBackpressure(10, 5)
+        .withConsumer(() -> mockConsumer)
+        .build();
 
-    consumer.start();
-    assertTrue(sinkDone.await(3, TimeUnit.SECONDS));
+      consumer.start();
+      assertTrue(sinkDone.await(3, TimeUnit.SECONDS));
 
-    // Give the loop a few iterations to call checkBackpressure
-    Thread.sleep(300);
+      // Give the loop a few iterations to call checkBackpressure
+      Thread.sleep(300);
 
-    assertTrue(mockConsumer.paused().isEmpty(), "Consumer should never have paused");
-    assertEquals(0L, consumer.getMetrics().get(KPipeConsumer.METRIC_BACKPRESSURE_PAUSE_COUNT));
+      assertTrue(mockConsumer.paused().isEmpty(), "Consumer should never have paused");
+      assertEquals(0L, consumer.getMetrics().get(KPipeConsumer.METRIC_BACKPRESSURE_PAUSE_COUNT));
 
-    consumer.close();
-  }
+      consumer.close();
+    }
 
-  @Test
-  void withBackpressureAndSequentialProcessingShouldThrowAtBuildTime() {
-    assertThrows(
-      IllegalStateException.class,
-      () ->
-        KPipeConsumer
-          .builder()
-          .withProperties(properties)
-          .withTopic(TOPIC)
-          .withProcessor(v -> v)
-          .withSequentialProcessing(true)
-          .withBackpressure(10_000, 7_000)
-          .build()
-    );
-  }
+    @Test
+    void shouldNotPollNewRecordsWhilePaused() throws InterruptedException {
+      // Arrange: 10 records total, but we add them in batches
+      final var mc = new MockConsumer("earliest") {
+        @Override
+        public synchronized void subscribe(final Collection topics) {}
+
+        @Override
+        public synchronized void subscribe(final Collection topics, final ConsumerRebalanceListener callback) {}
+      };
+      mc.assign(List.of(PARTITION));
+      mc.updateBeginningOffsets(Map.of(PARTITION, 0L));
+
+      // Add first 5 records
+      for (int i = 0; i < 5; i++) {
+        mc.addRecord(new ConsumerRecord<>(TOPIC, 0, i, "k" + i, "v" + i));
+      }
 
-  @Test
-  void shouldNotPollNewRecordsWhilePaused() throws InterruptedException {
-    // Arrange: 10 records total, but we add them in batches
-    final var mc = new MockConsumer("earliest") {
-      @Override
-      public synchronized void subscribe(final Collection topics) {}
-      @Override
-      public synchronized void subscribe(final Collection topics, final ConsumerRebalanceListener callback) {}
-    };
-    mc.assign(List.of(PARTITION));
-    mc.updateBeginningOffsets(Map.of(PARTITION, 0L));
-
-    // Add first 5 records
-    for (int i = 0; i < 5; i++) {
-      mc.addRecord(new ConsumerRecord<>(TOPIC, 0, i, "k" + i, "v" + i));
-    }
+      final var sinkStarted = new CountDownLatch(5);
+      final var sinkRelease = new CountDownLatch(1);
+
+      final var consumer = KPipeConsumer.builder()
+        .withProperties(properties)
+        .withTopic(TOPIC)
+        .withProcessor(v -> v)
+        .withMessageSink((record, value) -> {
+          sinkStarted.countDown();
+          try {
+            sinkRelease.await(5, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        })
+        .withBackpressure(3, 1)
+        .withConsumer(() -> mc)
+        .build();
+
+      // Act
+      consumer.start();
+
+      // Wait for backpressure to trigger (5 records should be in-flight)
+      assertTrue(sinkStarted.await(3, TimeUnit.SECONDS), "First 5 records should be in-flight");
+      awaitCondition(() -> !mc.paused().isEmpty(), 2000);
+
+      // Assert: consumer is paused
+      assertTrue(mc.paused().contains(PARTITION));
+      assertEquals(5L, consumer.getMetrics().get("messagesReceived"));
+
+      // Now add more records while it's paused
+      for (int i = 5; i < 10; i++) {
+        mc.addRecord(new ConsumerRecord<>(TOPIC, 0, i, "k" + i, "v" + i));
+      }
 
-    final var sinkStarted = new CountDownLatch(5);
-    final var sinkRelease = new CountDownLatch(1);
-
-    final var consumer = KPipeConsumer
-      .builder()
-      .withProperties(properties)
-      .withTopic(TOPIC)
-      .withProcessor(v -> v)
-      .withMessageSink((record, value) -> {
-        sinkStarted.countDown();
-        try {
-          sinkRelease.await(5, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-      })
-      .withBackpressure(3, 1)
-      .withConsumer(() -> mc)
-      .build();
-
-    // Act
-    consumer.start();
-
-    // Wait for backpressure to trigger (5 records should be in-flight)
-    assertTrue(sinkStarted.await(3, TimeUnit.SECONDS), "First 5 records should be in-flight");
-    awaitCondition(() -> !mc.paused().isEmpty(), 2000);
-
-    // Assert: consumer is paused
-    assertTrue(mc.paused().contains(PARTITION));
-    assertEquals(5L, consumer.getMetrics().get("messagesReceived"));
-
-    // Now add more records while it's paused
-    for (int i = 5; i < 10; i++) {
-      mc.addRecord(new ConsumerRecord<>(TOPIC, 0, i, "k" + i, "v" + i));
+      // Wait some time while paused to ensure no more records are polled
+      Thread.sleep(500);
+
+      // Should still have only 5 records received because it's paused
+      assertEquals(5L, consumer.getMetrics().get("messagesReceived"), "No more records should be polled while paused");
+
+      // Release and wait for resume
+      sinkRelease.countDown();
+      awaitCondition(() -> mc.paused().isEmpty(), 3000);
+
+      // Now it should poll the remaining records
+      awaitCondition(() -> (long) consumer.getMetrics().get("messagesReceived") == 10, 3000);
+      assertEquals(10L, consumer.getMetrics().get("messagesReceived"));
+
+      consumer.close();
     }
 
-    // Wait some time while paused to ensure no more records are polled
-    Thread.sleep(500);
-
-    // Should still have only 5 records received because it's paused
-    assertEquals(5L, consumer.getMetrics().get("messagesReceived"), "No more records should be polled while paused");
+    @Test
+    void shouldStopPollingWhenManuallyPaused() throws InterruptedException {
+      // Arrange: 10 records total
+      final var mc = new MockConsumer("earliest") {
+        @Override
+        public synchronized void subscribe(final Collection topics) {}
+
+        @Override
+        public synchronized void subscribe(final Collection topics, final ConsumerRebalanceListener callback) {}
+      };
+      mc.assign(List.of(PARTITION));
+      mc.updateBeginningOffsets(Map.of(PARTITION, 0L));
+
+      // Add records in two batches
+      for (int i = 0; i < 5; i++) {
+        mc.addRecord(new ConsumerRecord<>(TOPIC, 0, i, "k" + i, "v" + i));
+      }
+
+      final var consumer = KPipeConsumer.builder()
+        .withProperties(properties)
+        .withTopic(TOPIC)
+        .withProcessor(v -> v)
+        .withConsumer(() -> mc)
+        .build();
+
+      // Act
+      consumer.start();
 
-    // Release and wait for resume
-    sinkRelease.countDown();
-    awaitCondition(() -> mc.paused().isEmpty(), 3000);
+      // Wait for first 5 to be polled
+      awaitCondition(() -> (long) consumer.getMetrics().get("messagesReceived") == 5, 2000);
 
-    // Now it should poll the remaining records
-    awaitCondition(() -> consumer.getMetrics().get("messagesReceived") == 10, 3000);
-    assertEquals(10L, consumer.getMetrics().get("messagesReceived"));
+      // Pause manually
+      consumer.pause();
+
+      // Give some time for the command to be processed
+      awaitCondition(() -> !mc.paused().isEmpty(), 2000);
+
+      // Now add more records while it's paused
+      for (int i = 5; i < 10; i++) {
+        mc.addRecord(new ConsumerRecord<>(TOPIC, 0, i, "k" + i, "v" + i));
+      }
 
-    consumer.close();
+      // Wait some more to ensure no more are polled while paused
+      Thread.sleep(500);
+      assertEquals(5L, consumer.getMetrics().get("messagesReceived"), "No more records should be polled while paused");
+
+      // Resume manually
+      consumer.resume();
+
+      // Wait for the rest to be polled
+      awaitCondition(() -> (long) consumer.getMetrics().get("messagesReceived") == 10, 3000);
+      assertEquals(10L, consumer.getMetrics().get("messagesReceived"));
+
+      consumer.close();
+    }
   }
-
-  @Test
-  void shouldStopPollingWhenManuallyPaused() throws InterruptedException {
-    // Arrange: 10 records total
-    final var mc = new MockConsumer("earliest") {
-      @Override
-      public synchronized void subscribe(final Collection topics) {}
-      @Override
-      public synchronized void subscribe(final Collection topics, final ConsumerRebalanceListener callback) {}
-    };
-    mc.assign(List.of(PARTITION));
-    mc.updateBeginningOffsets(Map.of(PARTITION, 0L));
-
-    // Add records in two batches
-    for (int i = 0; i < 5; i++) {
-      mc.addRecord(new ConsumerRecord<>(TOPIC, 0, i, "k" + i, "v" + i));
+
+  @Nested
+  class SequentialMode {
+
+    @Test
+    void shouldPauseWhenLagExceedsHighWatermarkInSequentialMode() throws InterruptedException {
+      // Arrange: 10 records in Kafka, highWatermark=5
+      final var mockConsumer = new MockConsumer("earliest") {
+        @Override
+        public synchronized void subscribe(final Collection topics) {}
+
+        @Override
+        public synchronized void subscribe(final Collection topics, final ConsumerRebalanceListener callback) {}
+      };
+      mockConsumer.assign(List.of(PARTITION));
+      mockConsumer.updateBeginningOffsets(Map.of(PARTITION, 0L));
+
+      // Initial 10 records
+      for (int i = 0; i < 10; i++) {
+        mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, 0, i, "k" + i, "v" + i));
+      }
+      mockConsumer.updateEndOffsets(Map.of(PARTITION, 10L));
+
+      final var consumer = KPipeConsumer.builder()
+        .withProperties(properties)
+        .withTopic(TOPIC)
+        .withProcessor(v -> {
+          try {
+            // Slow down processing to allow backpressure loop to see the lag
+            Thread.sleep(200);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+          return v;
+        })
+        // Low watermark=2, High watermark=5
+        .withBackpressure(5, 2)
+        .withSequentialProcessing(true)
+        .withConsumer(() -> mockConsumer)
+        .build();
+
+      // Act
+      consumer.start();
+
+      // Wait for pause
+      awaitCondition(() -> !mockConsumer.paused().isEmpty(), 5000);
+
+      // Assert: consumer is paused
+      assertTrue(mockConsumer.paused().contains(PARTITION), "Consumer should be paused due to high lag");
+      assertTrue((long) consumer.getMetrics().get(KPipeConsumer.METRIC_BACKPRESSURE_PAUSE_COUNT) >= 1);
+
+      consumer.close();
     }
 
-    final var consumer = KPipeConsumer
-      .builder()
-      .withProperties(properties)
-      .withTopic(TOPIC)
-      .withProcessor(v -> v)
-      .withConsumer(() -> mc)
-      .build();
-
-    // Act
-    consumer.start();
-
-    // Wait for first 5 to be polled
-    awaitCondition(() -> consumer.getMetrics().get("messagesReceived") == 5, 2000);
-
-    // Pause manually
-    consumer.pause();
-
-    // Give some time for the command to be processed
-    awaitCondition(() -> !mc.paused().isEmpty(), 2000);
-
-    // Now add more records while it's paused
-    for (int i = 5; i < 10; i++) {
-      mc.addRecord(new ConsumerRecord<>(TOPIC, 0, i, "k" + i, "v" + i));
+    @Test
+    void withBackpressureAndSequentialProcessingShouldSwitchToLagMonitoringAtBuildTime() {
+      final var consumer = KPipeConsumer.builder()
+        .withProperties(properties)
+        .withTopic(TOPIC)
+        .withProcessor(v -> v)
+        .withSequentialProcessing(true)
+        .withBackpressure(10_000, 7_000)
+        .build();
+
+      assertNotNull(consumer);
+    }
+
+    @Test
+    void testMockConsumerLag() {
+      final var mc = new MockConsumer("earliest");
+      mc.assign(List.of(PARTITION));
+      mc.updateBeginningOffsets(Map.of(PARTITION, 0L));
+      mc.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, "k", "v"));
+      mc.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, "k", "v"));
+      mc.updateEndOffsets(Map.of(PARTITION, 10L));
+
+      assertEquals(0, mc.position(PARTITION));
+      mc.poll(java.time.Duration.ZERO);
+      assertEquals(2, mc.position(PARTITION));
+      assertEquals(10L, mc.endOffsets(List.of(PARTITION)).get(PARTITION));
     }
-
-    // Wait some more to ensure no more are polled while paused
-    Thread.sleep(500);
-    assertEquals(5L, consumer.getMetrics().get("messagesReceived"), "No more records should be polled while paused");
-
-    // Resume manually
-    consumer.resume();
-
-    // Wait for the rest to be polled
-    awaitCondition(() -> consumer.getMetrics().get("messagesReceived") == 10, 3000);
-    assertEquals(10L, consumer.getMetrics().get("messagesReceived"));
-
-    consumer.close();
   }
 
   // --- helpers ---
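The parameterized cases in `BackpressureControllerTest` above all exercise the same two-watermark (hysteresis) decision rule: pause when the metric reaches the high watermark, resume only once it has drained to the low watermark, and otherwise hold. The following is a minimal standalone sketch of that rule for experimenting outside the test suite. The class and names here (`SimpleBackpressure`) are illustrative, not KPipe's API: the real `BackpressureController` additionally takes a pluggable `Strategy` and reads its metric from the Kafka `Consumer`.

```java
// Minimal sketch of the two-watermark (hysteresis) backpressure rule.
// SimpleBackpressure is a hypothetical stand-in for BackpressureController.
public class SimpleBackpressure {

  enum Action { PAUSE, RESUME, NONE }

  private final long highWatermark;
  private final long lowWatermark;

  SimpleBackpressure(final long highWatermark, final long lowWatermark) {
    // Mirrors the invalid-watermark cases rejected in the tests above:
    // high must be positive and strictly greater than a non-negative low.
    if (highWatermark <= 0 || lowWatermark < 0 || lowWatermark >= highWatermark) {
      throw new IllegalArgumentException("require 0 <= low < high");
    }
    this.highWatermark = highWatermark;
    this.lowWatermark = lowWatermark;
  }

  Action check(final long metric, final boolean paused) {
    if (!paused && metric >= highWatermark) return Action.PAUSE; // crossed high watermark
    if (paused && metric <= lowWatermark) return Action.RESUME;  // drained to low watermark
    return Action.NONE;                                          // hold inside the hysteresis band
  }

  public static void main(String[] args) {
    final var bp = new SimpleBackpressure(1000, 700);
    System.out.println(bp.check(1500, false)); // PAUSE
    System.out.println(bp.check(850, true));   // NONE (between watermarks)
    System.out.println(bp.check(700, true));   // RESUME
  }
}
```

The gap between the two watermarks is what prevents the rapid pause/resume oscillation described in the README's backpressure section: once paused, the controller ignores values between the watermarks until the metric drains all the way down to the low watermark.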