KPipe uses **Java Virtual Threads** (Project Loom) for high-concurrency message processing:

- **Efficient Resource Reuse**: Heavy objects like `Schema.Parser`, `ByteArrayOutputStream`, and Avro encoders are
cached per virtual thread using `ScopedValue`, which is significantly more lightweight than `ThreadLocal`.
- **Optimization**: `ScopedValue` allows KPipe to share these heavy resources across all transformations in a single
pipeline without the memory leak risks or scalability bottlenecks of `ThreadLocal` in a virtual-thread-per-record
model.
- **Thread-Per-Record**: Each message is processed in its own virtual thread, allowing KPipe to scale to millions of
concurrent operations without the overhead of complex thread pools.
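
The thread-per-record model can be sketched with plain JDK primitives. The class and method names below are illustrative, not part of KPipe:

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;

// Illustrative sketch: each record runs in its own virtual thread.
final class ThreadPerRecordSketch {

  static List<String> processAll(final List<String> records) {
    final var processed = new ConcurrentLinkedQueue<String>();
    // A virtual-thread-per-task executor creates one cheap virtual thread per submitted record
    try (final var executor = Executors.newVirtualThreadPerTaskExecutor()) {
      for (final var rec : records) {
        executor.submit(() -> processed.add("processed:" + rec));
      }
    } // close() waits until every submitted task has finished
    return List.copyOf(processed);
  }
}
```

Because virtual threads are cheap to create, this pattern scales to very large batches without a bounded platform-thread pool.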

KPipe implements a **Lowest Pending Offset** strategy to ensure reliability even when messages complete out of
order. While this may result in some records being re-processed (standard "at-least-once" behavior), it guarantees no
message is ever skipped.
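
The idea can be sketched with a hypothetical `PendingOffsets` helper (not KPipe's actual implementation): track the offsets currently in flight and only ever commit up to the lowest one.

```java
import java.util.concurrent.ConcurrentSkipListSet;

// Illustrative sketch of the lowest-pending-offset rule.
final class PendingOffsets {

  private final ConcurrentSkipListSet<Long> pending = new ConcurrentSkipListSet<>();

  void begin(final long offset) { pending.add(offset); }

  void finish(final long offset) { pending.remove(offset); }

  /**
   * Offset position that is safe to commit: the lowest offset still being
   * processed, or the next offset to poll when nothing is in flight.
   */
  long safeCommitPosition(final long nextOffset) {
    return pending.isEmpty() ? nextOffset : pending.first();
  }
}
```

Committing only up to the lowest pending offset means a crash can replay already-finished higher offsets (at-least-once), but never skips an unfinished one.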

### 4. Parallel vs. Sequential Processing

KPipe supports two modes of execution depending on your ordering and throughput requirements:

- **Parallel Mode (Default)**: Best for stateless transformations (enrichment, masking). High throughput via virtual
threads. Offsets are committed based on the **lowest pending offset** to ensure no gaps.
- **Sequential Mode (`.withSequentialProcessing(true)`)**: Best for stateful transformations where order per partition
is critical (e.g., balance updates, sequence-dependent events). In this mode, only one message per partition is
processed at a time. Backpressure is supported and operates by monitoring the **consumer lag** (the difference between
the partition end-offset and the consumer's current position).
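
A common way to get per-partition ordering while partitions still proceed in parallel — shown here as an illustrative sketch, not KPipe's actual mechanism — is one single-threaded executor per partition:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch: tasks for the same partition run strictly in submission
// order, while different partitions proceed independently.
final class PerPartitionSequencer implements AutoCloseable {

  private final Map<Integer, ExecutorService> executors = new ConcurrentHashMap<>();

  void submit(final int partition, final Runnable task) {
    executors
        .computeIfAbsent(partition, p -> Executors.newSingleThreadExecutor())
        .submit(task);
  }

  @Override
  public void close() {
    // ExecutorService.close() (Java 19+) waits for queued tasks to drain
    executors.values().forEach(ExecutorService::close);
  }
}
```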

### 5. External Offset Management

While Kafka-based offset storage is the default, KPipe supports external storage (e.g., PostgreSQL) for **exactly-once
processing** or specific architectural needs.
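
The core idea can be sketched in memory (all names below are illustrative, not KPipe's schema): commit the processed result and the consumer offset atomically, so a replayed record after a crash is detected and ignored.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: result and offset are advanced together, atomically.
final class TransactionalStore {

  private final Map<String, String> results = new HashMap<>();
  private long committedOffset = -1;

  synchronized void commit(final long offset, final String key, final String value) {
    if (offset <= committedOffset) {
      return; // already applied: a replayed record is a no-op (idempotent)
    }
    results.put(key, value); // "write the result" and
    committedOffset = offset; // "advance the offset" happen in one critical section
  }

  synchronized long committedOffset() { return committedOffset; }
}
```

With PostgreSQL the same pattern becomes a single SQL transaction covering the result write and the offset update.
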
KPipe provides a robust, multi-layered error handling mechanism.
### 6. Backpressure

When a downstream sink (database, HTTP API, another Kafka topic) is slow, KPipe can automatically pause Kafka polling to
prevent unbounded resource consumption or excessive lag.

Backpressure uses **two configurable watermarks** (hysteresis) to avoid rapid pause/resume oscillation:

- **High watermark** — pause Kafka polling when the monitored metric reaches this value.
- **Low watermark** — resume Kafka polling when the metric drops to or below this value.
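
The hysteresis rule amounts to a tiny state machine; a minimal sketch (the class is illustrative, not KPipe's API):

```java
// Illustrative sketch of two-watermark (hysteresis) backpressure.
final class HysteresisController {

  private final long highWatermark;
  private final long lowWatermark;
  private boolean paused = false;

  HysteresisController(final long highWatermark, final long lowWatermark) {
    this.highWatermark = highWatermark;
    this.lowWatermark = lowWatermark;
  }

  /** Feed the current metric value; returns whether polling should be paused. */
  boolean update(final long metric) {
    if (!paused && metric >= highWatermark) {
      paused = true; // crossed the high watermark: pause polling
    } else if (paused && metric <= lowWatermark) {
      paused = false; // drained to the low watermark: resume polling
    }
    return paused;
  }
}
```

Between the two watermarks the controller keeps its previous state, which is exactly what prevents rapid pause/resume oscillation.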

#### Backpressure Strategies

KPipe automatically selects the optimal backpressure strategy based on your processing mode:

| Mode | Strategy | Metric Monitored | Use Case |
| :--------------------- | :--------------- | :----------------------------- | :------------------------------------------------------------- |
| **Parallel** (Default) | **In-Flight** | Total active virtual threads | Prevent memory exhaustion from too many concurrent tasks. |
| **Sequential** | **Consumer Lag** | Total unread messages in Kafka | Prevent the consumer from falling too far behind the producer. |

##### 1. In-Flight Strategy (Parallel Mode)

In parallel mode, multiple messages are processed concurrently using Virtual Threads. The backpressure controller
monitors the number of messages currently "in-flight" (started but not yet finished).

- **High Watermark Default**: 10,000
- **Low Watermark Default**: 7,000

##### 2. Consumer Lag Strategy (Sequential Mode)

In sequential mode, messages are processed one at a time per partition to maintain strict ordering. Since the in-flight
count is therefore always small, KPipe instead monitors the **total consumer lag** across all assigned partitions.

The lag is calculated using the formula:

```
lag = Σ (endOffset - position)
```

Where:

- `endOffset`: The highest available offset in a partition.
- `position`: The offset of the next record to be fetched by this consumer.

- **High Watermark Default**: 10,000
- **Low Watermark Default**: 7,000
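
The formula sums per-partition lag; an illustrative helper (not KPipe's API):

```java
import java.util.Map;

// Illustrative sketch: lag = Σ (endOffset - position) over all assigned partitions.
final class LagCalculator {

  static long totalLag(final Map<Integer, Long> endOffsets, final Map<Integer, Long> positions) {
    long lag = 0;
    for (final var entry : endOffsets.entrySet()) {
      // A partition without a recorded position counts as fully caught up in this sketch
      final long position = positions.getOrDefault(entry.getKey(), entry.getValue());
      lag += entry.getValue() - position;
    }
    return lag;
  }
}
```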

#### Configuration

```java
final var consumer = KPipeConsumer.<byte[], byte[]>builder()
    .withProperties(kafkaProps)
    .withTopic("events")
    .withProcessor(pipeline)
    // Enable backpressure with default watermarks (10k / 7k)
    .withBackpressure()
    // Or configure explicit watermarks:
    // .withBackpressure(5_000, 3_000)
    .build();
```

**Backpressure is disabled by default** and opt-in via `.withBackpressure()`.


**Observability:** backpressure events are logged (WARNING on pause, INFO on resume) and tracked via two dedicated
metrics: `backpressurePauseCount` and `backpressureTimeMs`.

You can create custom sinks using lambda expressions:

```java
// Create a custom sink that writes to a database
final MessageSink<byte[], byte[]> databaseSink = (record, processedValue) -> {
  try {
    // Parse the processed value
    final var data = new String(processedValue, StandardCharsets.UTF_8);
    // ... write `data` to the database (elided) ...
  } catch (final Exception e) {
    log.log(Level.ERROR, "Failed to write message to database", e);
  }
};
```

### Message Sink Registry
### Consumer Runner

The `ConsumerRunner` manages the consumer lifecycle and supports graceful shutdown:

```java
// Create a consumer runner with default settings
final var runner = ConsumerRunner.builder(consumer).build();

// Start the consumer
runner.start();
The `ConsumerRunner` integrates with metrics reporting:

```java
// Add multiple metrics reporters
final var runner = ConsumerRunner.builder(consumer)
    .withMetricsReporters(
        List.of(
            ConsumerMetricsReporter.forConsumer(consumer::getMetrics)
            // ... additional reporters (elided) ...
        )
    )
    .build();
```

The `ConsumerRunner` implements `AutoCloseable` for use with try-with-resources:

```java
try (final var runner = ConsumerRunner.builder(consumer).build()) {
  runner.start();
  // Application logic here
  // Runner will be automatically closed when exiting the try block
}
```

## Requirements

- **Java 24+** with `--enable-preview` (required while `ScopedValue` remains a preview API)
- Gradle (for building the project)
- [kcat](https://github.com/edenhill/kcat) (for testing)
- Docker (for local Kafka setup)

## Testing

Follow these steps to test the KPipe Kafka Consumer. KPipe includes a pre-configured `docker-compose.yaml` in the root
directory that starts a full local environment including Kafka, Zookeeper, and Confluent Schema Registry.

### Build and Run

```java
public enum StandardProcessors implements UnaryOperator<Map<String, Object>> {
  SOURCE(JsonMessageProcessor.addFieldOperator("src", "app"));

  private final UnaryOperator<Map<String, Object>> op;

  StandardProcessors(final UnaryOperator<Map<String, Object>> op) { this.op = op; }

  @Override
  public Map<String, Object> apply(final Map<String, Object> t) { return op.apply(t); }
}

// Bulk register all enum constants (registration call elided)
```

The library provides a built-in `when()` method for conditional processing:

```java
// Create a predicate that checks message type
final Predicate<byte[]> isOrderMessage = (bytes) -> {
  // Logic to check if it's an order
  return true;
};

final Function<byte[], byte[]> conditionalPipeline = MessageProcessorRegistry.when(
    isOrderMessage
    // ... branch processors (elided) ...
);
```

### Filtering Messages

To skip a message in a pipeline, return `null` in your processor. KPipe will treat `null` as a signal to stop processing
the current record and will not send it to the sink.

```java
registry.registerOperator(RegistryKey.json("filter"), map -> {
  if ("internal".equals(map.get("type"))) {
    return null; // Skip this message
  }
  return map;
});
```
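
When operators are chained, the `null` contract means later steps never see a filtered message. A sketch of that short-circuit behavior (the `chain` helper below is hypothetical, not KPipe's API):

```java
import java.util.Map;
import java.util.function.UnaryOperator;

// Illustrative sketch: a null result stops the chain and skips the sink.
final class NullShortCircuit {

  @SafeVarargs
  static UnaryOperator<Map<String, Object>> chain(final UnaryOperator<Map<String, Object>>... steps) {
    return input -> {
      var current = input;
      for (final var step : steps) {
        if (current == null) {
          return null; // filtered: remaining steps are skipped
        }
        current = step.apply(current);
      }
      return current;
    };
  }
}
```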

### Header Propagation

You can access `ConsumerRecord` headers within a custom sink to propagate tracing or metadata:

```java
final MessageSink<byte[], byte[]> tracingSink = (record, processedValue) -> {
  final var traceId = record.headers().lastHeader("X-Trace-Id");
  if (traceId != null) {
    // Use traceId.value() for logging or downstream calls
  }
};
```

### Thread-Safety and Resource Management

- Message processors should be stateless and thread-safe.