```java
// Just add one annotation!
@PublishEvent(eventType = "USER_CREATED")
public User createUser(CreateUserRequest request) {
    return userRepository.save(new User(request));
}
```

Automatically publishes to Kafka, applies PII masking, routes failures to the DLQ, and collects metrics ✨
```java
// 50+ lines of boilerplate code
@Service
public class UserService {

    @Autowired
    private KafkaTemplate<String, Object> kafka;

    @Autowired
    private ObjectMapper objectMapper;

    public User createUser(UserRequest request) {
        User user = userRepository.save(new User(request));
        try {
            // Manual event creation
            EventEnvelope event = EventEnvelope.builder()
                .eventId(UUID.randomUUID().toString())
                .eventType("USER_CREATED")
                .occurredAt(Instant.now())
                .publishedAt(Instant.now())
                .metadata(/* ... */)
                .payload(/* ... */)
                .build();

            // Manual PII masking
            String json = maskPii(objectMapper.writeValueAsString(event));

            // Manual Kafka send with retry
            kafka.send("user-events", json).get(30, TimeUnit.SECONDS);
        } catch (Exception e) {
            // Manual error handling
            log.error("Failed to publish event", e);
            sendToDlq(event);
        }
        return user;
    }
}
```
```java
// Just 1 annotation!
@Service
public class UserService {

    @PublishEvent(eventType = "USER_CREATED")
    public User createUser(UserRequest request) {
        return userRepository.save(new User(request));
    }
}
```

90% less code. Everything is handled automatically.
No more Kafka boilerplate: just add the `@PublishEvent` annotation. Supports SpEL for flexible payload extraction.
Multi-Topic Support (v0.2.0+):

Route different events to different Kafka topics using the `topic` attribute:

```java
@PublishEvent(eventType = "ORDER_CREATED", topic = "orders.events")
@PublishEvent(eventType = "STOCK_UPDATED", topic = "inventory.events")
```

All events follow a unified schema with metadata (source, actor, trace, tags).
Main Topic → DLQ → Local File Backup. Zero event loss even when Kafka is down for 24 hours.
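The fallback chain above can be sketched in plain Java: each tier is tried in order, and the local file is the last resort. This is only an illustration of the pattern, not Curve's internal code; the tier names are hypothetical stand-ins.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

public class FallbackChainSketch {
    public static void main(String[] args) throws Exception {
        Path backup = Files.createTempFile("curve-backup", ".json");

        // Three tiers, tried in order: main topic -> DLQ -> local file.
        // Here both Kafka tiers fail, simulating a broker outage.
        Consumer<String> mainTopic = e -> { throw new RuntimeException("broker down"); };
        Consumer<String> dlq = e -> { throw new RuntimeException("broker down"); };
        Consumer<String> file = e -> {
            try { Files.writeString(backup, e); }
            catch (Exception ex) { throw new RuntimeException(ex); }
        };

        String event = "{\"eventType\":\"USER_CREATED\"}";
        try { mainTopic.accept(event); }
        catch (RuntimeException e1) {
            try { dlq.accept(event); }
            catch (RuntimeException e2) { file.accept(event); } // last resort: disk
        }
        System.out.println("backed up: " + Files.readString(backup));
    }
}
```

Once Kafka recovers, the file backup can be replayed (see the DLQ recovery script below).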
@PiiField annotation automatically masks/encrypts sensitive data. Supports AWS KMS and HashiCorp Vault for key management.
- Sync mode: ~500 TPS
- Async mode: ~10,000+ TPS (with MDC Context Propagation)
- Transactional Outbox: Guarantees atomicity and consistency.
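The MDC context propagation mentioned for async mode boils down to capturing the caller's context before handing work to a pool thread, then restoring it inside the worker. A self-contained sketch of that technique, using a plain `ThreadLocal` as a stand-in for SLF4J's MDC (Curve's actual implementation may differ):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ContextPropagationSketch {
    // Stand-in for MDC: a per-thread trace ID.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Wrap a task so the submitting thread's trace ID is restored on the worker thread.
    static Runnable withContext(Runnable task) {
        String captured = TRACE_ID.get(); // captured on the caller's thread
        return () -> {
            TRACE_ID.set(captured);
            try {
                task.run();
            } finally {
                TRACE_ID.remove(); // avoid leaking context between pooled tasks
            }
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        TRACE_ID.set("trace-123");
        Future<?> f = pool.submit(withContext(
                () -> System.out.println("worker sees: " + TRACE_ID.get())));
        f.get();
        pool.shutdown();
    }
}
```

Without the wrapper, the pool thread would see `null`: pool threads never inherit the submitter's thread-locals.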
Replay previously published events via the `/actuator/curve-outbox` endpoint:
- View outbox statistics
- Replay events since a specific timestamp
- Perfect for consumer recovery and testing
Framework-independent core for maximum flexibility.
- Spring Actuator Health Indicator
- Custom metrics endpoint (`/actuator/curve-metrics`)
- Outbox replay endpoint (`/actuator/curve-outbox`)
- Detailed event tracking
- Async Context Propagation: MDC (Trace ID) is preserved even in async threads.
- Provides `MockEventProducer` for easy unit/integration testing without Kafka.
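The testing workflow looks roughly like the following self-contained sketch. The interface and in-memory producer here are simplified stand-ins for illustration only, not the library's actual `EventProducer`/`MockEventProducer` API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MockProducerSketch {
    // Hypothetical shapes; the library's real signatures may differ.
    interface EventProducer { void publish(String eventType, Object payload); }

    static class InMemoryEventProducer implements EventProducer {
        final List<String> published = new ArrayList<>();
        public void publish(String eventType, Object payload) {
            published.add(eventType); // capture instead of sending to Kafka
        }
    }

    public static void main(String[] args) {
        InMemoryEventProducer producer = new InMemoryEventProducer();

        // Code under test publishes through the port...
        producer.publish("USER_CREATED", Map.of("id", 1));

        // ...and the test asserts on captured events. No Kafka broker needed.
        if (!producer.published.equals(List.of("USER_CREATED")))
            throw new AssertionError("expected one USER_CREATED event");
        System.out.println("captured: " + producer.published);
    }
}
```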
Gradle (build.gradle):

```groovy
dependencies {
    implementation 'io.github.closeup1202:curve:0.2.0'
}
```

Maven (pom.xml):

```xml
<dependency>
    <groupId>io.github.closeup1202</groupId>
    <artifactId>curve</artifactId>
    <version>0.2.0</version>
</dependency>
```

application.yml:
```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092

curve:
  enabled: true
  kafka:
    topic: event.audit.v1
    dlq-topic: event.audit.dlq.v1
```

```java
import io.github.closeup1202.curve.spring.audit.annotation.PublishEvent;
import io.github.closeup1202.curve.core.type.EventSeverity;

@Service
public class OrderService {

    @PublishEvent(
        eventType = "ORDER_CREATED",
        severity = EventSeverity.INFO
    )
    public Order createOrder(OrderRequest request) {
        // Your business logic
        return orderRepository.save(new Order(request));
    }
}
```

Run `docker-compose up -d`, then check:

- Kafka UI: http://localhost:8080
- Health Check: http://localhost:8081/actuator/health/curve
- Metrics: http://localhost:8081/actuator/curve-metrics
Done!
| Feature | Spring Events | Spring Cloud Stream | Curve |
|---|---|---|---|
| Kafka Integration | ❌ | ✅ | ✅ |
| Declarative Usage | ✅ | ⏳ | ✅ |
| Standardized Schema | ❌ | ❌ | ✅ |
| PII Protection | ❌ | ❌ | ✅ |
| KMS Integration | ❌ | ❌ | ✅ |
| DLQ Support | ❌ | ✅ | ✅ |
| Local File Backup | ❌ | ❌ | ✅ |
| Health Check | ❌ | ✅ | ✅ |
| Custom Metrics | ❌ | ❌ | ✅ |
| Snowflake ID | ❌ | ❌ | ✅ |
| Transactional Outbox | ❌ | ❌ | ✅ |
| Boilerplate Code | Medium | High | Minimal |
```mermaid
graph TB
    A[Domain Layer Core] --> B[Spring Adapter]
    A --> C[Kafka Adapter]
    B --> D[AOP / Context]
    C --> E[Producer / DLQ]
    style A fill:#4051b5
    style B fill:#00897b
    style C fill:#00897b
```
```mermaid
graph LR
    User[User Service] -->|"@PublishEvent"| Curve[Curve Library]
    subgraph Curve Library
        Context[Context Extractor]
        PII[PII Masking]
        Outbox[Outbox Saver]
        Producer[Kafka Producer]
    end
    Curve -->|Sync/Async| Kafka[Kafka Topic]
    Curve -->|Transaction| DB[(Database)]
    DB -->|Polling| Producer
    Producer -->|Retry/DLQ| Kafka
```
```text
curve/
├── core/                         # Pure domain model (framework-independent)
│   ├── envelope/                 # EventEnvelope, EventMetadata
│   ├── port/                     # EventProducer, IdGenerator (interfaces)
│   ├── context/                  # ContextProvider (interfaces)
│   ├── validation/               # EventValidator
│   └── exception/                # Domain exceptions
│
├── spring/                       # Spring Framework adapter
│   ├── aop/                      # @PublishEvent Aspect
│   ├── context/                  # Spring-based ContextProvider implementations
│   ├── factory/                  # EventEnvelopeFactory
│   ├── infrastructure/           # SnowflakeIdGenerator, UtcClockProvider
│   ├── publisher/                # AbstractEventPublisher
│   └── test/                     # Test utilities (MockEventProducer)
│
├── kafka/                        # Kafka adapter
│   ├── producer/                 # KafkaEventProducer
│   └── dlq/                      # FailedEventRecord
│
├── kms/                          # KMS adapter
│   ├── provider/                 # AwsKmsProvider, VaultKeyProvider
│   └── autoconfigure/            # KMS auto-configuration
│
└── spring-boot-autoconfigure/    # Spring Boot auto-configuration
    ├── CurveAutoConfiguration    # Main configuration
    ├── CurveProperties           # Configuration properties
    └── health/                   # Health indicator & metrics
```
- Dependency Inversion Principle (DIP)
  - Core module has zero framework dependencies
  - External dependencies isolated via Port interfaces
- Single Responsibility Principle (SRP)
  - Each ContextProvider handles one responsibility
  - EventValidator validates, EventProducer publishes
- Open/Closed Principle (OCP)
  - EventProducer interface allows non-Kafka brokers
  - ContextProvider implementations are replaceable
```java
@PublishEvent(eventType = "USER_LOGIN", severity = INFO)
public User login(String username, String password) {
    return authService.authenticate(username, password);
}
```

```java
@PublishEvent(eventType = "ORDER_COMPLETED")
public Order completeOrder(Long orderId) {
    Order order = orderRepository.findById(orderId).orElseThrow();
    order.setStatus(OrderStatus.COMPLETED);
    return orderRepository.save(order);
}
```

```java
@PublishEvent(eventType = "CUSTOMER_REGISTERED")
public Customer registerCustomer(CustomerRequest request) {
    // Event automatically flows to data lake/warehouse
    return customerRepository.save(new Customer(request));
}
```

```java
public class UserEventPayload implements DomainEventPayload {

    @PiiField(type = PiiType.EMAIL, strategy = PiiStrategy.MASK)
    private String email;  // "user@example.com" -> "user@***.com"

    @PiiField(type = PiiType.PHONE, strategy = PiiStrategy.ENCRYPT)
    private String phone;  // Encrypted with AES-256-GCM

    @PiiField(type = PiiType.NAME, strategy = PiiStrategy.HASH)
    private String name;   // HMAC-SHA256 hashed
}
```

Supported Strategies:
- MASK: Pattern-based masking (e.g., `j***@gm***.com`)
- ENCRYPT: AES-256-GCM encryption (reversible, Base64-encoded 32-byte key required)
- HASH: HMAC-SHA256 hashing (irreversible, salt recommended)
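As an illustration of the MASK strategy, here is one plausible masking rule that reproduces the `j***@gm***.com` example above. The exact patterns the library applies may differ; this is a sketch, not Curve's implementation:

```java
public class PiiMaskSketch {
    // Assumed rule: keep the first character of the local part and the first
    // two characters of the domain, mask the rest, preserve the TLD.
    static String maskEmail(String email) {
        int at = email.indexOf('@');
        int dot = email.lastIndexOf('.');
        String local = email.substring(0, at);
        String domain = email.substring(at + 1, dot);
        String tld = email.substring(dot); // includes the '.'
        return local.charAt(0) + "***@" + domain.substring(0, 2) + "***" + tld;
    }

    public static void main(String[] args) {
        System.out.println(maskEmail("john@gmail.com")); // j***@gm***.com
    }
}
```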
KMS Support:
- AWS KMS: Envelope encryption with DEK caching
- HashiCorp Vault: Centralized key management
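Envelope encryption, which the AWS KMS integration relies on, works as in the following self-contained sketch: each payload is encrypted with a fresh data-encryption key (DEK), and only a wrapped copy of the DEK travels with the event. A locally generated key stands in for the KMS-held master key here:

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.security.SecureRandom;
import java.util.Base64;

public class EnvelopeEncryptionSketch {
    public static void main(String[] args) throws Exception {
        // 1. The KMS holds the master key (KEK); a local key stands in for it.
        KeyGenerator gen = KeyGenerator.getInstance("AES");
        gen.init(256);
        SecretKey masterKey = gen.generateKey();

        // 2. Generate a fresh data-encryption key (DEK) for this payload.
        SecretKey dek = gen.generateKey();

        // 3. Encrypt the PII field with the DEK (AES-256-GCM).
        byte[] iv = new byte[12];
        new SecureRandom().nextBytes(iv);
        Cipher c = Cipher.getInstance("AES/GCM/NoPadding");
        c.init(Cipher.ENCRYPT_MODE, dek, new GCMParameterSpec(128, iv));
        byte[] ciphertext = c.doFinal("010-1234-5678".getBytes());

        // 4. Wrap the DEK with the master key; only the wrapped DEK is stored
        //    alongside the event, so payloads never share a long-lived key.
        Cipher wrap = Cipher.getInstance("AESWrap");
        wrap.init(Cipher.WRAP_MODE, masterKey);
        byte[] wrappedDek = wrap.wrap(dek);
        System.out.println("wrapped DEK (base64): "
                + Base64.getEncoder().encodeToString(wrappedDek));

        // 5. Decrypt path: unwrap the DEK via the master key, then decrypt.
        Cipher unwrap = Cipher.getInstance("AESWrap");
        unwrap.init(Cipher.UNWRAP_MODE, masterKey);
        SecretKey restored = (SecretKey) unwrap.unwrap(wrappedDek, "AES", Cipher.SECRET_KEY);
        c.init(Cipher.DECRYPT_MODE, restored, new GCMParameterSpec(128, iv));
        System.out.println(new String(c.doFinal(ciphertext)));
    }
}
```

DEK caching (mentioned above) avoids a KMS round trip per event by reusing wrapped DEKs for a bounded time.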
Configuration:

```yaml
curve:
  pii:
    enabled: true
    kms:
      enabled: true
      type: aws  # or vault
      aws:
        region: us-east-1
        default-key-arn: arn:aws:kms:us-east-1:123456789012:key/uuid
```

```bash
curl http://localhost:8081/actuator/health/curve
```

Response:
```json
{
  "status": "UP",
  "details": {
    "kafkaProducerInitialized": true,
    "clusterId": "lkc-abc123",
    "nodeCount": 3,
    "topic": "event.audit.v1",
    "dlqTopic": "event.audit.dlq.v1"
  }
}
```

```bash
curl http://localhost:8081/actuator/curve-metrics
```

Response:
```json
{
  "summary": {
    "totalEventsPublished": 1523,
    "successfulEvents": 1520,
    "failedEvents": 3,
    "successRate": "99.80%",
    "totalDlqEvents": 3,
    "totalKafkaErrors": 0
  },
  "events": {
    "published": [...],
    "publishDuration": [...]
  },
  "dlq": {...},
  "kafka": {...}
}
```

```yaml
curve:
  enabled: true
  id-generator:
    worker-id: 1                  # 0-1023, unique per instance
    auto-generate: false
  async:
    enabled: false                # Enable dedicated async executor
    core-pool-size: 2
    max-pool-size: 10
    queue-capacity: 500
  kafka:
    topic: event.audit.v1
    dlq-topic: event.audit.dlq.v1
    retries: 3
    retry-backoff-ms: 1000
    request-timeout-ms: 30000
    sync-timeout-seconds: 30
    async-mode: false             # true for high throughput
    async-timeout-ms: 5000
  retry:
    enabled: true
    max-attempts: 3
    initial-interval: 1000
    multiplier: 2.0
    max-interval: 10000
  security:
    use-forwarded-headers: false  # true when behind proxy
  pii:
    enabled: true
    crypto:
      default-key: ${PII_ENCRYPTION_KEY}  # Base64-encoded 32-byte AES-256 key
      salt: ${PII_HASH_SALT}
    kms:
      enabled: false              # Set to true to use KMS
      type: aws                   # aws or vault
  outbox:
    enabled: true
    publisher-enabled: true       # false for CDC-based publishing
    poll-interval-ms: 1000
    batch-size: 100
    max-retries: 3
    send-timeout-seconds: 10
    dynamic-batching-enabled: true
    circuit-breaker-enabled: true
    cleanup-enabled: true
    retention-days: 7
    cleanup-cron: "0 0 2 * * *"
  serde:
    type: JSON                    # JSON, AVRO, PROTOBUF
    # schema-registry-url: http://localhost:8081  # Required when type is AVRO
```

If you want to use Avro serialization (`serde.type: AVRO`), add these dependencies:
build.gradle:

```groovy
repositories {
    mavenCentral()
    maven { url 'https://packages.confluent.io/maven/' }
}

dependencies {
    implementation 'org.apache.avro:avro:1.11.4'
    implementation 'io.confluent:kafka-avro-serializer:7.5.0'
}
```

Note: JSON serialization works out of the box without additional dependencies.
Development:

```yaml
spring:
  config:
    activate:
      on-profile: dev

curve:
  kafka:
    async-mode: true  # Fast iteration
    topic: event.audit.dev.v1
```

Production:

```yaml
spring:
  config:
    activate:
      on-profile: prod

curve:
  id-generator:
    worker-id: ${POD_ORDINAL}  # Kubernetes StatefulSet
  kafka:
    async-mode: false  # Reliability first
    retries: 5
```

See the Configuration Guide for details.
Distributed unique ID generation without collisions.
Structure:
| 42 bits: Timestamp | 10 bits: Worker ID | 12 bits: Sequence |
Capacity:
- Up to 1,024 workers
- 4,096 IDs per millisecond per worker
- Time-sortable
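Given the bit layout above, composing and unpacking an ID is a matter of shifts and masks. This sketch uses a raw millisecond timestamp; real Snowflake implementations typically subtract a custom epoch first so the 42 bits last longer:

```java
public class SnowflakeSketch {
    // 42-bit timestamp | 10-bit worker ID | 12-bit sequence, as described above.
    static long compose(long timestampMs, long workerId, long sequence) {
        return (timestampMs << 22) | (workerId << 12) | sequence;
    }

    public static void main(String[] args) {
        long id = compose(1_700_000_000_000L, 7, 42);

        // The fields unpack again by reversing the shifts.
        long seq = id & 0xFFF;            // low 12 bits
        long worker = (id >> 12) & 0x3FF; // next 10 bits
        long ts = id >>> 22;              // remaining high bits
        System.out.println(ts + " " + worker + " " + seq);
    }
}
```

Because the timestamp occupies the high bits, sorting IDs numerically sorts them by creation time.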
Guarantees atomicity between DB transactions and event publishing.
- Exponential Backoff: Automatically retries failed events with increasing delays (1s, 2s, 4s...) to reduce DB load.
- SKIP LOCKED: Uses pessimistic locking to prevent duplicate processing in multi-instance environments.
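The backoff schedule above (1s, 2s, 4s, capped) follows directly from the `initial-interval`, `multiplier`, and `max-interval` settings shown in the configuration reference. A sketch of the delay computation, assuming that formula:

```java
public class BackoffSketch {
    // delay(n) = min(initial * multiplier^(n-1), max)
    static long delayMs(int attempt, long initial, double multiplier, long max) {
        double d = initial * Math.pow(multiplier, attempt - 1);
        return Math.min((long) d, max);
    }

    public static void main(String[] args) {
        // Values from the configuration reference:
        // initial-interval: 1000, multiplier: 2.0, max-interval: 10000
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.println("attempt " + attempt + ": "
                    + delayMs(attempt, 1000, 2.0, 10000) + " ms");
        }
    }
}
```

The cap keeps a persistently failing event from backing off indefinitely while still spacing retries out.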
```java
@Transactional
@PublishEvent(
    eventType = "ORDER_CREATED",
    outbox = true,
    aggregateType = "Order",
    aggregateId = "#result.orderId"
)
public Order createOrder(OrderRequest req) {
    return orderRepo.save(new Order(req));
}
```

Extract specific data for the event payload using SpEL.
```java
@PublishEvent(
    eventType = "USER_UPDATED",
    payload = "#args[0].toEventDto()"
)
public User updateUser(UserUpdateRequest request) {
    // ...
}
```

Extend `AbstractEventPublisher` (built on the `EventProducer` port) to support non-Kafka brokers:
```java
@Component
public class RabbitMqEventProducer extends AbstractEventPublisher {

    private final RabbitTemplate rabbitTemplate;

    public RabbitMqEventProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    protected <T extends DomainEventPayload> void send(EventEnvelope<T> envelope) {
        String json = objectMapper.writeValueAsString(envelope);
        rabbitTemplate.convertAndSend(exchange, routingKey, json);
    }
}
```

```bash
# List backup files
./scripts/dlq-recovery.sh --list

# Recover all files
./scripts/dlq-recovery.sh --topic event.audit.v1 --broker localhost:9092

# Recover a specific file
./scripts/dlq-recovery.sh --file 1234567890.json --topic event.audit.v1
```

- Performance benchmarks & optimization guide
- Multi-broker publishing (publish to different Kafka clusters per event)
- AWS EventBridge adapter
- Grafana dashboard template
- More PII type presets (SSN, Credit Card, etc.)
- Production-ready release
- Spring Cloud Stream binder
- Avro schema evolution support
- gRPC event streaming
- Multi-cloud KMS support (GCP, Azure)
Have ideas? Vote for features or suggest new ones in GitHub Discussions!
| Document | Description |
|---|---|
| Configuration Guide | Detailed configuration options |
| Operations Guide | Production operations and best practices |
| Troubleshooting | Common issues and solutions |
| Monitoring Guide | Metrics, dashboards, and alerting |
| Migration Guide | Version upgrade instructions |
| Changelog | Version history and changes |
| Example Configuration | Configuration examples |
| Sample Application | Full working example |
Join the Curve community:
- GitHub Discussions - Ask questions, share ideas, get help
- Issues - Report bugs, request features
- Contributing - Contribution guidelines
Using Curve in production? We'd love to hear your story! Share it in Discussions and get featured here.
Contributions are welcome! Please feel free to submit a Pull Request.
See CONTRIBUTING.md for guidelines.
This project is licensed under the MIT License - see the LICENSE file for details.
- Inspired by Spring Cloud Stream and Spring Kafka
- Built with Spring Boot and Apache Kafka
- Hexagonal Architecture pattern from Alistair Cockburn
- Issues: GitHub Issues
- Email: closeup1202@gmail.com