README.md (134 changes: 66 additions & 68 deletions)
registry.registerOperator(sanitizeKey, JsonMessageProcessor.removeFieldsOperator(/* fields truncated in diff */));
final var stampKey = RegistryKey.json("stamp");
registry.registerOperator(stampKey, JsonMessageProcessor.addTimestampOperator("processedAt"));

final var pipeline = registry.jsonPipeline()
.add(sanitizeKey, stampKey)
.toSink(MessageSinkRegistry.JSON_LOGGING)
.build();

final var consumer = KPipeConsumer.<byte[], byte[]>builder()
Unlike traditional pipelines that often perform `byte[] -> Object -> byte[]` at every step, KPipe
optimizes for throughput:

- **Single Deserialization**: Messages are deserialized **once** into a mutable representation (e.g., `Map` for JSON,
`GenericRecord` for Avro) via the `MessagePipeline`.
- **In-Place Transformations**: A chain of `UnaryOperator` functions is applied to the same object.
- **Single Serialization**: The final object is serialized back to `byte[]` only once.
- **Integrated Sinks**: Typed sinks can be attached directly to the pipeline, receiving the object before final
serialization.

This approach significantly reduces CPU overhead and GC pressure.
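The single-pass model above can be sketched in plain Java (class and method names here are illustrative, not KPipe API): one mutable `Map` flows through every operator, and a `null` return short-circuits the chain.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class SinglePassSketch {

    // Apply every operator to the same mutable object; a null result
    // short-circuits the chain (mirroring KPipe's filtering contract).
    public static Map<String, Object> applyAll(
            Map<String, Object> message,
            List<UnaryOperator<Map<String, Object>>> operators) {
        Map<String, Object> current = message;
        for (UnaryOperator<Map<String, Object>> op : operators) {
            current = op.apply(current);
            if (current == null) {
                return null;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> msg = new LinkedHashMap<>();
        msg.put("user", "alice");
        msg.put("password", "secret");

        List<UnaryOperator<Map<String, Object>>> ops = List.of(
                m -> { m.remove("password"); return m; },    // sanitize
                m -> { m.put("processedAt", 0L); return m; } // stamp
        );

        System.out.println(applyAll(msg, ops)); // {user=alice, processedAt=0}
    }
}
```

No intermediate `byte[]` copies are made between operators, which is where the CPU and GC savings come from.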

registry.registerOperator(envKey,
JsonMessageProcessor.addFieldOperator("environment", "production"));

// Create a high-performance pipeline (single SerDe cycle)
final var pipeline = registry.jsonPipeline()
.add(envKey)
.add(uppercaseKey)
.add(RegistryKey.json("addTimestamp"))
.build();

// Use the pipeline with a consumer
final var metaKey = RegistryKey.json("addMetadata");
registry.registerOperator(metaKey, JsonMessageProcessor.mergeWithOperator(metadata));

// Build an optimized pipeline (one deserialization -> many transformations -> one serialization)
final var pipeline = registry.jsonPipeline()
.add(sanitizeKey)
.add(stampKey)
.add(metaKey)
registry.registerOperator(upperKey,

// Build an optimized pipeline
// This pipeline handles deserialization, all operators, and serialization in one pass
final var pipeline = registry.avroPipeline("user")
.add(sanitizeKey)
.add(upperKey)
.add(RegistryKey.avro("addTimestamp_user"))
.build();

// For data with magic bytes (e.g., Confluent Wire Format), specify an offset:
final var confluentPipeline = registry.avroPipeline("user")
.skipBytes(5)
.add(sanitizeKey)
.add(RegistryKey.avro("addTimestamp_user"))
.build();
```
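The 5 bytes skipped above are Confluent's wire-format header: one magic byte (`0`) followed by a 4-byte big-endian schema ID. A minimal stdlib sketch (helper names hypothetical) of what `skipBytes(5)` steps over:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class WireFormatSketch {

    // Confluent wire format: [magic byte 0][4-byte schema ID][Avro payload].
    public static int schemaId(byte[] record) {
        return ByteBuffer.wrap(record, 1, 4).getInt(); // big-endian by default
    }

    public static byte[] payload(byte[] record) {
        return Arrays.copyOfRange(record, 5, record.length);
    }

    public static void main(String[] args) {
        byte[] record = {0, 0, 0, 0, 42, 1, 2, 3}; // schema ID 42, payload {1, 2, 3}
        System.out.println(schemaId(record));                 // 42
        System.out.println(Arrays.toString(payload(record))); // [1, 2, 3]
    }
}
```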

### POJO Processing

For high-performance processing of Java records or POJOs, use the `PojoFormat` and `TypedPipelineBuilder`. This
leverages DSL-JSON annotation processing for near-native performance.

```java
final var registry = new MessageProcessorRegistry("myApp");
final var userKey = RegistryKey.of("userTransform", UserRecord.class);
registry.registerOperator(userKey, user -> new UserRecord(user.id(), user.name().toUpperCase(), user.email()));

// Build an optimized POJO pipeline
final var pipeline = registry.pojoPipeline(UserRecord.class)
.add(userKey)
.build();
```
The `MessageSink` interface defines a single method:

```java
@FunctionalInterface
public interface MessageSink<T> {
void accept(final T processedValue);
}
```

KPipe provides several built-in sinks:

```java
// Create a JSON console sink (Map-typed)
final var jsonConsoleSink = new JsonConsoleSink<Map<String, Object>>();

// Create an Avro console sink (GenericRecord-typed)
final var avroConsoleSink = new AvroConsoleSink<GenericRecord>();

// Use a sink directly in the pipeline
final var pipeline = registry.jsonPipeline().add(RegistryKey.json("sanitize")).toSink(jsonConsoleSink).build();
```

### Custom Sinks
You can create custom sinks using lambda expressions:

```java
// Create a custom sink that writes to a database
final MessageSink<Map<String, Object>> databaseSink = (processedMap) -> {
try {
// Write to database
databaseService.insert(processedMap);

// Log success
log.log(Level.INFO, "Successfully wrote message to database: " + processedMap.get("id"));
} catch (Exception e) {
log.log(Level.ERROR, "Failed to write message to database", e);
}
The `MessageSinkRegistry` provides a centralized repository for registering and retrieving sinks:
final var registry = new MessageSinkRegistry();

// Register sinks with explicit types
final var dbKey = RegistryKey.of("database", Map.class);
registry.register(dbKey, databaseSink);

// Use the sink by key in the pipeline
final var pipeline = registry.jsonPipeline()
.add(RegistryKey.json("enrich"))
.toSink(dbKey)
.build();
```

The registry provides utilities for adding error handling to sinks:
final var safeSink = MessageSinkRegistry.withErrorHandling(riskySink);

// Register and use the wrapped sink
final var safeKey = RegistryKey.of("safeDatabase", Map.class);
registry.register(safeKey, safeSink);

final var pipeline = registry.jsonPipeline()
.toSink(safeKey)
.build();
```
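A plausible reading of `withErrorHandling` is a decorator that catches and logs a sink's exceptions instead of letting them escape into the consume loop. A self-contained stand-in (not the actual KPipe implementation):

```java
import java.util.function.Consumer;

public class ErrorHandlingSketch {

    // Wrap a sink so that its failures are logged rather than rethrown.
    public static <T> Consumer<T> withErrorHandling(Consumer<T> delegate) {
        return value -> {
            try {
                delegate.accept(value);
            } catch (Exception e) {
                System.err.println("Sink failed: " + e.getMessage());
            }
        };
    }

    public static void main(String[] args) {
        Consumer<String> risky = v -> { throw new IllegalStateException("db down"); };
        withErrorHandling(risky).accept("payload"); // logged, the loop keeps running
    }
}
```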

### Composite Sink (Broadcasting)
A composite sink broadcasts each message to multiple sinks; failures in
one sink (e.g., a database timeout) do not prevent other sinks from receiving the message:
// Create multiple sinks
final var postgresSink = new MyPostgresSink();

final var consoleSink = new JsonConsoleSink<Map<String, Object>>();

// Broadcast to both
final var compositeSink = new CompositeMessageSink<>(List.of(postgresSink, consoleSink));

// Use in pipeline
final var pipeline = registry.jsonPipeline().toSink(compositeSink).build();
```
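Conceptually, the composite sink iterates its delegates and isolates each failure. A stdlib sketch of that contract (not the actual `CompositeMessageSink` source):

```java
import java.util.List;
import java.util.function.Consumer;

public class CompositeSketch {

    // Broadcast one value to every sink; a failing sink never blocks the rest.
    public static <T> Consumer<T> composite(List<Consumer<T>> sinks) {
        return value -> {
            for (Consumer<T> sink : sinks) {
                try {
                    sink.accept(value);
                } catch (Exception e) {
                    System.err.println("Sink failed, continuing: " + e.getMessage());
                }
            }
        };
    }

    public static void main(String[] args) {
        Consumer<String> failing = v -> { throw new RuntimeException("timeout"); };
        Consumer<String> console = v -> System.out.println("got: " + v);
        composite(List.of(failing, console)).accept("event"); // prints: got: event
    }
}
```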

---
public class KPipeApp implements AutoCloseable {
.withTopic(config.topic())
.withProcessor(
processorRegistry
.jsonPipeline()
.add(RegistryKey.json("addSource"))
.add(RegistryKey.json("markProcessed"))
.add(RegistryKey.json("addTimestamp"))
.toSink(MessageSinkRegistry.JSON_LOGGING)
.build()
)
.withCommandQueue(commandQueue)
.withOffsetManagerProvider((consumer) ->
KafkaOffsetManager.builder(consumer)
registry.registerOperator(enrichmentKey,
JsonMessageProcessor.addTimestampOperator("processedAt"));

// Compose them into an optimized pipeline
final var fullPipeline = registry.jsonPipeline()
.add(securityKey)
.add(enrichmentKey)
.build();
registry.registerEnum(Map.class, StandardProcessors.class);

### Conditional Processing

KPipe provides a fluent `when()` operator directly in the `TypedPipelineBuilder`:

```java
final var pipeline = registry
.jsonPipeline()
.when(
(map) -> "VIP".equals(map.get("level")),
(map) -> {
map.put("priority", "high");
return map;
},
(map) -> {
map.put("priority", "low");
return map;
}
)
.build();
```

Alternatively, for `byte[]`-level branching, use the static `MessageProcessorRegistry.when()` utility, which takes a predicate and two `byte[] -> byte[]` processors.
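Based on the shape it is given elsewhere (`MessageProcessorRegistry.when(predicate, thenPipeline, elsePipeline)`), the utility behaves like a ternary over functions. A self-contained stand-in, with processor bodies that are purely illustrative:

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import java.util.function.Predicate;

public class WhenSketch {

    // Route each input to one of two processors based on a predicate,
    // mirroring the documented MessageProcessorRegistry.when(...) shape.
    public static <T> Function<T, T> when(
            Predicate<T> condition, Function<T, T> thenFn, Function<T, T> elseFn) {
        return input -> condition.test(input) ? thenFn.apply(input) : elseFn.apply(input);
    }

    public static void main(String[] args) {
        Predicate<byte[]> isOrderMessage = bytes ->
                new String(bytes, StandardCharsets.UTF_8).contains("\"type\":\"order\"");

        Function<byte[], byte[]> routed = when(
                isOrderMessage,
                bytes -> bytes,        // stand-in for the "orderProcessor" pipeline
                bytes -> new byte[0]); // stand-in for the "defaultProcessor" pipeline

        byte[] order = "{\"type\":\"order\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(routed.apply(order).length > 0); // true
    }
}
```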

### Filtering Messages

To skip a message in a pipeline, return `null` in your operator. KPipe will treat `null` as a signal to stop processing
the current record and will not send it to any downstream operators or sinks.

```java
registry.registerOperator(RegistryKey.json("filter"), map -> {
    // Body truncated in the diff view; illustrative completion: drop records
    // flagged as internal (field name hypothetical). Returning null skips them.
    if (Boolean.TRUE.equals(map.get("internal"))) {
        return null;
    }
    return map;
});
```