Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit. Hold shift + click to select a range.
03719df
feat: introduce typed MessagePipeline interface and unify sink API
eschizoid Mar 31, 2026
af11f44
feat: support terminal sink in MessagePipeline and update consumer pr…
eschizoid Mar 31, 2026
2e34edb
refactor: unify processor pipeline creation and update registry API f…
eschizoid Mar 31, 2026
cfc819e
refactor: simplify registry wrapping logic and enhance sink registrat…
eschizoid Mar 31, 2026
a842f68
refactor: update registry sink keys to use typed variants and streaml…
eschizoid Mar 31, 2026
48bac3d
refactor: streamline registry key factories and pipeline builder oper…
eschizoid Mar 31, 2026
a800639
refactor: remove redundant wrapping logic and improve sink retrieval …
eschizoid Mar 31, 2026
4a838a5
refactor: update integration tests to use typed registry key construc…
eschizoid Mar 31, 2026
587a949
refactor: streamline code formatting and simplify control flow in ben…
eschizoid Mar 31, 2026
bdecb5e
refactor: extract retry, typed record processing, and error handling …
eschizoid Mar 31, 2026
362503a
refactor: simplify consumer state transitions, logging, and metric in…
eschizoid Mar 31, 2026
92fddb3
refactor: remove unused constants and streamline control flow in mess…
eschizoid Mar 31, 2026
703eea5
chore: add module-info.java and configure build for Java module syste…
eschizoid Mar 31, 2026
2f90d89
refactor: rename ConsumerRunner to KPipeRunner and update references …
eschizoid Mar 31, 2026
de6a7e7
refactor: rename ConsumerRunnerTest to KPipeRunnerTest and update ref…
eschizoid Mar 31, 2026
731cc32
refactor: update pipeline construction to use registry.pipeline and w…
eschizoid Mar 31, 2026
1ef590b
refactor: update pipeline construction to use registry.pipeline and w…
eschizoid Mar 31, 2026
7c14c6c
Merge branch 'main' of github.com:eschizoid/kpipe into feature/typed-…
eschizoid Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 40 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,21 @@ registry.registerOperator(sanitizeKey, JsonMessageProcessor.removeFieldsOperator
final var stampKey = RegistryKey.json("stamp");
registry.registerOperator(stampKey, JsonMessageProcessor.addTimestampOperator("processedAt"));

final var pipeline = registry.jsonPipeline()
final var pipeline = registry.pipeline(MessageFormat.JSON)
.add(sanitizeKey, stampKey)
.toSink(MessageSinkRegistry.JSON_LOGGING)
.build();

final var consumer = KPipeConsumer.<byte[], byte[]>builder()
.withProperties(kafkaProps)
.withTopic("users")
.withProcessor(pipeline)
.withPipeline(pipeline)
.withRetry(3, Duration.ofSeconds(1))
.build();

// Use the consumer
consumer.start();
// Use the runner to manage the consumer lifecycle
final var runner = KPipeRunner.builder(consumer).build();
runner.start();
```

KPipe handles:
Expand Down Expand Up @@ -328,7 +329,7 @@ registry.registerOperator(envKey,
JsonMessageProcessor.addFieldOperator("environment", "production"));

// Create a high-performance pipeline (single SerDe cycle)
final var pipeline = registry.jsonPipeline()
final var pipeline = registry.pipeline(MessageFormat.JSON)
.add(envKey)
.add(uppercaseKey)
.add(RegistryKey.json("addTimestamp"))
Expand All @@ -338,7 +339,7 @@ final var pipeline = registry.jsonPipeline()
final var consumer = KPipeConsumer.<byte[], byte[]>builder()
.withProperties(kafkaProps)
.withTopic("events")
.withProcessor(pipeline)
.withPipeline(pipeline)
.withRetry(3, Duration.ofSeconds(1))
.build();

Expand Down Expand Up @@ -368,7 +369,7 @@ log.log(Level.INFO, "Time spent paused (ms): " + metrics.get("backpressureTimeMs
Configure automatic metrics reporting:

```java
final var runner = ConsumerRunner.builder(consumer)
final var runner = KPipeRunner.builder(consumer)
.withMetricsReporters(List.of(ConsumerMetricsReporter.forConsumer(consumer::getMetrics)))
.withMetricsInterval(30_000)
.build();
Expand Down Expand Up @@ -421,7 +422,7 @@ final var metaKey = RegistryKey.json("addMetadata");
registry.registerOperator(metaKey, JsonMessageProcessor.mergeWithOperator(metadata));

// Build an optimized pipeline (one deserialization -> many transformations -> one serialization)
final var pipeline = registry.jsonPipeline()
final var pipeline = registry.pipeline(MessageFormat.JSON)
.add(sanitizeKey)
.add(stampKey)
.add(metaKey)
Expand Down Expand Up @@ -454,14 +455,15 @@ registry.registerOperator(upperKey,

// Build an optimized pipeline
// This pipeline handles deserialization, all operators, and serialization in one pass
final var pipeline = registry.avroPipeline("user")
final var avroFormat = ((AvroFormat) MessageFormat.AVRO).withDefaultSchema("user");
final var pipeline = registry.pipeline(avroFormat)
.add(sanitizeKey)
.add(upperKey)
.add(RegistryKey.avro("addTimestamp_user"))
.build();

// For data with magic bytes (e.g., Confluent Wire Format), specify an offset:
final var confluentPipeline = registry.avroPipeline("user")
final var confluentPipeline = registry.pipeline(avroFormat)
.skipBytes(5)
.add(sanitizeKey)
.add(RegistryKey.avro("addTimestamp_user"))
Expand All @@ -481,7 +483,7 @@ final var userKey = RegistryKey.of("userTransform", UserRecord.class);
registry.registerOperator(userKey, user -> new UserRecord(user.id(), user.name().toUpperCase(), user.email()));

// Build an optimized POJO pipeline
final var pipeline = registry.pojoPipeline(UserRecord.class)
final var pipeline = registry.pipeline(MessageFormat.pojo(UserRecord.class))
.add(userKey)
.build();
```
Expand Down Expand Up @@ -512,7 +514,11 @@ final var jsonConsoleSink = new JsonConsoleSink<Map<String, Object>>();
final var avroConsoleSink = new AvroConsoleSink<GenericRecord>();

// Use a sink directly in the pipeline
final var pipeline = registry.jsonPipeline().add(RegistryKey.json("sanitize")).toSink(jsonConsoleSink).build();
final var pipeline = registry
.pipeline(MessageFormat.JSON)
.add(RegistryKey.json("sanitize"))
.toSink(jsonConsoleSink)
.build();
```

### Custom Sinks
Expand Down Expand Up @@ -547,7 +553,7 @@ final var dbKey = RegistryKey.of("database", Map.class);
registry.register(dbKey, databaseSink);

// Use the sink by key in the pipeline
final var pipeline = registry.jsonPipeline()
final var pipeline = registry.pipeline(MessageFormat.JSON)
.add(RegistryKey.json("enrich"))
.toSink(dbKey)
.build();
Expand All @@ -562,10 +568,10 @@ The registry provides utilities for adding error handling to sinks:
final var safeSink = MessageSinkRegistry.withErrorHandling(riskySink);

// Register and use the wrapped sink
final var safeKey = RegistryKey.of("safeDatabase", Map.class);
final var safeKey = RegistryKey.json("safeDatabase");
registry.register(safeKey, safeSink);

final var pipeline = registry.jsonPipeline()
final var pipeline = registry.pipeline(MessageFormat.JSON)
.toSink(safeKey)
.build();
```
Expand All @@ -585,19 +591,19 @@ final var consoleSink = new JsonConsoleSink<Map<String, Object>>();
final var compositeSink = new CompositeMessageSink<>(List.of(postgresSink, consoleSink));

// Use in pipeline
final var pipeline = registry.jsonPipeline().toSink(compositeSink).build();
final var pipeline = registry.pipeline(MessageFormat.JSON).toSink(compositeSink).build();
```

---

## Consumer Runner
## KPipe Runner

The `ConsumerRunner` provides a high-level management layer for Kafka consumers, handling lifecycle, metrics, and
graceful shutdown:
The `KPipeRunner` provides a high-level management layer for Kafka consumers, handling lifecycle, metrics, and graceful
shutdown:

```java
// Create a consumer runner with default settings
final var runner = ConsumerRunner.builder(consumer).build();
final var runner = KPipeRunner.builder(consumer).build();

// Start the consumer
runner.start();
Expand All @@ -608,11 +614,11 @@ runner.awaitShutdown();

### Advanced Configuration

The `ConsumerRunner` supports extensive configuration options:
The `KPipeRunner` supports extensive configuration options:

```java
// Create a consumer runner with advanced configuration
final var runner = ConsumerRunner.builder(consumer)
final var runner = KPipeRunner.builder(consumer)
// Configure metrics reporting
.withMetricsReporters(List.of(ConsumerMetricsReporter.forConsumer(consumer::getMetrics)))
.withMetricsInterval(30_000) // Report metrics every 30 seconds
Expand All @@ -629,14 +635,14 @@ final var runner = ConsumerRunner.builder(consumer)
// Configure custom graceful shutdown
.withGracefulShutdown((c, timeoutMs) -> {
log.log(Level.INFO, "Initiating graceful shutdown with timeout: " + timeoutMs + "ms");
return ConsumerRunner.performGracefulConsumerShutdown(c, timeoutMs);
return KPipeRunner.performGracefulConsumerShutdown(c, timeoutMs);
})
.build();
```

### Lifecycle Management

The `ConsumerRunner` manages the complete lifecycle of a consumer:
The `KPipeRunner` manages the complete lifecycle of a consumer:

```java
// Start the consumer (idempotent - safe to call multiple times)
Expand All @@ -654,11 +660,11 @@ runner.close();

### Metrics Integration

The `ConsumerRunner` integrates with metrics reporting:
The `KPipeRunner` integrates with metrics reporting:

```java
// Add multiple metrics reporters
final var runner = ConsumerRunner.builder(consumer)
final var runner = KPipeRunner.builder(consumer)
.withMetricsReporters(
List.of(
ConsumerMetricsReporter.forConsumer(consumer::getMetrics),
Expand All @@ -671,10 +677,10 @@ final var runner = ConsumerRunner.builder(consumer)

### Using with AutoCloseable

The `ConsumerRunner` implements `AutoCloseable` for use with try-with-resources:
The `KPipeRunner` implements `AutoCloseable` for use with try-with-resources:

```java
try (final var runner = ConsumerRunner.builder(consumer).build()) {
try (final var runner = KPipeRunner.builder(consumer).build()) {
runner.start();
// Application logic here
// Runner will be automatically closed when exiting the try block
Expand All @@ -691,7 +697,7 @@ Here's a concise example of a KPipe application:
public class KPipeApp implements AutoCloseable {

private static final System.Logger LOGGER = System.getLogger(KPipeApp.class.getName());
private final ConsumerRunner<KPipeConsumer<byte[], byte[]>> runner;
private final KPipeRunner<KPipeConsumer<byte[], byte[]>> runner;

static void main() {
// Load configuration from environment variables
Expand All @@ -716,9 +722,9 @@ public class KPipeApp implements AutoCloseable {
final var functionalConsumer = KPipeConsumer.<byte[], byte[]>builder()
.withProperties(KafkaConsumerConfig.createConsumerConfig(config.bootstrapServers(), config.consumerGroup()))
.withTopic(config.topic())
.withProcessor(
.withPipeline(
processorRegistry
.jsonPipeline()
.pipeline(MessageFormat.JSON)
.add(RegistryKey.json("addSource"))
.add(RegistryKey.json("markProcessed"))
.add(RegistryKey.json("addTimestamp"))
Expand All @@ -736,7 +742,7 @@ public class KPipeApp implements AutoCloseable {
.build();

// Set up the consumer runner with metrics and shutdown hooks
runner = ConsumerRunner.builder(functionalConsumer)
runner = KPipeRunner.builder(functionalConsumer)
.withMetricsInterval(config.metricsInterval().toMillis())
.withShutdownTimeout(config.shutdownTimeout().toMillis())
.withShutdownHook(true)
Expand Down Expand Up @@ -877,7 +883,7 @@ registry.registerOperator(enrichmentKey,
JsonMessageProcessor.addTimestampOperator("processedAt"));

// Compose them into an optimized pipeline
final var fullPipeline = registry.jsonPipeline()
final var fullPipeline = registry.pipeline(MessageFormat.JSON)
.add(securityKey)
.add(enrichmentKey)
.build();
Expand Down Expand Up @@ -913,7 +919,7 @@ KPipe provides a fluent `when()` operator directly in the `TypedPipelineBuilder`

```java
final var pipeline = registry
.jsonPipeline()
.pipeline(MessageFormat.JSON)
.when(
(map) -> "VIP".equals(map.get("level")),
(map) -> {
Expand Down
2 changes: 1 addition & 1 deletion app/protobuf/src/main/java/org/kpipe/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import org.kpipe.config.AppConfig;
import org.kpipe.config.KafkaConsumerConfig;
import org.kpipe.consumer.ConsumerCommand;
import org.kpipe.consumer.KPipeRunner;
import org.kpipe.consumer.KPipeConsumer;
import org.kpipe.consumer.KPipeRunner;
import org.kpipe.consumer.KafkaOffsetManager;
import org.kpipe.consumer.OffsetManager;
import org.kpipe.health.HttpHealthServer;
Expand Down
3 changes: 3 additions & 0 deletions lib/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/// This module defines the core components of the KPipe library, including
/// configuration, consumers, health checks, metrics, processors, registry,
/// and sinks.
module org.kpipe {
requires com.fasterxml.jackson.core;
requires java.net.http;
Expand Down
17 changes: 8 additions & 9 deletions lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,20 @@
/// Example usage:
///
/// ```java
/// final var pipeline = registry.pipeline(MessageFormat.JSON)
/// .add(sanitizeKey)
/// .toSink(MessageSinkRegistry.JSON_LOGGING)
/// .build();
///
/// final var consumer = KPipeConsumer.<byte[], byte[]>builder()
/// .withProperties(kafkaProps)
/// .withTopic("example-topic")
/// .withProcessor(value -> processValue(value))
/// .withPipeline(pipeline)
/// .withRetry(3, Duration.ofSeconds(1))
/// .withSequentialProcessing(false) // Set to true for ordered processing
/// .withOffsetManagerProvider(consumer -> KafkaOffsetManager.builder(consumer)
/// .withCommitInterval(Duration.ofSeconds(30))
/// .withCommandQueue(commandQueue)
/// .build())
/// .build();
///
/// consumer.start();
/// // Later when finished
/// consumer.close();
/// final var runner = KPipeRunner.builder(consumer).build();
/// runner.start();
/// ```
///
/// @param <K> the type of keys in the consumed records
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class ConsumerRunnerTest {
class KPipeRunnerTest {

@Mock
private KPipeConsumer<String, String> mockConsumer;
Expand Down
Loading