diff --git a/README.md b/README.md index a41e359..3f4a7d5 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ registry.registerOperator(sanitizeKey, JsonMessageProcessor.removeFieldsOperator final var stampKey = RegistryKey.json("stamp"); registry.registerOperator(stampKey, JsonMessageProcessor.addTimestampOperator("processedAt")); -final var pipeline = registry.jsonPipeline() +final var pipeline = registry.pipeline(MessageFormat.JSON) .add(sanitizeKey, stampKey) .toSink(MessageSinkRegistry.JSON_LOGGING) .build(); @@ -41,12 +41,13 @@ final var pipeline = registry.jsonPipeline() final var consumer = KPipeConsumer.builder() .withProperties(kafkaProps) .withTopic("users") - .withProcessor(pipeline) + .withPipeline(pipeline) .withRetry(3, Duration.ofSeconds(1)) .build(); - // Use the consumer - consumer.start(); +// Use the runner to manage the consumer lifecycle +final var runner = KPipeRunner.builder(consumer).build(); +runner.start(); ``` KPipe handles: @@ -328,7 +329,7 @@ registry.registerOperator(envKey, JsonMessageProcessor.addFieldOperator("environment", "production")); // Create a high-performance pipeline (single SerDe cycle) -final var pipeline = registry.jsonPipeline() +final var pipeline = registry.pipeline(MessageFormat.JSON) .add(envKey) .add(uppercaseKey) .add(RegistryKey.json("addTimestamp")) @@ -338,7 +339,7 @@ final var pipeline = registry.jsonPipeline() final var consumer = KPipeConsumer.builder() .withProperties(kafkaProps) .withTopic("events") - .withProcessor(pipeline) + .withPipeline(pipeline) .withRetry(3, Duration.ofSeconds(1)) .build(); @@ -368,7 +369,7 @@ log.log(Level.INFO, "Time spent paused (ms): " + metrics.get("backpressureTimeMs Configure automatic metrics reporting: ```java -final var runner = ConsumerRunner.builder(consumer) +final var runner = KPipeRunner.builder(consumer) .withMetricsReporters(List.of(ConsumerMetricsReporter.forConsumer(consumer::getMetrics))) .withMetricsInterval(30_000) .build(); @@ -421,7 +422,7 @@ final var metaKey = RegistryKey.json("addMetadata"); registry.registerOperator(metaKey, JsonMessageProcessor.mergeWithOperator(metadata)); // Build an optimized pipeline (one deserialization -> many transformations -> one serialization) -final var pipeline = registry.jsonPipeline() +final var pipeline = registry.pipeline(MessageFormat.JSON) .add(sanitizeKey) .add(stampKey) .add(metaKey) @@ -454,14 +455,15 @@ registry.registerOperator(upperKey, // Build an optimized pipeline // This pipeline handles deserialization, all operators, and serialization in one pass -final var pipeline = registry.avroPipeline("user") +final var avroFormat = ((AvroFormat) MessageFormat.AVRO).withDefaultSchema("user"); +final var pipeline = registry.pipeline(avroFormat) .add(sanitizeKey) .add(upperKey) .add(RegistryKey.avro("addTimestamp_user")) .build(); // For data with magic bytes (e.g., Confluent Wire Format), specify an offset: -final var confluentPipeline = registry.avroPipeline("user") +final var confluentPipeline = registry.pipeline(avroFormat) .skipBytes(5) .add(sanitizeKey) .add(RegistryKey.avro("addTimestamp_user")) @@ -481,7 +483,7 @@ final var userKey = RegistryKey.of("userTransform", UserRecord.class); registry.registerOperator(userKey, user -> new UserRecord(user.id(), user.name().toUpperCase(), user.email())); // Build an optimized POJO pipeline -final var pipeline = registry.pojoPipeline(UserRecord.class) +final var pipeline = registry.pipeline(MessageFormat.pojo(UserRecord.class)) .add(userKey) .build(); ``` @@ -512,7 +514,11 @@ final var jsonConsoleSink = new JsonConsoleSink>(); final var avroConsoleSink = new AvroConsoleSink(); // Use a sink directly in the pipeline -final var pipeline = registry.jsonPipeline().add(RegistryKey.json("sanitize")).toSink(jsonConsoleSink).build(); +final var pipeline = registry + .pipeline(MessageFormat.JSON) + .add(RegistryKey.json("sanitize")) + .toSink(jsonConsoleSink) + .build(); ``` ### Custom Sinks @@ -547,7 +553,7 @@ final var dbKey = RegistryKey.of("database", Map.class); registry.register(dbKey, databaseSink); // Use the sink by key in the pipeline -final var pipeline = registry.jsonPipeline() +final var pipeline = registry.pipeline(MessageFormat.JSON) .add(RegistryKey.json("enrich")) .toSink(dbKey) .build(); @@ -562,10 +568,10 @@ The registry provides utilities for adding error handling to sinks: final var safeSink = MessageSinkRegistry.withErrorHandling(riskySink); // Register and use the wrapped sink -final var safeKey = RegistryKey.of("safeDatabase", Map.class); +final var safeKey = RegistryKey.json("safeDatabase"); registry.register(safeKey, safeSink); -final var pipeline = registry.jsonPipeline() +final var pipeline = registry.pipeline(MessageFormat.JSON) .toSink(safeKey) .build(); ``` @@ -585,19 +591,19 @@ final var consoleSink = new JsonConsoleSink>(); final var compositeSink = new CompositeMessageSink<>(List.of(postgresSink, consoleSink)); // Use in pipeline -final var pipeline = registry.jsonPipeline().toSink(compositeSink).build(); +final var pipeline = registry.pipeline(MessageFormat.JSON).toSink(compositeSink).build(); ``` --- -## Consumer Runner +## KPipe Runner -The `ConsumerRunner` provides a high-level management layer for Kafka consumers, handling lifecycle, metrics, and -graceful shutdown: +The `KPipeRunner` provides a high-level management layer for Kafka consumers, handling lifecycle, metrics, and graceful +shutdown: ```java // Create a consumer runner with default settings -final var runner = ConsumerRunner.builder(consumer).build(); +final var runner = KPipeRunner.builder(consumer).build(); // Start the consumer runner.start(); @@ -608,11 +614,11 @@ runner.awaitShutdown(); ### Advanced Configuration -The `ConsumerRunner` supports extensive configuration options: +The `KPipeRunner` supports extensive configuration options: ```java // Create a consumer runner with advanced configuration -final var runner = ConsumerRunner.builder(consumer) +final var runner = KPipeRunner.builder(consumer) // Configure metrics reporting .withMetricsReporters(List.of(ConsumerMetricsReporter.forConsumer(consumer::getMetrics))) .withMetricsInterval(30_000) // Report metrics every 30 seconds @@ -629,14 +635,14 @@ final var runner = ConsumerRunner.builder(consumer) // Configure custom graceful shutdown .withGracefulShutdown((c, timeoutMs) -> { log.log(Level.INFO, "Initiating graceful shutdown with timeout: " + timeoutMs + "ms"); - return ConsumerRunner.performGracefulConsumerShutdown(c, timeoutMs); + return KPipeRunner.performGracefulConsumerShutdown(c, timeoutMs); }) .build(); ``` ### Lifecycle Management -The `ConsumerRunner` manages the complete lifecycle of a consumer: +The `KPipeRunner` manages the complete lifecycle of a consumer: ```java // Start the consumer (idempotent - safe to call multiple times) @@ -654,11 +660,11 @@ runner.close(); ### Metrics Integration -The `ConsumerRunner` integrates with metrics reporting: +The `KPipeRunner` integrates with metrics reporting: ```java // Add multiple metrics reporters -final var runner = ConsumerRunner.builder(consumer) +final var runner = KPipeRunner.builder(consumer) .withMetricsReporters( List.of( ConsumerMetricsReporter.forConsumer(consumer::getMetrics), @@ -671,10 +677,10 @@ final var runner = ConsumerRunner.builder(consumer) ### Using with AutoCloseable -The `ConsumerRunner` implements `AutoCloseable` for use with try-with-resources: +The `KPipeRunner` implements `AutoCloseable` for use with try-with-resources: ```java -try (final var runner = ConsumerRunner.builder(consumer).build()) { +try (final var runner = KPipeRunner.builder(consumer).build()) { runner.start(); // Application logic here // Runner will be automatically closed when exiting the try block @@ -691,7 +697,7 @@ Here's a concise example of a KPipe application: public class KPipeApp implements AutoCloseable { private static final System.Logger LOGGER = System.getLogger(KPipeApp.class.getName()); - private final ConsumerRunner> runner; + private final KPipeRunner> runner; static void main() { // Load configuration from environment variables @@ -716,9 +722,9 @@ public class KPipeApp implements AutoCloseable { final var functionalConsumer = KPipeConsumer.builder() .withProperties(KafkaConsumerConfig.createConsumerConfig(config.bootstrapServers(), config.consumerGroup())) .withTopic(config.topic()) - .withProcessor( + .withPipeline( processorRegistry - .jsonPipeline() + .pipeline(MessageFormat.JSON) .add(RegistryKey.json("addSource")) .add(RegistryKey.json("markProcessed")) .add(RegistryKey.json("addTimestamp")) @@ -736,7 +742,7 @@ public class KPipeApp implements AutoCloseable { .build(); // Set up the consumer runner with metrics and shutdown hooks - runner = ConsumerRunner.builder(functionalConsumer) + runner = KPipeRunner.builder(functionalConsumer) .withMetricsInterval(config.metricsInterval().toMillis()) .withShutdownTimeout(config.shutdownTimeout().toMillis()) .withShutdownHook(true) @@ -877,7 +883,7 @@ registry.registerOperator(enrichmentKey, JsonMessageProcessor.addTimestampOperator("processedAt")); // Compose them into an optimized pipeline -final var fullPipeline = registry.jsonPipeline() +final var fullPipeline = registry.pipeline(MessageFormat.JSON) .add(securityKey) .add(enrichmentKey) .build(); @@ -913,7 +919,7 @@ KPipe provides a fluent `when()` operator directly in the `TypedPipelineBuilder` ```java final var pipeline = registry - .jsonPipeline() + .pipeline(MessageFormat.JSON) .when( (map) -> "VIP".equals(map.get("level")), (map) -> { diff --git a/app/protobuf/src/main/java/org/kpipe/App.java b/app/protobuf/src/main/java/org/kpipe/App.java index 764c2b0..e527bb2 100644 --- a/app/protobuf/src/main/java/org/kpipe/App.java +++ b/app/protobuf/src/main/java/org/kpipe/App.java @@ -15,8 +15,8 @@ import org.kpipe.config.AppConfig; import org.kpipe.config.KafkaConsumerConfig; import org.kpipe.consumer.ConsumerCommand; -import org.kpipe.consumer.KPipeRunner; import org.kpipe.consumer.KPipeConsumer; +import org.kpipe.consumer.KPipeRunner; import org.kpipe.consumer.KafkaOffsetManager; import org.kpipe.consumer.OffsetManager; import org.kpipe.health.HttpHealthServer; diff --git a/lib/src/main/java/module-info.java b/lib/src/main/java/module-info.java index 98db415..bda6016 100644 --- a/lib/src/main/java/module-info.java +++ b/lib/src/main/java/module-info.java @@ -1,3 +1,6 @@ +/// This module defines the core components of the KPipe library, including +/// configuration, consumers, health checks, metrics, processors, registry, +/// and sinks. module org.kpipe { requires com.fasterxml.jackson.core; requires java.net.http; diff --git a/lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java b/lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java index 74f8ee6..ad56fd8 100644 --- a/lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java +++ b/lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java @@ -48,21 +48,20 @@ /// Example usage: /// /// ```java +/// final var pipeline = registry.pipeline(MessageFormat.JSON) +/// .add(sanitizeKey) +/// .toSink(MessageSinkRegistry.JSON_LOGGING) +/// .build(); +/// /// final var consumer = KPipeConsumer.builder() /// .withProperties(kafkaProps) /// .withTopic("example-topic") -/// .withProcessor(value -> processValue(value)) +/// .withPipeline(pipeline) /// .withRetry(3, Duration.ofSeconds(1)) -/// .withSequentialProcessing(false) // Set to true for ordered processing -/// .withOffsetManagerProvider(consumer -> KafkaOffsetManager.builder(consumer) -/// .withCommitInterval(Duration.ofSeconds(30)) -/// .withCommandQueue(commandQueue) -/// .build()) /// .build(); /// -/// consumer.start(); -/// // Later when finished -/// consumer.close(); +/// final var runner = KPipeRunner.builder(consumer).build(); +/// runner.start(); /// ``` /// /// @param the type of keys in the consumed records diff --git a/lib/src/test/java/org/kpipe/consumer/ConsumerRunnerTest.java b/lib/src/test/java/org/kpipe/consumer/KPipeRunnerTest.java similarity index 99% rename from lib/src/test/java/org/kpipe/consumer/ConsumerRunnerTest.java rename to lib/src/test/java/org/kpipe/consumer/KPipeRunnerTest.java index d5796e1..a5018c5 100644 --- a/lib/src/test/java/org/kpipe/consumer/ConsumerRunnerTest.java +++ b/lib/src/test/java/org/kpipe/consumer/KPipeRunnerTest.java @@ -15,7 +15,7 @@ import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) -class ConsumerRunnerTest { +class KPipeRunnerTest { @Mock private KPipeConsumer mockConsumer;