Guide: Message Queues
Stream your SQL Server changes to any message queue platform. This guide shows you how to set up real-time database change notifications using RabbitMQ, Azure Service Bus, or AWS SQS.
Message queues let you build loosely coupled systems where database changes trigger actions across your infrastructure. Common patterns include:
- Microservices: Notify other services when data changes without direct API calls
- Event Processing: Feed changes to analytics platforms, data warehouses, or ML pipelines
- Workflow Automation: Trigger business processes when specific records are created or updated
- System Integration: Connect SQL Server to cloud services, mobile apps, or third-party platforms
Unlike HTTP webhooks, message queues provide buffering, retry logic, and guaranteed delivery—perfect for high-volume scenarios or unreliable networks.
Pick the message queue that fits your infrastructure:
Best for: On-premises deployments, complex routing patterns, self-managed infrastructure
Choose RabbitMQ if you:
- Need flexible routing with exchanges and routing keys
- Want full control over your messaging infrastructure
- Already run RabbitMQ or have in-house AMQP expertise
- Require message sizes up to 128MB
- Need pattern-based message routing (topic exchanges, fanout)
Best for: Azure-native applications, enterprise integration, managed services
Choose Azure Service Bus if you:
- Run primarily on Azure infrastructure
- Need pub/sub messaging with topics and subscriptions
- Want a fully managed service with SLA guarantees
- Require enterprise-grade security and compliance
- Need integration with Azure ecosystem (Event Grid, Logic Apps)
Best for: AWS-native applications, simple queueing, serverless architectures
Choose AWS SQS if you:
- Run primarily on AWS infrastructure
- Need simple, scalable message queuing
- Want IAM-based security and permissions
- Prefer fully managed services with minimal ops overhead
- Need integration with Lambda, SNS, or other AWS services
Not sure? Start with RabbitMQ for maximum flexibility, or choose the cloud provider you're already using for simpler ops.
Follow these steps to get database changes flowing to your message queue in under 15 minutes.
Before configuring Trignis, set up your message queue infrastructure:
RabbitMQ:

```bash
# Create a user for Trignis
rabbitmqctl add_user trignis your-secure-password
rabbitmqctl set_permissions -p / trignis ".*" ".*" ".*"

# Create a queue (or let Trignis create it)
rabbitmqadmin declare queue name=db-changes durable=true
```

Azure Service Bus:
- Create a Service Bus namespace in the Azure Portal
- Create a queue or topic for database changes
- Generate a Shared Access Policy with "Send" permissions
- Copy the connection string
AWS SQS:
- Create an SQS queue in the AWS Console
- Note the queue URL
- Create an IAM policy with `sqs:SendMessage` permission
- Attach the policy to your EC2 instance role or create access keys
Add a message queue endpoint to your environment configuration file (e.g., environments/production.json):
RabbitMQ Example:
```json
{
  "ChangeTracking": {
    "ApiEndpoints": [
      {
        "Key": "rabbitmq_customers",
        "MessageQueueType": "RabbitMQ",
        "MessageQueue": {
          "HostName": "rabbitmq.yourcompany.com",
          "Port": 5672,
          "VirtualHost": "/",
          "Username": "trignis",
          "Password": "your-secure-password",
          "QueueName": "customer-changes"
        }
      }
    ]
  }
}
```

Azure Service Bus Example:
```json
{
  "ChangeTracking": {
    "ApiEndpoints": [
      {
        "Key": "azure_customers",
        "MessageQueueType": "AzureServiceBus",
        "MessageQueue": {
          "ConnectionString": "Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=SendPolicy;SharedAccessKey=your-key",
          "QueueName": "customer-changes"
        }
      }
    ]
  }
}
```

AWS SQS Example:
```json
{
  "ChangeTracking": {
    "ApiEndpoints": [
      {
        "Key": "aws_customers",
        "MessageQueueType": "AWSSQS",
        "MessageQueue": {
          "QueueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/customer-changes",
          "Region": "us-east-1"
        }
      }
    ]
  }
}
```

Start Trignis and watch for successful message delivery:
```bash
# Windows
TrignisBackgroundService.bat test

# Check the logs for confirmation
tail -f log/trignis-*.log
```

Look for log entries like:

```
[INFO] [production] └─ [MQ] Exported to RabbitMQ queue 'customer-changes'
```
Set up a consumer to process the messages. Here's a simple example:
Python Consumer (RabbitMQ):
import pika
import json
connection = pika.BlockingConnection(
pika.ConnectionParameters('rabbitmq.yourcompany.com'))
channel = connection.channel()
def callback(ch, method, properties, body):
message = json.loads(body)
print(f"Received {len(message['Data'])} changes")
for change in message['Data']:
operation = change.get('$operation', 'FULL')
print(f" {operation}: {change}")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='customer-changes', on_message_callback=callback)
channel.start_consuming()C# Consumer (Azure Service Bus):
```csharp
var client = new ServiceBusClient(connectionString);
var processor = client.CreateProcessor("customer-changes");

processor.ProcessMessageAsync += async args =>
{
    var body = args.Message.Body.ToString();
    var message = JsonSerializer.Deserialize<ChangeMessage>(body);
    Console.WriteLine($"Received {message.Data.Count} changes");
    await args.CompleteMessageAsync(args.Message);
};

// The SDK requires an error handler before processing can start
processor.ProcessErrorAsync += args =>
{
    Console.WriteLine(args.Exception.Message);
    return Task.CompletedTask;
};

await processor.StartProcessingAsync();
```

Send different tables to different queues using RabbitMQ exchanges:
```json
{
  "ChangeTracking": {
    "TrackingObjects": [
      {
        "Name": "Customers",
        "TableName": "dbo.Customers",
        "ApiEndpointKeys": ["rabbitmq_crm"]
      },
      {
        "Name": "Orders",
        "TableName": "dbo.Orders",
        "ApiEndpointKeys": ["rabbitmq_fulfillment"]
      }
    ],
    "ApiEndpoints": [
      {
        "Key": "rabbitmq_crm",
        "MessageQueueType": "RabbitMQ",
        "MessageQueue": {
          "HostName": "rabbitmq.company.com",
          "Exchange": "database-changes",
          "RoutingKey": "crm.customers"
        }
      },
      {
        "Key": "rabbitmq_fulfillment",
        "MessageQueueType": "RabbitMQ",
        "MessageQueue": {
          "HostName": "rabbitmq.company.com",
          "Exchange": "database-changes",
          "RoutingKey": "fulfillment.orders"
        }
      }
    ]
  }
}
```

Then consumers can subscribe to specific routing patterns:
- `crm.*` - Receives all CRM-related changes
- `fulfillment.*` - Receives all fulfillment-related changes
- `*.orders` - Receives order changes from any source
- `#` - Receives everything
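The patterns above follow standard AMQP topic-matching rules: routing keys are dot-separated words, `*` matches exactly one word, and `#` matches zero or more. A stdlib Python sketch of those rules, for intuition only (the broker does the real matching):

```python
def topic_matches(pattern: str, key: str) -> bool:
    """AMQP-style topic match: '*' = exactly one word, '#' = zero or more words."""
    def match(p, k):
        if not p:
            return not k                   # pattern exhausted: key must be too
        if p[0] == "#":
            # '#' may swallow zero or more words
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if p[0] == "*" or p[0] == k[0]:
            return match(p[1:], k[1:])
        return False
    return match(pattern.split("."), key.split("."))

print(topic_matches("crm.*", "crm.customers"))          # True
print(topic_matches("*.orders", "fulfillment.orders"))  # True
print(topic_matches("crm.*", "crm.customers.updated"))  # False: '*' is one word
print(topic_matches("#", "anything.at.all"))            # True
```

Note that `crm.*` does not match `crm.customers.updated`; use `crm.#` when you want a whole subtree.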
Use Azure Service Bus topics to send changes to multiple subscribers:
```json
{
  "MessageQueueType": "AzureServiceBus",
  "MessageQueue": {
    "ConnectionString": "...",
    "TopicName": "customer-changes"
  }
}
```

Then create multiple subscriptions in Azure:
- `analytics-subscription` - For data warehouse updates
- `notification-subscription` - For customer notifications
- `audit-subscription` - For compliance logging
Each subscription receives a copy of every message.
For tables with millions of rows, use batching to avoid overwhelming your queue:
```json
{
  "ChangeTracking": {
    "GlobalSettings": {
      "MaxRecordsPerBatch": 100,
      "EnablePayloadBatching": true
    },
    "TrackingObjects": [
      {
        "Name": "LargeTable",
        "InitialSyncMode": "Full",
        "TableName": "dbo.BigTransactionTable"
      }
    ]
  }
}
```

Trignis automatically splits full syncs into batches. Your consumer receives:
- Message 1: Records 1-100 with `X-Batch-Number: 1, X-Total-Batches: 50`
- Message 2: Records 101-200 with `X-Batch-Number: 2, X-Total-Batches: 50`
- ...and so on
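If a consumer needs the whole sync as one unit, it can buffer parts until all batches arrive. A minimal sketch, assuming batches of one sync share some grouping key (for example the sync version from the message metadata — verify what your messages actually carry):

```python
def accumulate(pending: dict, sync_key, batch_number: int,
               total_batches: int, records: list):
    """Buffer one batch; return the full ordered record list once every
    batch for this sync_key has arrived, otherwise None."""
    parts = pending.setdefault(sync_key, {})
    parts[batch_number] = records
    if len(parts) == total_batches:
        complete = [r for n in sorted(parts) for r in parts[n]]
        del pending[sync_key]          # free the buffer
        return complete
    return None

pending = {}
accumulate(pending, "sync-12345", 2, 2, ["rec-101", "rec-102"])   # returns None
full = accumulate(pending, "sync-12345", 1, 2, ["rec-1", "rec-2"])
print(full)  # ['rec-1', 'rec-2', 'rec-101', 'rec-102']
```

Batches arriving out of order are handled by sorting on batch number. In production, persist the buffer and add a timeout in case a batch never arrives.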
Before going live, complete these steps:
- Encrypt connection strings with the `PWENC:` prefix (run Trignis once to encrypt)
- Use dedicated service accounts with minimum required permissions
- Enable TLS/SSL for RabbitMQ connections (port 5671)
- Store credentials in Azure Key Vault, AWS Secrets Manager, or environment variables
- Rotate credentials every 90 days
- Enable dead letter queue monitoring
- Set up alerts for circuit breaker opens (via health endpoint)
- Configure message retention appropriate for your SLA
- Test failover scenarios (network issues, queue unavailability)
- Document your message contracts and consumers
- Test with production-scale data volumes
- Tune `PollingIntervalSeconds` for your latency requirements
- Enable batching if syncing large tables initially
- Monitor queue depths and consumer lag
- Set up dashboards for message throughput
- Add health check monitoring (`GET /health/connections`)
- Set up log aggregation (Splunk, ELK, Azure Monitor)
- Create alerts for:
- Circuit breaker opens
- Dead letter threshold exceeded
- Queue depth increases
- Consumer errors
- Document escalation procedures
Here's what happens when a database change occurs:
- Change Detection: SQL Server's change tracking records INSERT/UPDATE/DELETE
- Polling: Trignis queries for changes every `PollingIntervalSeconds`
- Transformation: Stored procedure formats changes as JSON
- Compression: Messages over 1KB are gzip-compressed (typically 60-80% reduction)
- Circuit Breaker: Checks if endpoint is healthy (3 failures = circuit opens)
- Delivery: Message sent to queue with retry logic (3 attempts)
- Confirmation: Success logged or failure sent to dead letter queue
Reliability guarantees:
- At-least-once delivery (messages may be delivered multiple times)
- Circuit breaker prevents cascade failures
- Dead letter queue captures undeliverable messages
- Automatic compression handles large payloads
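Because delivery is at-least-once, consumers should be idempotent. One common approach is to record already-processed `CorrelationId` values and skip repeats; a minimal in-memory sketch (in production, persist the seen set somewhere durable):

```python
def process_once(seen: set, correlation_id: str, handler, message) -> bool:
    """Invoke handler only for IDs not seen before; return True if processed."""
    if correlation_id in seen:
        return False            # duplicate delivery: skip
    handler(message)
    seen.add(correlation_id)
    return True

handled = []
seen = set()
process_once(seen, "abc-123", handled.append, {"Data": []})
process_once(seen, "abc-123", handled.append, {"Data": []})  # duplicate, ignored
print(len(handled))  # 1
```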
What messages contain:
```json
{
  "Metadata": {
    "Sync": {
      "Version": 12345,   // Change tracking version
      "Type": "Diff"      // "Full" or "Diff"
    }
  },
  "Data": [
    {
      "$operation": "UPDATE",   // INSERT, UPDATE, or DELETE
      "$version": 12345,        // Version when change occurred
      "CustomerId": "C123",     // Your actual data fields
      "Name": "Updated Name"
    }
  ]
}
```

Plus metadata attributes:
- `CorrelationId`: Unique identifier for tracking
- `Timestamp`: When the message was created
- `Compressed`: Whether the body is gzip-compressed
- `ContentType`: `application/json` or `application/json+gzip`
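Putting the schema and metadata together, a consumer can decode a body along these lines (a sketch; it assumes the compressed body arrives as raw gzip bytes — some transports base64-encode it first, so check yours):

```python
import gzip
import json

def decode_message(body: bytes, content_type: str) -> dict:
    """Decompress if the content type says gzip, then parse the change JSON."""
    if content_type == "application/json+gzip":
        body = gzip.decompress(body)
    return json.loads(body)

# Round-trip demo using the schema shown above
sample = {
    "Metadata": {"Sync": {"Version": 12345, "Type": "Diff"}},
    "Data": [{"$operation": "UPDATE", "$version": 12345, "CustomerId": "C123"}],
}
wire = gzip.compress(json.dumps(sample).encode())
message = decode_message(wire, "application/json+gzip")
for change in message["Data"]:
    print(change["$operation"], change.get("CustomerId"))  # UPDATE C123
```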
What it means: Trignis can't reach your message queue server.
Check:
- Is the server running? `rabbitmqctl status` or check the Azure/AWS console
- Is the hostname correct? Try `ping rabbitmq.yourcompany.com`
- Is the port accessible? `telnet rabbitmq.yourcompany.com 5672`
- Are there firewall rules blocking the connection?
Fix: Update your configuration with the correct hostname/port, or open firewall rules.
What it means: Your credentials are wrong or lack permissions.
Check:
- Azure: Does your policy have "Send" claims? Regenerate the key if needed
- RabbitMQ: Can you log in with `rabbitmqctl authenticate_user trignis password`?
- AWS: Does your IAM role/user have `sqs:SendMessage` permission?
Fix: Verify credentials, update permissions, or regenerate access keys.
What it means: Your message is too large even after compression.
Options:
- Enable batching: Split large full syncs into smaller chunks with `"MaxRecordsPerBatch": 100, "EnablePayloadBatching": true`
- Reduce batch size: Lower `MaxRecordsPerBatch` to 50 or 25
- Upgrade tier: Use Azure Service Bus Premium (100MB limit) instead of Standard (256KB)
- Filter data: Modify your stored procedure to return only essential fields
What it means: Three consecutive failures opened the circuit; it retries after 1 minute.
Fix:
- Check the dead letter queue for error details: `GET /health/deadletters`
- Fix the underlying issue (credentials, network, permissions)
- Wait 60 seconds for automatic retry
- If urgent, restart Trignis service to reset circuit breakers
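The behavior described here (three consecutive failures open the circuit, with a retry allowed after 60 seconds) can be sketched as follows. This illustrates the pattern, not Trignis's actual implementation; the threshold and cooldown values come from the text above:

```python
import time

class CircuitBreaker:
    """Open after `threshold` consecutive failures; allow a retry after `cooldown` seconds."""
    def __init__(self, threshold=3, cooldown=60.0, clock=time.monotonic):
        self.threshold, self.cooldown, self.clock = threshold, cooldown, clock
        self.failures = 0
        self.opened_at = None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if self.clock() - self.opened_at >= self.cooldown:
            self.opened_at = None       # half-open: permit one trial call
            self.failures = 0
            return True
        return False

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = self.clock()

    def record_success(self):
        self.failures = 0
        self.opened_at = None

# Demo with an injected fake clock so the 60s wait is instant
now = [0.0]
cb = CircuitBreaker(clock=lambda: now[0])
for _ in range(3):
    cb.record_failure()
print(cb.allow())   # False: circuit is open
now[0] = 61.0
print(cb.allow())   # True: cooldown elapsed, one retry permitted
```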
What it means: Your messages exceeded 1KB, so Trignis compressed them.
How to handle:
```csharp
if (message.Properties.ContentType == "application/json+gzip")
{
    var compressed = Convert.FromBase64String(messageBody);
    using var input = new MemoryStream(compressed);
    using var gzip = new GZipStream(input, CompressionMode.Decompress);
    using var reader = new StreamReader(gzip);
    var json = await reader.ReadToEndAsync();
    // Now parse json
}
```

Add debug logging to appsettings.json:
```json
{
  "Serilog": {
    "MinimumLevel": {
      "Override": {
        "Trignis.MicrosoftSQL.Services.MessageQueueService": "Debug"
      }
    }
  }
}
```

Then restart Trignis and check log/trignis-*.log for detailed information.
Available Properties:
| Property | Required | Default | Notes |
|---|---|---|---|
| `HostName` | Yes | - | Server hostname or IP |
| `Port` | No | 5672 | Use 5671 for TLS |
| `VirtualHost` | No | `/` | Logical isolation |
| `Username` | No | `guest` | Auth username |
| `Password` | No | `guest` | Supports `PWENC:` |
| `QueueName` | One of these | - | Direct queue name |
| `Exchange` | One of these | - | Exchange name |
| `RoutingKey` | No | `""` | Used with Exchange |
Exchange Types:
- Direct: Exact routing key match
- Topic: Pattern matching (e.g., `orders.*`, `*.updates`)
- Fanout: Broadcast to all queues (no routing key needed)
When to use what:
- Simple workflows → Use `QueueName`
- Multiple consumers with filtering → Use `Exchange` + `RoutingKey`
- Broadcast to everyone → Use `Exchange` only (fanout)
Available Properties:
| Property | Required | Default | Notes |
|---|---|---|---|
| `ConnectionString` | Yes | - | Supports `PWENC:` |
| `QueueName` | One of these | - | Point-to-point |
| `TopicName` | One of these | - | Pub-sub |
Tier Limits:
- Standard: 256KB messages
- Premium: 100MB messages
When to use what:
- Single consumer → Use `QueueName`
- Multiple consumers → Use `TopicName` + create subscriptions in Azure
Available Properties:
| Property | Required | Default | Notes |
|---|---|---|---|
| `QueueUrl` | Yes | - | Full queue URL |
| `Region` | Recommended | - | e.g., `us-east-1` |
| `AccessKeyId` | No | - | Explicit credentials |
| `SecretAccessKey` | No | - | Supports `PWENC:` |
Authentication:
- Omit credentials → Uses IAM role (recommended for EC2/ECS)
- Provide credentials → Uses access keys (for on-prem or cross-account)
Note: Always provide both AccessKeyId and SecretAccessKey together, or neither.
Check message queue connection health:
```bash
curl http://localhost:2455/health/connections
```

Response shows which endpoints are healthy:
```json
{
  "totalEndpoints": 3,
  "healthyEndpoints": 2,
  "unhealthyEndpoints": 1,
  "details": {
    "rabbitmq_production": {
      "isHealthy": true,
      "consecutiveFailures": 0
    },
    "azure_queue": {
      "isHealthy": false,
      "consecutiveFailures": 5,
      "downtimeDuration": "00:05:00"
    }
  }
}
```

Check failed messages:
```bash
curl http://localhost:2455/health/deadletters
```

Response shows accumulated failures:
```json
{
  "totalDeadLetters": 42,
  "last24Hours": 8,
  "last7Days": 15
}
```

Set up alerts when `last24Hours` exceeds your threshold.
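A cron-style watcher can poll this endpoint and fire when the rolling count crosses a limit. A stdlib-only sketch (the endpoint path comes from above; the threshold of 5 is an arbitrary example):

```python
import json
import urllib.request

def dead_letter_alert(stats: dict, threshold: int) -> bool:
    """True when the rolling 24h dead-letter count exceeds the threshold."""
    return stats.get("last24Hours", 0) > threshold

def check(url="http://localhost:2455/health/deadletters", threshold=5) -> bool:
    # Requires a running Trignis instance
    with urllib.request.urlopen(url) as resp:
        return dead_letter_alert(json.load(resp), threshold)

# With the sample response shown above:
print(dead_letter_alert({"totalDeadLetters": 42, "last24Hours": 8, "last7Days": 15}, 5))  # True
```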
Watch for these patterns in log/trignis-*.log:
Success:

```
[INFO] [production] └─ [MQ] Exported to RabbitMQ queue 'customer-changes'
```

Circuit breaker opened:

```
[WARN] Circuit breaker opened for 'rabbitmq_production' for 60s due to: Connection refused
```

Large message compressed:

```
[DEBUG] Compressed message from 15234 to 3456 bytes (77.30% reduction)
```

Dead letter saved:

```
[WARN] Saved dead letter for Customers (PrimaryDB): Message size exceeds limit
```
Want changes delivered as fast as possible?
```json
{
  "ChangeTracking": {
    "GlobalSettings": {
      "PollingIntervalSeconds": 5
    }
  }
}
```

Trade-off: More database load from frequent polling.
Processing millions of rows? Enable batching:
```json
{
  "ChangeTracking": {
    "GlobalSettings": {
      "PollingIntervalSeconds": 60,
      "MaxRecordsPerBatch": 1000,
      "EnablePayloadBatching": true
    }
  }
}
```

Trade-off: Higher latency, but better for large datasets.
| Platform | Max Message Size | What Happens When Exceeded |
|---|---|---|
| RabbitMQ | 128 MB | Error → Dead letter queue |
| Azure Service Bus (Standard) | 256 KB | Auto-compress → Error if still too large |
| Azure Service Bus (Premium) | 100 MB | Auto-compress → Error if still too large |
| AWS SQS | 256 KB | Auto-compress → Error if still too large |
Messages over 1KB are automatically compressed (typically 60-80% size reduction).
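The 60-80% figure is plausible for change payloads because they are highly repetitive JSON. You can estimate the ratio for your own data with a quick stdlib check (the record shape below is synthetic):

```python
import gzip
import json

# Synthetic payload shaped like a batch of change records
records = [
    {"$operation": "UPDATE", "$version": 12345 + i,
     "CustomerId": f"C{i}", "Name": "Updated Name"}
    for i in range(200)
]
raw = json.dumps({"Data": records}).encode()
packed = gzip.compress(raw)
print(f"{len(raw)} -> {len(packed)} bytes "
      f"({1 - len(packed) / len(raw):.0%} reduction)")
```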
Now that you have message queues set up:
- Add more tables: Configure additional tracking objects
- Build consumers: Process messages in your downstream services
- Set up monitoring: Add dashboards and alerts
- Test failover: Simulate failures and verify circuit breakers work
- Document: Write runbooks for your team
Need help? Check the main README or open an issue on GitHub.