diff --git a/EasyNetQ.HostedService.sln b/EasyNetQ.HostedService.sln index b139c33..1e32e2e 100755 --- a/EasyNetQ.HostedService.sln +++ b/EasyNetQ.HostedService.sln @@ -5,6 +5,8 @@ VisualStudioVersion = 15.0.26124.0 MinimumVisualStudioVersion = 15.0.26124.0 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EasyNetQ.HostedService", "src\EasyNetQ.HostedService\EasyNetQ.HostedService.csproj", "{BDDC417C-1EAB-44D9-B2E9-6FE6F601B949}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EasyNetQ.HostedService.TestApp", "tests\EasyNetQ.HostedService.TestApp\EasyNetQ.HostedService.TestApp.csproj", "{20DA595B-5F46-44E4-A24A-6841EB010D18}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -30,5 +32,17 @@ Global {BDDC417C-1EAB-44D9-B2E9-6FE6F601B949}.Release|x64.Build.0 = Release|Any CPU {BDDC417C-1EAB-44D9-B2E9-6FE6F601B949}.Release|x86.ActiveCfg = Release|Any CPU {BDDC417C-1EAB-44D9-B2E9-6FE6F601B949}.Release|x86.Build.0 = Release|Any CPU + {20DA595B-5F46-44E4-A24A-6841EB010D18}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {20DA595B-5F46-44E4-A24A-6841EB010D18}.Debug|Any CPU.Build.0 = Debug|Any CPU + {20DA595B-5F46-44E4-A24A-6841EB010D18}.Debug|x64.ActiveCfg = Debug|Any CPU + {20DA595B-5F46-44E4-A24A-6841EB010D18}.Debug|x64.Build.0 = Debug|Any CPU + {20DA595B-5F46-44E4-A24A-6841EB010D18}.Debug|x86.ActiveCfg = Debug|Any CPU + {20DA595B-5F46-44E4-A24A-6841EB010D18}.Debug|x86.Build.0 = Debug|Any CPU + {20DA595B-5F46-44E4-A24A-6841EB010D18}.Release|Any CPU.ActiveCfg = Release|Any CPU + {20DA595B-5F46-44E4-A24A-6841EB010D18}.Release|Any CPU.Build.0 = Release|Any CPU + {20DA595B-5F46-44E4-A24A-6841EB010D18}.Release|x64.ActiveCfg = Release|Any CPU + {20DA595B-5F46-44E4-A24A-6841EB010D18}.Release|x64.Build.0 = Release|Any CPU + {20DA595B-5F46-44E4-A24A-6841EB010D18}.Release|x86.ActiveCfg = Release|Any CPU + {20DA595B-5F46-44E4-A24A-6841EB010D18}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/doc/EasyNetQ.HostedService.xml b/doc/EasyNetQ.HostedService.xml index dc09110..1d923ce 100644 --- a/doc/EasyNetQ.HostedService.xml +++ b/doc/EasyNetQ.HostedService.xml @@ -750,7 +750,7 @@ - + @@ -790,9 +790,14 @@ . + + + The initialized that is exposed to subclasses of . + + - The initialized that is exposed to subclasses of . + The initialized that is exposed to subclasses of . @@ -904,5 +909,250 @@ + + + A convenience type to help register a handler for start/end events, emitted by the + library. + + + + + Subscribes handlers for the start/end events, emitted + by an . + + + + + + + Subscribes handlers for the start/end events, emitted + by an . + + This variant subscribes to all start/end events, irrespective from the actual source + within the library. + + + + + + + The keys that describe the activity names being emitted through instances of . + + + + + The key used when starting a new in a . + + + + + The key used when starting a new in a . + + + + + The keys that describe the activity tag names being emitted through instances of . + + + + + The tag name for the exchange where a RabbitMQ message is being published. + + + + + The tag name for the rouging key used for a RabbitMQ message that is being published. + + + + + The tag name for the mandatory boolean value, used when a RabbitMQ message is being published. + + + + + The tag name for the headers of a RabbitMQ message that is being consumed or published. + + + + + The tag name for the correlation id of a RabbitMQ message that is being consumed. + + + + + The tag name for the delivery tag of a RabbitMQ message that is being consumed. + + + + + The tag name for the redelivered flag of a RabbitMQ message that is being consumed. + + + + + The tag name used when an exception occurs in an , in a + . + + + + + The keys that describe what kind of event is being emitted through instances of + and . + + + + + The key for all emitted logs, through the emitting . + + + + + The key used when cancelling an in a . + + + + + The key used when an exception occurs in an , in a . + + + + + The type of log emitted from an instance of . + + + + + Creates a new . + + + + + + + + The log's . + + + + + The log's message. + + + + + The log's , if it's an error. + + + + + A convenience type to help register a handler for log messages, emitted by the library. + + + + + Subscribes an as a handler for the log events, emitted + by a . + + + The handler for the log events, emitted by a . + + + + + + Subscribes an as a handler for the log events, emitted + by a . + + This variant subscribes to all log messages, irrespective from the actual source within the library. + + + The handler for the log events, emitted by a . + + + + + + The class that emits instances of , through an instance of . + + + + + Creates a new . + + + + + + Logs a message using the Trace . + + + + + + Logs a message using the Debug . + + + + + + Logs a message using the Information . + + + + + + Logs a message using the Error . + + + + + + + Logs a message using the Critical . + + + + + + + This is the core logging method, used by the public one's + + It checks whether there are any listeners for the corresponding and if there are, + it emits a instance. + + + + + + + + Holds the keys for creating instances of and . + + + The type of the derived , which implements a consumer or a producer. + + + + + The key for a , through which logs are emitted. + + + + + The key for an , through which consumer and producers emit events on receiving + and publishing a message, respectively. + + + + + Holds the key prefix for all keys in . + + + + + The key prefix for all keys in . + + diff --git a/package-hashes.txt b/package-hashes.txt index 1ebc6f9..874a6ba 100755 --- a/package-hashes.txt +++ b/package-hashes.txt @@ -1 +1 @@ -SHA512 EasyNetQ.HostedService.0.5.0.nupkg 45fc7d5fb4ed2ad1c3f7e7d06fc9a00925e4f1e75a76ce8008b74185d4914b3df8a6be701080c371b78d3ee578e8722688cdb18524ddee204096e724c59876c2 +SHA512 EasyNetQ.HostedService.0.6.0.nupkg f573603fba2d43dd5e9a8dda7f0ed31c6f15ece085e29fe1607e566812222d5c99138ad569b06c9ad73d345ccb8c8319be1d0baa534dadb6fbe7509f1daacf6e diff --git a/src/EasyNetQ.HostedService/EasyNetQ.HostedService.csproj b/src/EasyNetQ.HostedService/EasyNetQ.HostedService.csproj index 4fe22db..e724d63 100755 --- a/src/EasyNetQ.HostedService/EasyNetQ.HostedService.csproj +++ b/src/EasyNetQ.HostedService/EasyNetQ.HostedService.csproj @@ -9,7 +9,7 @@ EasyNetQ.HostedService A wrapper around EasyNetQ (https://easynetq.com/) to provide an easy API for a .NET Core hosted service. - 0.5.0 + 0.6.0 Spyridon Alfredos Desyllas https://github.com/sadesyllas/EasyNetQ.HostedService https://github.com/sadesyllas/EasyNetQ.HostedService/blob/master/LICENSE @@ -20,7 +20,7 @@ - + diff --git a/src/EasyNetQ.HostedService/Internals/HandlerRegistrar.cs b/src/EasyNetQ.HostedService/Internals/HandlerRegistrar.cs index e31e8fb..9b3fbfb 100644 --- a/src/EasyNetQ.HostedService/Internals/HandlerRegistrar.cs +++ b/src/EasyNetQ.HostedService/Internals/HandlerRegistrar.cs @@ -1,18 +1,25 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; using System.Threading; using EasyNetQ.Consumer; using EasyNetQ.HostedService.Abstractions; +using EasyNetQ.HostedService.Tracing; namespace EasyNetQ.HostedService.Internals { - internal sealed class HandlerRegistrar : IHandlerRegistration + internal sealed class HandlerRegistrar : IHandlerRegistration { private readonly IHandlerRegistration _handlers; private readonly IIncomingMessageInterceptor _incomingMessageInterceptor; + private readonly ActivitySource _activitySource; - internal HandlerRegistrar(IHandlerRegistration handlers, IIncomingMessageInterceptor incomingMessageInterceptor) + internal HandlerRegistrar(IHandlerRegistration handlers, IIncomingMessageInterceptor incomingMessageInterceptor, + ActivitySource activitySource) { _handlers = handlers; _incomingMessageInterceptor = incomingMessageInterceptor; + _activitySource = activitySource; } public IHandlerRegistration Add(IMessageHandler handler) @@ -25,7 +32,50 @@ public IHandlerRegistration Add(IMessageHandler handler) await _incomingMessageInterceptor.InterceptMessage(message, messageReceivedInfo, cancellationToken); } - return await handler(message, messageReceivedInfo, cancellationToken); + var maybeTraceId = message?.Properties.Headers?["X-TRACE-ID"]; + var traceId = maybeTraceId is string ? (string)maybeTraceId : null; + + using (var activity = + _activitySource.StartActivity($"{typeof(TConsumer).FullName} receive", ActivityKind.Consumer, + // ReSharper disable once AssignNullToNotNullAttribute + traceId)) + { + if (activity != null) + { + activity + .AddTag("messaging.system", "rabbitmq") + .AddTag("messaging.operation", "receive") + .AddTag("messaging.rabbitmq.routing_key", messageReceivedInfo.RoutingKey) + .AddTag("x-messaging.rabbitmq.correlation_id", message?.Properties.CorrelationId) + .AddTag("x-messaging.rabbitmq.delivery_tag", messageReceivedInfo.DeliveryTag) + .AddTag("x-messaging.rabbitmq.redelivered", messageReceivedInfo.Redelivered) + .AddTag("x-messaging.rabbitmq.headers", message?.Properties.Headers); + + if ((message?.Properties.Headers?.ContainsKey("X-MESSAGE-ID")).GetValueOrDefault()) + { + activity.AddTag("messaging.message_id", message?.Properties.Headers?["X-MESSAGE-ID"]); + } + } + + try + { + return await handler(message, messageReceivedInfo, cancellationToken); + } + catch (Exception exception) + { + if (activity != null) + { + activity.AddEvent(new ActivityEvent(TraceEventName.Exception, + tags: new ActivityTagsCollection( + new[] + { + new KeyValuePair("exception", exception) + }))); + } + + throw; + } + } }); } diff --git a/src/EasyNetQ.HostedService/RabbitMqConsumer.cs b/src/EasyNetQ.HostedService/RabbitMqConsumer.cs index 37111a1..99d6307 100755 --- a/src/EasyNetQ.HostedService/RabbitMqConsumer.cs +++ b/src/EasyNetQ.HostedService/RabbitMqConsumer.cs @@ -10,8 +10,8 @@ using EasyNetQ.HostedService.DependencyInjection; using EasyNetQ.HostedService.Internals; using EasyNetQ.HostedService.Models; +using EasyNetQ.HostedService.Tracing; using EasyNetQ.Internals; -using Microsoft.Extensions.Logging; using Newtonsoft.Json; using RabbitMQ.Client; @@ -102,8 +102,7 @@ namespace EasyNetQ.HostedService /// public abstract partial class RabbitMqConsumer : RabbitMqService { - private IDisposable _startConsumingDisposable; - private List _startConsumingEventSubscriptions = new List(); + private readonly List _startConsumingEventSubscriptions = new List(); /// /// @@ -148,9 +147,13 @@ protected virtual void OnStartConsumingEvent(StartConsumingFailedEvent @event) { var model = ExtractModelFromInternalConsumer(@event.Consumer, Logger); - Logger?.LogCritical( - $"Failed to consume from {@event.Queue.Name} ({@event.Queue.Arguments}) with " + - $"code {model?.CloseReason.ReplyCode} and reason {model?.CloseReason.ReplyText}."); + // If `model` is `null`, then there is no open channel with RabbitMQ. + if (model != null) + { + Logger?.LogCritical( + $"Failed to consume from {@event.Queue.Name} ({@event.Queue.Arguments}) with " + + $"code {model.CloseReason.ReplyCode} and reason {model.CloseReason.ReplyText}."); + } } /// @@ -163,27 +166,7 @@ protected override void InitializeConsumer(CancellationToken cancellationToken) SubscribeToStartConsumingEvent(OnStartConsumingEvent); - _startConsumingDisposable = StartConsuming(cancellationToken); - - AddDisposable(_startConsumingDisposable); - - Bus.Connected += (sender, args) => - { - try - { - _startConsumingDisposable?.Dispose(); - } - catch (Exception exception) - { - Logger?.LogError( - $"Could not dispose of {nameof(_startConsumingDisposable)} in {nameof(RabbitMqConsumer)}: " + - $"{exception.Message}\n{exception.StackTrace}"); - } - - _startConsumingDisposable = StartConsuming(cancellationToken); - - AddDisposable(_startConsumingDisposable); - }; + AddDisposable(StartConsuming(cancellationToken)); } private void SubscribeToStartConsumingEvent(Action eventHandler) @@ -213,7 +196,8 @@ private IDisposable StartConsuming(CancellationToken cancellationToken) { try { - RegisterMessageHandlers(new HandlerRegistrar(handlers, IncomingMessageInterceptor)); + RegisterMessageHandlers(new HandlerRegistrar(handlers, IncomingMessageInterceptor, + ActivitySource)); } catch (Exception exception) { @@ -239,7 +223,7 @@ private IDisposable StartConsuming(CancellationToken cancellationToken) } // TODO: THIS USE OF REFLECTION IS FRAGILE AND MUST BE RECONSIDERED - private static IModel ExtractModelFromInternalConsumer(IConsumer consumer, ILogger logger) + private static IModel ExtractModelFromInternalConsumer(IConsumer consumer, TraceLogWriter logger) { IModel model = null; var consumerType = consumer.GetType(); @@ -249,10 +233,10 @@ private static IModel ExtractModelFromInternalConsumer(IConsumer consumer, ILogg if (internalConsumersField != null) { - var internalConsumers = (ConcurrentSet) internalConsumersField.GetValue(consumer); + var internalConsumers = (ConcurrentSet)internalConsumersField.GetValue(consumer); // ReSharper disable once AssignNullToNotNullAttribute // ReSharper disable once ConstantConditionalAccessQualifier - model = ((InternalConsumer) internalConsumers.FirstOrDefault())?.Model; + model = ((InternalConsumer)internalConsumers.FirstOrDefault())?.Model; } var internalConsumerField = @@ -260,9 +244,9 @@ private static IModel ExtractModelFromInternalConsumer(IConsumer consumer, ILogg if (internalConsumerField != null) { - var internalConsumer = (IInternalConsumer) internalConsumerField.GetValue(consumer); + var internalConsumer = (IInternalConsumer)internalConsumerField.GetValue(consumer); // ReSharper disable once ConstantConditionalAccessQualifier - model = ((InternalConsumer) internalConsumer)?.Model; + model = ((InternalConsumer)internalConsumer)?.Model; } if (model == null) diff --git a/src/EasyNetQ.HostedService/RabbitMqProducer.cs b/src/EasyNetQ.HostedService/RabbitMqProducer.cs index 4a05450..e3ebf57 100755 --- a/src/EasyNetQ.HostedService/RabbitMqProducer.cs +++ b/src/EasyNetQ.HostedService/RabbitMqProducer.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; @@ -8,8 +9,8 @@ using EasyNetQ.Consumer; using EasyNetQ.HostedService.DependencyInjection; using EasyNetQ.HostedService.Models; +using EasyNetQ.HostedService.Tracing; using EasyNetQ.Topology; -using Microsoft.Extensions.Logging; using Newtonsoft.Json; using RabbitMQ.Client.Exceptions; @@ -101,24 +102,44 @@ public virtual Task PublishAsync( bool mandatory = false, IDictionary headers = null) { + var tracingActivity = + // ReSharper disable once AssignNullToNotNullAttribute + ActivitySource.StartActivity($"{typeof(T).FullName} send", ActivityKind.Producer, Activity.Current?.Id); + if (_cancellationToken.IsCancellationRequested) { + if (tracingActivity != null) + { + tracingActivity.AddEvent(new ActivityEvent(TraceEventName.Cancelled)); + + tracingActivity.Dispose(); + } + return Task.FromResult(PublishResult.NotPublished); } if (exchange == null) { - throw new ArgumentException("The exchange must not be null."); + AdornActivityWithTags(tracingActivity, null, routingKey, mandatory, headers, null); + + DisposeActivityAndThrowException(tracingActivity, + new ArgumentException("The exchange must not be null.")); } if (routingKey == null) { - throw new ArgumentException("The routing key must not be null."); + AdornActivityWithTags(tracingActivity, exchange, null, mandatory, headers, null); + + DisposeActivityAndThrowException(tracingActivity, + new ArgumentException("The routing key must not be null.")); } if (payload == null) { - throw new ArgumentException("The payload must not be null."); + AdornActivityWithTags(tracingActivity, exchange, routingKey, mandatory, headers, null); + + DisposeActivityAndThrowException(tracingActivity, + new ArgumentException("The payload must not be null.")); } var payloadBytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(payload)); @@ -126,6 +147,13 @@ public virtual Task PublishAsync( var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + if (tracingActivity != null) + { + headers = headers ?? new Dictionary(); + + headers.Add("X-TRACE-ID", tracingActivity.Id); + } + var message = new Message { Exchange = exchange, @@ -135,6 +163,7 @@ public virtual Task PublishAsync( Type = typeof(TMessage), Headers = headers, TaskCompletionSource = taskCompletionSource, + TracingActivity = tracingActivity }; EnqueueMessage(message); @@ -194,11 +223,13 @@ private protected virtual void OnCancellation() } private void StartProducerLoop(CancellationToken cancellationToken) => + // ReSharper disable once AsyncVoidLambda new Thread(async () => { while (true) { Message message = null; + var success = false; try { @@ -226,7 +257,7 @@ private void StartProducerLoop(CancellationToken cancellationToken) => }; #if LOG_DEBUG_RABBITMQ_PRODUCER_PUBLISHED_MESSAGES - Logger?.LogDebug( + Logger.LogDebug( $"Publishing message to exchange {message.Exchange} " + $"({GetMessageInformation(message)}) with routing key {message.RoutingKey} and payload " + $"{Encoding.UTF8.GetString(message.Payload)}."); @@ -238,7 +269,10 @@ await OutgoingMessageInterceptor.InterceptMessage(message.Payload, message.Type, message.Headers, cancellationToken); } - Bus.Publish( + AdornActivityWithTags(message.TracingActivity, message.Exchange, message.RoutingKey, + message.Mandatory, message.Headers, message.Payload); + + await Bus.PublishAsync( exchange, message.RoutingKey, message.Mandatory, @@ -249,56 +283,71 @@ await OutgoingMessageInterceptor.InterceptMessage(message.Payload, message.Type, message.TaskCompletionSource.SetResult(PublishResult.Published); _messages.TryDequeue(out _); + + success = true; } } - catch (OperationCanceledException) + catch (OperationCanceledException exception) { - Logger?.LogDebug($"Stopping producer loop with configuration {RabbitMqConfig.Id}."); + if (exception.CancellationToken == cancellationToken) + { + Logger?.LogDebug($"Stopping producer loop with configuration {RabbitMqConfig.Id}."); + + return; + } - return; + var error = + "Operation cancelled while waiting for message to be confirmed with configuration" + + $"{RabbitMqConfig.Id} ({GetMessageInformation(message)})"; + + FailAndDiscardMessage(message, new Exception(error, exception)); } - catch (TimeoutException) + catch (TimeoutException exception) { - Logger?.LogError( - $"Timeout occured while trying to publish with configuration " + - $"{RabbitMqConfig.Id} ({GetMessageInformation(message)})"); + var error = + "Timeout occured while trying to publish with configuration " + + $"{RabbitMqConfig.Id} ({GetMessageInformation(message)})"; - ProducerLoopWaitAndContinue(cancellationToken); + FailAndDiscardMessage(message, new Exception(error, exception)); } catch (AlreadyClosedException exception) { - Logger?.LogError( + var error = $"AMQP error in producer loop: {exception.Message}\n{exception.StackTrace}\n" + - $"({GetMessageInformation(message)})"); + $"({GetMessageInformation(message)})"; - // 404 - NOT FOUND means that the message cannot be processed at all at this point and it's - // best to notify the library's client and discard it - if (exception.ShutdownReason?.ReplyCode == 404) - { - ProducerLoopDiscardMessageAndContinue(message); - } + FailAndDiscardMessage(message, new Exception(error, exception)); } catch (Exception exception) { - Logger?.LogCritical( + var error = $"Critical error in producer loop: {exception.Message}\n{exception.StackTrace}\n" + - $"({GetMessageInformation(message)})"); + $"({GetMessageInformation(message)})"; - ProducerLoopWaitAndContinue(cancellationToken); + FailAndDiscardMessage(message, new Exception(error, exception)); + } + finally + { + if (success) + { + message.TracingActivity?.Dispose(); + } } } }).Start(); - private void ProducerLoopWaitAndContinue(CancellationToken cancellationToken) + private void FailAndDiscardMessage(Message message, Exception exception) { - Task.Delay(RabbitMqConfig.PublisherLoopErrorBackOffMilliseconds, cancellationToken).Wait(cancellationToken); + if (message?.TracingActivity != null) + { + message.TracingActivity.AddEvent(new ActivityEvent(TraceEventName.Exception, + tags: new ActivityTagsCollection(new[] + { new KeyValuePair("exception", exception) }))); - _messageSemaphore.Release(); - } + message.TracingActivity.Dispose(); + } - private void ProducerLoopDiscardMessageAndContinue(Message message) - { - message?.TaskCompletionSource.SetResult(PublishResult.NotPublished); + message?.TaskCompletionSource.SetException(exception); _messages.TryDequeue(out _); } @@ -321,12 +370,56 @@ private protected sealed class Message public IDictionary Headers { get; set; } = null; public Type Type { get; set; } = null; public TaskCompletionSource TaskCompletionSource { get; set; } = null; + public Activity TracingActivity { get; set; } // ReSharper restore RedundantDefaultMemberInitializer public override string ToString() => $"exchange: {Exchange}, routing key: {RoutingKey}, publisher confirms: mandatory: {Mandatory}"; } + + private void AdornActivityWithTags(Activity activity, string exchange, string routingKey, bool mandatory, + IDictionary headers, byte[] payload) + { + if (activity != null) + { + activity + .AddTag("messaging.system", "rabbitmq") + .AddTag("messaging.operation", "send") + .AddTag("messaging.destination", exchange) + .AddTag("messaging.message_payload_size_bytes", payload?.Length) + .AddTag("messaging.rabbitmq.routing_key", routingKey) + // TODO: add configuration to allow including the payload + // .AddTag("messaging.message_payload", Encoding.UTF8.GetString(payload)) + .AddTag("x-messaging.rabbitmq.mandatory", mandatory); + + if (headers != null) + { + if (headers.ContainsKey("X-MESSAGE-ID")) + { + activity.AddTag("messaging.message_id", headers["X-MESSAGE-ID"]); + } + + activity.AddTag("x-messaging.rabbitmq.headers", headers); + } + } + } + + private void DisposeActivityAndThrowException(Activity activity, Exception exception) + { + if (activity == null) + { + throw exception; + } + + activity.AddEvent(new ActivityEvent(TraceEventName.Exception, + tags: new ActivityTagsCollection( + new[] { new KeyValuePair("exception", exception) }))); + + activity.Dispose(); + + throw exception; + } } #region Consumer Implementation diff --git a/src/EasyNetQ.HostedService/RabbitMqService.cs b/src/EasyNetQ.HostedService/RabbitMqService.cs index fd7a276..9ec8b56 100755 --- a/src/EasyNetQ.HostedService/RabbitMqService.cs +++ b/src/EasyNetQ.HostedService/RabbitMqService.cs @@ -7,11 +7,12 @@ using EasyNetQ.HostedService.Abstractions; using EasyNetQ.HostedService.DependencyInjection; using EasyNetQ.HostedService.Internals; +using EasyNetQ.HostedService.Tracing; using EasyNetQ.Logging; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using ILogger = Microsoft.Extensions.Logging.ILogger; +using DiagnosticListener = System.Diagnostics.DiagnosticListener; namespace EasyNetQ.HostedService { @@ -21,8 +22,9 @@ namespace EasyNetQ.HostedService /// /// /// - /// - public delegate void OnConnectedCallback(IAdvancedBus b, IRabbitMqConfig c, CancellationToken t, ILogger logger); + /// + public delegate void OnConnectedCallback(IAdvancedBus b, IRabbitMqConfig c, CancellationToken t, + TraceLogWriter logWriter); /// /// A hosted service that accepts an EasyNetQ and uses it to either set up a consumer or @@ -55,7 +57,6 @@ public abstract class RabbitMqService : IHostedService private protected IOutgoingMessageInterceptor OutgoingMessageInterceptor; private bool _isProperlyInitialized; private readonly List _disposables = new List(); - private ILogger _logger; // ReSharper disable RedundantDefaultMemberInitializer @@ -84,9 +85,15 @@ static RabbitMqService() protected IRabbitMqConfig RabbitMqConfig => _rmqConfig; /// - /// The initialized that is exposed to subclasses of . + /// The initialized that is exposed to subclasses of . /// - protected ILogger Logger => _logger; + protected ActivitySource ActivitySource { get; } = + new ActivitySource(TraceSourceName.Activity, typeof(T).Assembly.GetName().Version.ToString()); + + /// + /// The initialized that is exposed to subclasses of . + /// + protected TraceLogWriter Logger { get; } = new TraceLogWriter(new DiagnosticListener(TraceSourceName.Log)); /// /// This static method is used by to construct a singleton hosted service @@ -154,7 +161,6 @@ public static TDerived Create( service._busProxy = busProxy; service._rmqConfig = rmqConfig; service._onConnected = onConnected; - service._logger = logger; service._isProperlyInitialized = true; service.Initialize(); @@ -244,29 +250,29 @@ public virtual Task StartAsync(CancellationToken cancellationToken) $"This {nameof(RabbitMqService)} instance should have been created through a call to " + $"{nameof(RabbitMqService)}.{nameof(Create)}."); - _logger?.LogInformation($"RabbitMqService {_rmqConfig.Id} is starting."); + Logger.LogInformation($"RabbitMqService {_rmqConfig.Id} is starting."); if (Bus.IsConnected) { - _logger?.LogDebug($"Connected to RabbitMQ with configuration {_rmqConfig.Id}."); + Logger.LogDebug($"Connected to RabbitMQ with configuration {_rmqConfig.Id}."); } Bus.Connected += (sender, args) => - _logger?.LogDebug($"Connected to RabbitMQ with configuration {_rmqConfig.Id}."); + Logger.LogDebug($"Connected to RabbitMQ with configuration {_rmqConfig.Id}."); Bus.Disconnected += (sender, args) => - _logger?.LogDebug($"Disconnected from RabbitMQ with configuration {_rmqConfig.Id}."); + Logger.LogDebug($"Disconnected from RabbitMQ with configuration {_rmqConfig.Id}."); // run the setup callbacks on connection if (Bus.IsConnected) { _onConnected.ForEach(callback => - HandleCallbackError(callback)(Bus, _rmqConfig, cancellationToken, _logger)); + HandleCallbackError(callback)(Bus, _rmqConfig, cancellationToken, Logger)); } _onConnected.ForEach(callback => Bus.Connected += (sender, args) => - HandleCallbackError(callback)(Bus, _rmqConfig, cancellationToken, _logger)); + HandleCallbackError(callback)(Bus, _rmqConfig, cancellationToken, Logger)); if (_isConsumer) { @@ -288,7 +294,7 @@ public virtual Task StartAsync(CancellationToken cancellationToken) /// public virtual Task StopAsync(CancellationToken cancellationToken) { - _logger?.LogInformation($"RabbitMqService {_rmqConfig.Id} is stopping."); + Logger.LogInformation($"RabbitMqService {_rmqConfig.Id} is stopping."); if (!cancellationToken.IsCancellationRequested) { @@ -300,7 +306,7 @@ public virtual Task StopAsync(CancellationToken cancellationToken) } catch (Exception exception) { - _logger?.LogCritical( + Logger.LogCritical( $"Duplicate IDisposable disposal on {serviceTypeName} termination: " + $"{exception.Message}\n" + exception.StackTrace); @@ -312,7 +318,7 @@ public virtual Task StopAsync(CancellationToken cancellationToken) } catch (Exception exception) { - _logger?.LogCritical( + Logger.LogCritical( $"Could not dispose {nameof(IAdvancedBus)} on {serviceTypeName} termination: " + $"{exception.Message}\n" + exception.StackTrace); @@ -379,7 +385,7 @@ private OnConnectedCallback HandleCallbackError(OnConnectedCallback callback) => } catch (Exception exception) { - _logger?.LogCritical($"Failed to run callback on connection:\n{exception}", exception); + Logger.LogCritical($"Failed to run callback on connection:\n{exception}", exception); } }; } diff --git a/src/EasyNetQ.HostedService/Tracing/TraceActivityListener.cs b/src/EasyNetQ.HostedService/Tracing/TraceActivityListener.cs new file mode 100644 index 0000000..4ea25be --- /dev/null +++ b/src/EasyNetQ.HostedService/Tracing/TraceActivityListener.cs @@ -0,0 +1,45 @@ +using System; +using System.Diagnostics; + +namespace EasyNetQ.HostedService.Tracing +{ + /// + /// A convenience type to help register a handler for start/end events, emitted by the + /// library. + /// + public static class TraceActivityListener + { + /// + /// Subscribes handlers for the start/end events, emitted + /// by an . + /// + /// + /// + public static void Subscribe(Action onActivityStarted, Action onActivityStopped) => + ActivitySource.AddActivityListener(new ActivityListener + { + ShouldListenTo = source => source.Name == TraceSourceName.Activity, + Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllDataAndRecorded, + ActivityStarted = onActivityStarted, + ActivityStopped = onActivityStopped + }); + + /// + /// Subscribes handlers for the start/end events, emitted + /// by an . + /// + /// This variant subscribes to all start/end events, irrespective from the actual source + /// within the library. + /// + /// + /// + public static void SubscribeToAll(Action onActivityStarted, Action onActivityStopped) => + ActivitySource.AddActivityListener(new ActivityListener + { + ShouldListenTo = source => source.Name.StartsWith(TraceSourceName.KeyPrefix), + Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllDataAndRecorded, + ActivityStarted = onActivityStarted, + ActivityStopped = onActivityStopped + }); + } +} diff --git a/src/EasyNetQ.HostedService/Tracing/TraceActivityName.cs b/src/EasyNetQ.HostedService/Tracing/TraceActivityName.cs new file mode 100644 index 0000000..9a34f5d --- /dev/null +++ b/src/EasyNetQ.HostedService/Tracing/TraceActivityName.cs @@ -0,0 +1,20 @@ +using System.Diagnostics; + +namespace EasyNetQ.HostedService.Tracing +{ + /// + /// The keys that describe the activity names being emitted through instances of . + /// + public static class TraceActivityName + { + /// + /// The key used when starting a new in a . + /// + public static readonly string Consume = "Consume"; + + /// + /// The key used when starting a new in a . + /// + public static readonly string Publish = "Publish"; + } +} diff --git a/src/EasyNetQ.HostedService/Tracing/TraceActivityTagName.cs b/src/EasyNetQ.HostedService/Tracing/TraceActivityTagName.cs new file mode 100644 index 0000000..da9942f --- /dev/null +++ b/src/EasyNetQ.HostedService/Tracing/TraceActivityTagName.cs @@ -0,0 +1,51 @@ +using System.Diagnostics; + +namespace EasyNetQ.HostedService.Tracing +{ + /// + /// The keys that describe the activity tag names being emitted through instances of . + /// + public static class TraceActivityTagName + { + /// + /// The tag name for the exchange where a RabbitMQ message is being published. + /// + public static readonly string Exchange = "Exchange"; + + /// + /// The tag name for the rouging key used for a RabbitMQ message that is being published. + /// + public static readonly string RoutingKey = "RoutingKey"; + + /// + /// The tag name for the mandatory boolean value, used when a RabbitMQ message is being published. + /// + public static readonly string Mandatory = "Mandatory"; + + /// + /// The tag name for the headers of a RabbitMQ message that is being consumed or published. + /// + public static readonly string Headers = "Headers"; + + /// + /// The tag name for the correlation id of a RabbitMQ message that is being consumed. + /// + public static readonly string CorrelationId = "CorrelationId"; + + /// + /// The tag name for the delivery tag of a RabbitMQ message that is being consumed. + /// + public static readonly string DeliveryTag = "DeliveryTag"; + + /// + /// The tag name for the redelivered flag of a RabbitMQ message that is being consumed. + /// + public static readonly string Redelivered = "Redelivered"; + + /// + /// The tag name used when an exception occurs in an , in a + /// . + /// + public static readonly string Exception = "Exception"; + } +} diff --git a/src/EasyNetQ.HostedService/Tracing/TraceEventName.cs b/src/EasyNetQ.HostedService/Tracing/TraceEventName.cs new file mode 100644 index 0000000..ed1d2db --- /dev/null +++ b/src/EasyNetQ.HostedService/Tracing/TraceEventName.cs @@ -0,0 +1,26 @@ +using System.Diagnostics; + +namespace EasyNetQ.HostedService.Tracing +{ + /// + /// The keys that describe what kind of event is being emitted through instances of + /// and . + /// + public static class TraceEventName + { + /// + /// The key for all emitted logs, through the emitting . + /// + public static readonly string Log = "Log"; + + /// + /// The key used when cancelling an in a . + /// + public static readonly string Cancelled = "Cancelled"; + + /// + /// The key used when an exception occurs in an , in a . + /// + public static readonly string Exception = "Exception"; + } +} diff --git a/src/EasyNetQ.HostedService/Tracing/TraceLog.cs b/src/EasyNetQ.HostedService/Tracing/TraceLog.cs new file mode 100644 index 0000000..cfff2a4 --- /dev/null +++ b/src/EasyNetQ.HostedService/Tracing/TraceLog.cs @@ -0,0 +1,40 @@ +using System; +using System.Diagnostics; +using Microsoft.Extensions.Logging; + +namespace EasyNetQ.HostedService.Tracing +{ + /// + /// The type of log emitted from an instance of . + /// + public sealed class TraceLog + { + /// + /// Creates a new . + /// + /// + /// + /// + public TraceLog(LogLevel logLevel, string message, Exception exception = null) + { + LogLevel = logLevel; + Message = message; + Exception = exception; + } + + /// + /// The log's . + /// + public LogLevel LogLevel { get; } + + /// + /// The log's message. + /// + public string Message { get; } + + /// + /// The log's , if it's an error. + /// + public Exception Exception { get; } + } +} diff --git a/src/EasyNetQ.HostedService/Tracing/TraceLogListener.cs b/src/EasyNetQ.HostedService/Tracing/TraceLogListener.cs new file mode 100644 index 0000000..e77f797 --- /dev/null +++ b/src/EasyNetQ.HostedService/Tracing/TraceLogListener.cs @@ -0,0 +1,109 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using Microsoft.Extensions.Logging; + +namespace EasyNetQ.HostedService.Tracing +{ + /// + /// A convenience type to help register a handler for log messages, emitted by the library. + /// + public static class TraceLogListener + { + /// + /// Subscribes an as a handler for the log events, emitted + /// by a . + /// + /// + /// The handler for the log events, emitted by a . + /// + /// + public static IDisposable Subscribe(Action handler, LogLevel minLogLevel = LogLevel.Debug) => + // ReSharper disable once AccessToStaticMemberViaDerivedType + DiagnosticListener.AllListeners.Subscribe( + new DiagnosticListenerObserver(listenerName => listenerName == TraceSourceName.Log, handler, + minLogLevel)); + + /// + /// Subscribes an as a handler for the log events, emitted + /// by a . + /// + /// This variant subscribes to all log messages, irrespective from the actual source within the library. + /// + /// + /// The handler for the log events, emitted by a . + /// + /// + public static IDisposable SubscribeToAll(Action handler, LogLevel minLogLevel = LogLevel.Debug) => + // ReSharper disable once AccessToStaticMemberViaDerivedType + DiagnosticListener.AllListeners.Subscribe( + new DiagnosticListenerObserver( + listenerName => listenerName.StartsWith(TraceSourceName.KeyPrefix), + handler, minLogLevel)); + + private sealed class DiagnosticListenerObserver : IObserver + { + public DiagnosticListenerObserver(Func listenerNamePredicate, Action handler, + LogLevel minLogLevel) + { + _listenerNamePredicate = listenerNamePredicate; + _handler = handler; + _minLogLevel = minLogLevel; + } + + public void OnCompleted() + { + } + + public void OnError(Exception error) + { + } + + public void OnNext(DiagnosticListener listener) + { + if (_listenerNamePredicate(listener.Name)) + { + listener.Subscribe(new LogObserver(_handler, _minLogLevel), + (eventName, logLevel, what) => + eventName == TraceEventName.Log && _minLogLevel <= (LogLevel) logLevel); + } + } + + private readonly Func _listenerNamePredicate; + private readonly Action _handler; + private readonly LogLevel _minLogLevel; + } + + private sealed class LogObserver : IObserver> + { + public LogObserver(Action handler, LogLevel minLogLevel) + { + _handler = handler; + _minLogLevel = minLogLevel; + } + + public void OnCompleted() + { + } + + public void OnError(Exception error) + { + } + + public void OnNext(KeyValuePair value) + { + var traceLog = (TraceLog) value.Value; + + if (_minLogLevel > traceLog.LogLevel) + { + return; + } + + _handler(traceLog); + } + + private readonly Action _handler; + private readonly LogLevel _minLogLevel; + } + } +} diff --git a/src/EasyNetQ.HostedService/Tracing/TraceLogWriter.cs b/src/EasyNetQ.HostedService/Tracing/TraceLogWriter.cs new file mode 100644 index 0000000..c0f9169 --- /dev/null +++ b/src/EasyNetQ.HostedService/Tracing/TraceLogWriter.cs @@ -0,0 +1,80 @@ +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using Microsoft.Extensions.Logging; + +namespace EasyNetQ.HostedService.Tracing +{ + /// + /// The class that emits instances of , through an instance of . + /// + public sealed class TraceLogWriter + { + /// + /// Creates a new . + /// + /// + public TraceLogWriter(DiagnosticSource diagnosticSource) + { + _diagnosticSource = diagnosticSource; + } + + /// + /// Logs a message using the Trace . + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void LogTrace(string message) => LogTrace(LogLevel.Trace, message); + + /// + /// Logs a message using the Debug . + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void LogDebug(string message) => LogTrace(LogLevel.Debug, message); + + /// + /// Logs a message using the Information . + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void LogInformation(string message) => LogTrace(LogLevel.Information, message); + + /// + /// Logs a message using the Error . + /// + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void LogError(string message, Exception exception = null) => + LogTrace(LogLevel.Error, message, exception); + + /// + /// Logs a message using the Critical . + /// + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void LogCritical(string message, Exception exception = null) => + LogTrace(LogLevel.Critical, message, exception); + + /// + /// This is the core logging method, used by the public one's + /// + /// It checks whether there are any listeners for the corresponding and if there are, + /// it emits a instance. + /// + /// + /// + /// + private void LogTrace(LogLevel logLevel, string message, Exception exception = null) + { + if (_diagnosticSource.IsEnabled(TraceEventName.Log, logLevel)) + { + _diagnosticSource.Write(TraceEventName.Log, new TraceLog(logLevel, message, exception)); + } + } + + private readonly DiagnosticSource _diagnosticSource; + } +} diff --git a/src/EasyNetQ.HostedService/Tracing/TraceSourceName.cs b/src/EasyNetQ.HostedService/Tracing/TraceSourceName.cs new file mode 100644 index 0000000..2914187 --- /dev/null +++ b/src/EasyNetQ.HostedService/Tracing/TraceSourceName.cs @@ -0,0 +1,36 @@ +using System.Diagnostics; + +namespace EasyNetQ.HostedService.Tracing +{ + /// + /// Holds the keys for creating instances of and . + /// + /// + /// The type of the derived , which implements a consumer or a producer. + /// + public static class TraceSourceName + { + /// + /// The key for a , through which logs are emitted. + /// + public static readonly string Log = $"{TraceSourceName.KeyPrefix}{typeof(T).FullName}.Log"; + + /// + /// The key for an , through which consumer and producers emit events on receiving + /// and publishing a message, respectively. + /// + public static readonly string Activity = $"{TraceSourceName.KeyPrefix}{typeof(T).FullName}.Activity"; + } + + /// + /// Holds the key prefix for all keys in . + /// + public static class TraceSourceName + { + /// + /// The key prefix for all keys in . + /// + public static readonly string KeyPrefix = + $"{typeof(TraceSourceName).Assembly.GetName().Name}@{typeof(TraceSourceName).Assembly.GetName().Version}:"; + } +} diff --git a/tests/EasyNetQ.HostedService.TestApp/EasyNetQ.HostedService.TestApp.csproj b/tests/EasyNetQ.HostedService.TestApp/EasyNetQ.HostedService.TestApp.csproj new file mode 100644 index 0000000..6ed0a48 --- /dev/null +++ b/tests/EasyNetQ.HostedService.TestApp/EasyNetQ.HostedService.TestApp.csproj @@ -0,0 +1,19 @@ + + + + netcoreapp3.1 + + + + + + + + + + + + + + + diff --git a/tests/EasyNetQ.HostedService.TestApp/EchoMessage.cs b/tests/EasyNetQ.HostedService.TestApp/EchoMessage.cs new file mode 100644 index 0000000..106cceb --- /dev/null +++ b/tests/EasyNetQ.HostedService.TestApp/EchoMessage.cs @@ -0,0 +1,7 @@ +namespace EasyNetQ.HostedService.TestApp +{ + public struct EchoMessage + { + public string Text { get; set; } + } +} diff --git a/tests/EasyNetQ.HostedService.TestApp/IRabbitMqServiceTestConsumer.cs b/tests/EasyNetQ.HostedService.TestApp/IRabbitMqServiceTestConsumer.cs new file mode 100644 index 0000000..8ecf316 --- /dev/null +++ b/tests/EasyNetQ.HostedService.TestApp/IRabbitMqServiceTestConsumer.cs @@ -0,0 +1,6 @@ +namespace EasyNetQ.HostedService.TestApp +{ + public interface IRabbitMqServiceTestConsumer + { + } +} diff --git a/tests/EasyNetQ.HostedService.TestApp/IRabbitMqServiceTestProducer.cs b/tests/EasyNetQ.HostedService.TestApp/IRabbitMqServiceTestProducer.cs new file mode 100644 index 0000000..6acafe3 --- /dev/null +++ b/tests/EasyNetQ.HostedService.TestApp/IRabbitMqServiceTestProducer.cs @@ -0,0 +1,6 @@ +namespace EasyNetQ.HostedService.TestApp +{ + public interface IRabbitMqServiceTestProducer + { + } +} diff --git a/tests/EasyNetQ.HostedService.TestApp/MessageInterceptor.cs b/tests/EasyNetQ.HostedService.TestApp/MessageInterceptor.cs new file mode 100644 index 0000000..647b31b --- /dev/null +++ b/tests/EasyNetQ.HostedService.TestApp/MessageInterceptor.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using EasyNetQ.HostedService.Abstractions; +using Microsoft.Extensions.Logging; + +namespace EasyNetQ.HostedService.TestApp +{ + public class MessageInterceptor : IIncomingMessageInterceptor, + IOutgoingMessageInterceptor + { + private readonly ILogger _logger; + + public MessageInterceptor(ILogger logger) + { + _logger = logger; + } + + public Task InterceptMessage(IMessage message, MessageReceivedInfo messageReceivedInfo, + CancellationToken cancellationToken) + { + _logger.LogDebug($"Intercepted incoming message of type {message.MessageType.FullName}."); + + return Task.CompletedTask; + } + + public Task InterceptMessage(byte[] message, Type type, IDictionary headers, + CancellationToken cancellationToken) + { + _logger.LogDebug($"Intercepted outgoing message of type {type.FullName}."); + + return Task.CompletedTask; + } + } +} diff --git a/tests/EasyNetQ.HostedService.TestApp/Program.cs b/tests/EasyNetQ.HostedService.TestApp/Program.cs new file mode 100644 index 0000000..6a29b68 --- /dev/null +++ b/tests/EasyNetQ.HostedService.TestApp/Program.cs @@ -0,0 +1,67 @@ +using System; +using EasyNetQ.HostedService.Abstractions; +using EasyNetQ.HostedService.DependencyInjection; +using EasyNetQ.HostedService.Models; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Serilog; + +namespace EasyNetQ.HostedService.TestApp +{ + public static class Program + { + internal static string TestQueueName; + + public static void Main(string[] args) + { + Log.Logger = new LoggerConfiguration() + .MinimumLevel.Debug() + .WriteTo.Console() + .CreateLogger(); + + AppDomain.CurrentDomain.UnhandledException += (sender, eventArgs) => + Log.Error($"Host crashed: {((Exception) eventArgs.ExceptionObject).Message}"); + + CreateHostBuilder(args).Build().Run(); + } + + private static IHostBuilder CreateHostBuilder(string[] args) => + Host.CreateDefaultBuilder(args) + .ConfigureServices((hostContext, services) => + { + TestQueueName = hostContext.Configuration.GetValue("RabbitMQ:QueueName"); + + var rabbitMqConfigConsumer = + hostContext.Configuration.GetSection("RabbitMQ:Test").Get(); + + new RabbitMqServiceBuilder() + .WithRabbitMqConfig(rabbitMqConfigConsumer) + .WithStronglyTypedMessages + .Build(services) + .Add(services, typeof(RabbitMqServiceTestConsumer), typeof(IRabbitMqServiceTestConsumer)); + + var rabbitMqConfigProducer = + hostContext.Configuration.GetSection("RabbitMQ:Test").Get(); + + rabbitMqConfigProducer.Id = "Producer"; + + new RabbitMqServiceBuilder() + .WithRabbitMqConfig(rabbitMqConfigProducer) + .WithStronglyTypedMessages + .Build(services) + .Add(services, typeof(RabbitMqServiceTestProducer), typeof(IRabbitMqServiceTestProducer)); + + services.AddSingleton(rabbitMqConfigConsumer); + + services.AddHostedService(); + + services + .AddSingleton, MessageInterceptor>(); + + services + .AddSingleton, MessageInterceptor>(); + }) + .UseSerilog(); + } +} diff --git a/tests/EasyNetQ.HostedService.TestApp/Properties/launchSettings.json b/tests/EasyNetQ.HostedService.TestApp/Properties/launchSettings.json new file mode 100644 index 0000000..d7d04a3 --- /dev/null +++ b/tests/EasyNetQ.HostedService.TestApp/Properties/launchSettings.json @@ -0,0 +1,10 @@ +{ + "profiles": { + "EasyNetQ.HostedService.TestApp": { + "commandName": "Project", + "environmentVariables": { + "DOTNET_ENVIRONMENT": "Development" + } + } + } +} diff --git a/tests/EasyNetQ.HostedService.TestApp/RabbitMqServiceTestConsumer.cs b/tests/EasyNetQ.HostedService.TestApp/RabbitMqServiceTestConsumer.cs new file mode 100644 index 0000000..457cd7a --- /dev/null +++ b/tests/EasyNetQ.HostedService.TestApp/RabbitMqServiceTestConsumer.cs @@ -0,0 +1,91 @@ +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EasyNetQ.Consumer; +using EasyNetQ.Events; +using EasyNetQ.HostedService.Models; +using EasyNetQ.Topology; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; + +namespace EasyNetQ.HostedService.TestApp +{ + public class RabbitMqServiceTestConsumer : RabbitMqConsumer, + IRabbitMqServiceTestConsumer + { + private readonly IQueue _queue = new Queue(Program.TestQueueName); + private readonly TaskCompletionSource _taskCompletionSourceUntyped = new TaskCompletionSource(); + + private readonly TaskCompletionSource _taskCompletionSourceTyped = + new TaskCompletionSource(); + + // ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local + public RabbitMqServiceTestConsumer(IHostEnvironment env) + { + Debug.Assert(env != null, $"{nameof(env)} must not be null."); + } + + public Task UntypedResponse => _taskCompletionSourceUntyped.Task; + + public Task TypedResponse => _taskCompletionSourceTyped.Task; + + public override Task StopAsync(CancellationToken cancellationToken) + { + var baseTask = base.StopAsync(cancellationToken); + + return baseTask; + } + + protected override void Initialize() + { + Bus.QueueDeclare(_queue.Name, _queue.IsDurable, _queue.IsExclusive, _queue.IsAutoDelete, + CancellationToken.None); + } + + protected override void RegisterMessageHandlers(IHandlerRegistration handlers) + { + handlers.Add((IMessageHandler) HandleMessage); + handlers.Add((IMessageHandler) HandleMessage); + } + + protected override ConsumerConfig GetConsumerConfig(CancellationToken cancellationToken) + { + return new ConsumerConfig + { + Queue = _queue + }; + } + + protected override void OnStartConsumingEvent(StartConsumingFailedEvent @event) + { + throw new Exception($"Should not be able to consume with a conflicted queue declaration."); + } + + private Task HandleMessage(IMessage message, MessageReceivedInfo info, + CancellationToken token) + { + Logger.LogDebug($"Received untyped message: {message.Body}"); + + _taskCompletionSourceUntyped.SetResult(new string(message.Body.Reverse().ToArray())); + + return Task.FromResult(AckStrategies.Ack); + } + + private Task HandleMessage(IMessage message, MessageReceivedInfo info, + CancellationToken token) + { + var typedMessage = message.Body; + + Logger.LogDebug($"Received typed message: {JsonConvert.SerializeObject(typedMessage)}"); + + typedMessage.Text = new string(typedMessage.Text.Reverse().ToArray()); + + _taskCompletionSourceTyped.SetResult(typedMessage); + + return Task.FromResult(AckStrategies.Ack); + } + } +} diff --git a/tests/EasyNetQ.HostedService.TestApp/RabbitMqServiceTestProducer.cs b/tests/EasyNetQ.HostedService.TestApp/RabbitMqServiceTestProducer.cs new file mode 100644 index 0000000..81c315c --- /dev/null +++ b/tests/EasyNetQ.HostedService.TestApp/RabbitMqServiceTestProducer.cs @@ -0,0 +1,7 @@ +namespace EasyNetQ.HostedService.TestApp +{ + public class RabbitMqServiceTestProducer : RabbitMqProducer, + IRabbitMqServiceTestProducer + { + } +} diff --git a/tests/EasyNetQ.HostedService.TestApp/RabbitMqServiceTester.cs b/tests/EasyNetQ.HostedService.TestApp/RabbitMqServiceTester.cs new file mode 100644 index 0000000..59339c5 --- /dev/null +++ b/tests/EasyNetQ.HostedService.TestApp/RabbitMqServiceTester.cs @@ -0,0 +1,128 @@ +// ReSharper disable UnusedMember.Local + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; + +namespace EasyNetQ.HostedService.TestApp +{ + public class RabbitMqServiceTester : IHostedService + { + private readonly RabbitMqServiceTestConsumer _testConsumer; + private readonly RabbitMqServiceTestProducer _testProducer; + private readonly ILogger _logger; + private readonly IHostApplicationLifetime _hostApplicationLifetime; + + public RabbitMqServiceTester( + // ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local + IRabbitMqServiceTestConsumer iTestConsumer, + RabbitMqServiceTestConsumer testConsumer, + // ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local + IRabbitMqServiceTestProducer iTestProducer, + RabbitMqServiceTestProducer testProducer, + ILogger logger, + IHostApplicationLifetime hostApplicationLifetime) + { + Debug.Assert(ReferenceEquals(iTestConsumer, testConsumer), + $"{nameof(iTestConsumer)} must be reference the same object as {nameof(testConsumer)}."); + + Debug.Assert(ReferenceEquals(iTestProducer, testProducer), + $"{nameof(iTestProducer)} must be reference the same object as {nameof(testProducer)}."); + + _testConsumer = testConsumer; + _testProducer = testProducer; + _logger = logger; + _hostApplicationLifetime = hostApplicationLifetime; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + _logger.LogDebug("Starting test..."); + + RunProduceConsumeTest(); + + // RunLoopBased(100_000, cancellationToken); + + // RunConsoleBased(cancellationToken).GetAwaiter().GetResult(); + + _logger.LogDebug("Stopping application..."); + + _hostApplicationLifetime.StopApplication(); + + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + private void RunProduceConsumeTest() + { + const string message = "Hello World"; + + // var payload = message; + + var payload = new EchoMessage {Text = message}; + + _testProducer.PublishAsync("", Program.TestQueueName, payload).GetAwaiter().GetResult(); + + // var response = _testConsumer.UntypedResponse.GetAwaiter().GetResult(); + // + // _logger.LogDebug($"Received response: {response}"); + + var response = _testConsumer.TypedResponse.GetAwaiter().GetResult(); + + _logger.LogDebug($"Received response: {JsonConvert.SerializeObject(response)}"); + + // var test = new string(message.Reverse().ToArray()); + + // Debug.Assert(payload == test); + + var test = new string(response.Text.Reverse().ToArray()); + + Debug.Assert(payload.Text == test); + } + + private Task RunConsoleBased(CancellationToken cancellationToken) => + Task.Run(async () => + { + while (true) + { + var key = Console.ReadKey(); + + if (key.Key == ConsoleKey.Escape) + { + break; + } + + await _testProducer.PublishAsync( + "", + Program.TestQueueName, + new EchoMessage {Text = "This is a test."}); + } + }, cancellationToken); + + private void RunLoopBased(int max, CancellationToken cancellationToken) + { + Task.WaitAll(Enumerable.Range(0, max).Select(i => Task.Run(async () => + { + _logger.LogDebug($"Sending message #{i}."); + + var result = await _testProducer + .PublishAsync( + "", + Program.TestQueueName, + new EchoMessage {Text = "This is a test."}) + .ConfigureAwait(false); + + _logger.LogDebug($"Result #{i}: {result}."); + }, cancellationToken)).ToArray()); + } + } +} diff --git a/tests/EasyNetQ.HostedService.TestApp/appsettings.Development.json b/tests/EasyNetQ.HostedService.TestApp/appsettings.Development.json new file mode 100644 index 0000000..29a4935 --- /dev/null +++ b/tests/EasyNetQ.HostedService.TestApp/appsettings.Development.json @@ -0,0 +1,16 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Debug", + "Microsoft": "Warning", + "Microsoft.Hosting.Lifetime": "Information" + } + }, + "RabbitMQ": { + "Test": { + "HostName": "localhost", + "UserName": "guest", + "PassWord": "guest" + } + } +} diff --git a/tests/EasyNetQ.HostedService.TestApp/appsettings.json b/tests/EasyNetQ.HostedService.TestApp/appsettings.json new file mode 100644 index 0000000..4f6e78a --- /dev/null +++ b/tests/EasyNetQ.HostedService.TestApp/appsettings.json @@ -0,0 +1,18 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft": "Warning", + "Microsoft.Hosting.Lifetime": "Information" + } + }, + "RabbitMQ": { + "Test": { + "Id": "Consumer", + "HostName": "", + "UserName": "", + "PassWord": "" + }, + "QueueName": "easynetq.hosted.service.test.queue" + } +}