From 886c682e11c61577f3cef8392703b43ffe990da1 Mon Sep 17 00:00:00 2001 From: Charalampos Detsis Date: Mon, 23 Aug 2021 15:26:36 +0300 Subject: [PATCH 1/2] feat(serialization): add serialization based on specific header value --- doc/EasyNetQ.HostedService.xml | 19 ++++- .../RabbitMqServiceBuilder.cs | 14 +++- .../HeaderMessageSerializationStrategy.cs | 76 +++++++++++++++++++ .../HeaderTypeSerializationConfiguration.cs | 12 +++ .../Internals/MessageSerializationStrategy.cs | 9 +++ .../Internals/UnhandledHeaderTypeException.cs | 25 ++++++ src/EasyNetQ.HostedService/RabbitMqService.cs | 36 ++++++--- 7 files changed, 176 insertions(+), 15 deletions(-) create mode 100644 src/EasyNetQ.HostedService/Internals/HeaderMessageSerializationStrategy.cs create mode 100644 src/EasyNetQ.HostedService/Internals/HeaderTypeSerializationConfiguration.cs create mode 100644 src/EasyNetQ.HostedService/Internals/MessageSerializationStrategy.cs create mode 100644 src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeException.cs diff --git a/doc/EasyNetQ.HostedService.xml b/doc/EasyNetQ.HostedService.xml index dc09110..c1adce0 100644 --- a/doc/EasyNetQ.HostedService.xml +++ b/doc/EasyNetQ.HostedService.xml @@ -277,6 +277,21 @@ type property. + + + + + + + + + + + + + + + Thrown when a consumer's default handler for is run, since @@ -819,7 +834,7 @@ producer as a singleton hosted service. - + This static method is used by to construct a singleton hosted service for a consumer of a producer. @@ -834,7 +849,7 @@ . - + diff --git a/src/EasyNetQ.HostedService/DependencyInjection/RabbitMqServiceBuilder.cs b/src/EasyNetQ.HostedService/DependencyInjection/RabbitMqServiceBuilder.cs index 8fb56c5..63b1a08 100755 --- a/src/EasyNetQ.HostedService/DependencyInjection/RabbitMqServiceBuilder.cs +++ b/src/EasyNetQ.HostedService/DependencyInjection/RabbitMqServiceBuilder.cs @@ -26,8 +26,9 @@ namespace EasyNetQ.HostedService.DependencyInjection /// public sealed class RabbitMqServiceBuilder where T : RabbitMqService { - private bool _configUseStronglyTypedMessages; + private MessageSerializationStrategy _configUseStronglyTypedMessages = MessageSerializationStrategy.UnTyped; private bool _configUseCorrelationIds; + private HeaderTypeSerializationConfiguration _headerTypeSerializationConfiguration; private IRabbitMqConfig _configRabbitMqConfig; private readonly List _configOnConnected = new List(); @@ -45,11 +46,18 @@ public RabbitMqServiceBuilder WithStronglyTypedMessages { get { - _configUseStronglyTypedMessages = true; + _configUseStronglyTypedMessages = MessageSerializationStrategy.Typed; return this; } } + + public RabbitMqServiceBuilder WithHeaderTypedMessages(HeaderTypeSerializationConfiguration configuration) + { + _configUseStronglyTypedMessages = MessageSerializationStrategy.Header; + _headerTypeSerializationConfiguration = configuration; + return this; + } /// /// When set, producers will send a correlation id along with each message. @@ -173,7 +181,7 @@ public Func Build(IServiceCollection serviceCollection) .BuildServiceProvider(); var bus = RabbitMqService.CreateLazyBus( - _configRabbitMqConfig, _configUseStronglyTypedMessages, _configUseCorrelationIds, serviceProvider); + _configRabbitMqConfig, _configUseStronglyTypedMessages, _configUseCorrelationIds, _headerTypeSerializationConfiguration, serviceProvider); busProxy = new BusProxy(_configRabbitMqConfig.Id, bus); diff --git a/src/EasyNetQ.HostedService/Internals/HeaderMessageSerializationStrategy.cs b/src/EasyNetQ.HostedService/Internals/HeaderMessageSerializationStrategy.cs new file mode 100644 index 0000000..75a24d0 --- /dev/null +++ b/src/EasyNetQ.HostedService/Internals/HeaderMessageSerializationStrategy.cs @@ -0,0 +1,76 @@ +using System; +using System.Linq; + +namespace EasyNetQ.HostedService.Internals +{ + public class HeaderMessageSerializationStrategy : IMessageSerializationStrategy + { + private readonly bool _useCorrelationIds; + private readonly ISerializer _serializer; + private readonly ICorrelationIdGenerationStrategy _correlationIdGenerator; + private readonly HeaderTypeSerializationConfiguration _headerTypeSerializationConfiguration; + + public HeaderMessageSerializationStrategy( + bool useCorrelationIds, + HeaderTypeSerializationConfiguration headerTypeSerializationConfiguration, + ISerializer serializer, + ICorrelationIdGenerationStrategy correlationIdGenerator) + { + _useCorrelationIds = useCorrelationIds; + _serializer = serializer; + _correlationIdGenerator = correlationIdGenerator; + _headerTypeSerializationConfiguration = headerTypeSerializationConfiguration; + } + + public SerializedMessage SerializeMessage(IMessage message) + { + var bytes = _serializer.MessageToBytes(message.MessageType, message.GetBody()); + var properties = message.Properties; + + var typeHeader = _headerTypeSerializationConfiguration + .TypeMappings + .SingleOrDefault(v => message.MessageType == v.Value) + .Key; + + if (string.IsNullOrEmpty(typeHeader)) + { + throw new EasyNetQException( + $"Did not find a unique mapping for the specified type {message.MessageType}"); + } + + properties.Headers.Add(_headerTypeSerializationConfiguration.TypeHeader, typeHeader); + + if (_useCorrelationIds && string.IsNullOrEmpty(properties.CorrelationId)) + { + properties.CorrelationId = _correlationIdGenerator.GetCorrelationId(); + } + + return new SerializedMessage(properties, bytes); + } + + public IMessage DeserializeMessage(MessageProperties properties, byte[] body) + { + var typeHeaderValueExists = properties.Headers.TryGetValue( + _headerTypeSerializationConfiguration.TypeHeader, + out var typeFromHeader); + + if (!typeHeaderValueExists) + { + throw new UnhandledHeaderTypeException("Type header not present on message"); + } + + var typeExistsInMapping = _headerTypeSerializationConfiguration.TypeMappings.TryGetValue( + System.Text.Encoding.Default.GetString((byte[])typeFromHeader), + out var messageType); + + if (!typeExistsInMapping) + { + throw new UnhandledHeaderTypeException("Message type from header not found in mapping"); + } + + var message = _serializer.BytesToMessage(messageType, body); + + return MessageFactory.CreateInstance(messageType, message, properties); + } + } +} \ No newline at end of file diff --git a/src/EasyNetQ.HostedService/Internals/HeaderTypeSerializationConfiguration.cs b/src/EasyNetQ.HostedService/Internals/HeaderTypeSerializationConfiguration.cs new file mode 100644 index 0000000..b3649d8 --- /dev/null +++ b/src/EasyNetQ.HostedService/Internals/HeaderTypeSerializationConfiguration.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; + +namespace EasyNetQ.HostedService.Internals +{ + public class HeaderTypeSerializationConfiguration + { + public string TypeHeader { get; set; } + + public Dictionary TypeMappings { get; set; } + } +} \ No newline at end of file diff --git a/src/EasyNetQ.HostedService/Internals/MessageSerializationStrategy.cs b/src/EasyNetQ.HostedService/Internals/MessageSerializationStrategy.cs new file mode 100644 index 0000000..1d5f6b2 --- /dev/null +++ b/src/EasyNetQ.HostedService/Internals/MessageSerializationStrategy.cs @@ -0,0 +1,9 @@ +namespace EasyNetQ.HostedService.Internals +{ + public enum MessageSerializationStrategy + { + Typed, + UnTyped, + Header + } +} \ No newline at end of file diff --git a/src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeException.cs b/src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeException.cs new file mode 100644 index 0000000..255a40f --- /dev/null +++ b/src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeException.cs @@ -0,0 +1,25 @@ +using System; +using System.Runtime.Serialization; +using EasyNetQ.HostedService.Abstractions; + +namespace EasyNetQ.HostedService.Internals +{ + public class UnhandledHeaderTypeException : Exception, INackWithRequeueException + { + /// + public UnhandledHeaderTypeException() { } + + /// + public UnhandledHeaderTypeException(string message) : base(message) { } + + /// + public UnhandledHeaderTypeException(string format, params object[] args) : base(string.Format(format, args)) { } + + /// + public UnhandledHeaderTypeException(string message, Exception inner) : base(message, inner) { } + + /// + protected UnhandledHeaderTypeException(SerializationInfo info, StreamingContext context) : base(info, context) { } + + } +} \ No newline at end of file diff --git a/src/EasyNetQ.HostedService/RabbitMqService.cs b/src/EasyNetQ.HostedService/RabbitMqService.cs index fd7a276..d5ae364 100755 --- a/src/EasyNetQ.HostedService/RabbitMqService.cs +++ b/src/EasyNetQ.HostedService/RabbitMqService.cs @@ -176,14 +176,15 @@ public static TDerived Create( /// . /// /// - /// + /// /// /// /// public static Lazy CreateLazyBus( IRabbitMqConfig rmqConfig, - bool useStronglyTypedMessages, + MessageSerializationStrategy messageSerializationStrategy, bool useCorrelationIds, + HeaderTypeSerializationConfiguration headerTypeSerializationConfiguration, IServiceProvider serviceProvider) => new Lazy(() => RabbitHutch.CreateBus(new ConnectionConfiguration { @@ -215,16 +216,31 @@ public static Lazy CreateLazyBus( var serializer = serviceResolver.Resolve(); var correlationIdGenerationStrategy = serviceResolver.Resolve(); - if (useStronglyTypedMessages) + switch (messageSerializationStrategy) { - var typeNameSerializer = serviceResolver.Resolve(); - - return new TypedMessageSerializationStrategy( - useCorrelationIds, typeNameSerializer, serializer, correlationIdGenerationStrategy); + case MessageSerializationStrategy.Typed: + { + var typeNameSerializer = serviceResolver.Resolve(); + + return new TypedMessageSerializationStrategy( + useCorrelationIds, + typeNameSerializer, + serializer, + correlationIdGenerationStrategy); + } + case MessageSerializationStrategy.Header: + { + return new HeaderMessageSerializationStrategy( + useCorrelationIds, + headerTypeSerializationConfiguration, + serializer, + correlationIdGenerationStrategy); + } + case MessageSerializationStrategy.UnTyped: + default: + return new UntypedMessageSerializationStrategy( + useCorrelationIds, serializer, correlationIdGenerationStrategy); } - - return new UntypedMessageSerializationStrategy( - useCorrelationIds, serializer, correlationIdGenerationStrategy); }); }).Advanced); From 7c3a9d7c2ff47964aafbe9f14344949d3abec499 Mon Sep 17 00:00:00 2001 From: Charalampos Detsis Date: Mon, 23 Aug 2021 17:38:51 +0300 Subject: [PATCH 2/2] add option for requeue --- .../HeaderMessageSerializationStrategy.cs | 14 +++++++++-- .../HeaderTypeSerializationConfiguration.cs | 2 ++ .../Internals/UnhandledHeaderTypeException.cs | 25 ------------------- ...UnhandledHeaderTypeExceptionWithRequeue.cs | 24 ++++++++++++++++++ ...andledHeaderTypeExceptionWithoutRequeue.cs | 24 ++++++++++++++++++ 5 files changed, 62 insertions(+), 27 deletions(-) delete mode 100644 src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeException.cs create mode 100644 src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeExceptionWithRequeue.cs create mode 100644 src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeExceptionWithoutRequeue.cs diff --git a/src/EasyNetQ.HostedService/Internals/HeaderMessageSerializationStrategy.cs b/src/EasyNetQ.HostedService/Internals/HeaderMessageSerializationStrategy.cs index 75a24d0..e2d74a5 100644 --- a/src/EasyNetQ.HostedService/Internals/HeaderMessageSerializationStrategy.cs +++ b/src/EasyNetQ.HostedService/Internals/HeaderMessageSerializationStrategy.cs @@ -56,7 +56,12 @@ public IMessage DeserializeMessage(MessageProperties properties, byte[] body) if (!typeHeaderValueExists) { - throw new UnhandledHeaderTypeException("Type header not present on message"); + if (_headerTypeSerializationConfiguration.ShouldRequeue) + { + throw new UnhandledHeaderTypeExceptionWithRequeue("Type header not present on message"); + } + + throw new UnhandledHeaderTypeExceptionWithoutRequeue("Type header not present on message"); } var typeExistsInMapping = _headerTypeSerializationConfiguration.TypeMappings.TryGetValue( @@ -65,7 +70,12 @@ public IMessage DeserializeMessage(MessageProperties properties, byte[] body) if (!typeExistsInMapping) { - throw new UnhandledHeaderTypeException("Message type from header not found in mapping"); + if (_headerTypeSerializationConfiguration.ShouldRequeue) + { + throw new UnhandledHeaderTypeExceptionWithRequeue("Message type from header not found in mapping"); + } + + throw new UnhandledHeaderTypeExceptionWithoutRequeue("Message type from header not found in mapping"); } var message = _serializer.BytesToMessage(messageType, body); diff --git a/src/EasyNetQ.HostedService/Internals/HeaderTypeSerializationConfiguration.cs b/src/EasyNetQ.HostedService/Internals/HeaderTypeSerializationConfiguration.cs index b3649d8..edbea71 100644 --- a/src/EasyNetQ.HostedService/Internals/HeaderTypeSerializationConfiguration.cs +++ b/src/EasyNetQ.HostedService/Internals/HeaderTypeSerializationConfiguration.cs @@ -8,5 +8,7 @@ public class HeaderTypeSerializationConfiguration public string TypeHeader { get; set; } public Dictionary TypeMappings { get; set; } + + public bool ShouldRequeue { get; set; } = true; } } \ No newline at end of file diff --git a/src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeException.cs b/src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeException.cs deleted file mode 100644 index 255a40f..0000000 --- a/src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeException.cs +++ /dev/null @@ -1,25 +0,0 @@ -using System; -using System.Runtime.Serialization; -using EasyNetQ.HostedService.Abstractions; - -namespace EasyNetQ.HostedService.Internals -{ - public class UnhandledHeaderTypeException : Exception, INackWithRequeueException - { - /// - public UnhandledHeaderTypeException() { } - - /// - public UnhandledHeaderTypeException(string message) : base(message) { } - - /// - public UnhandledHeaderTypeException(string format, params object[] args) : base(string.Format(format, args)) { } - - /// - public UnhandledHeaderTypeException(string message, Exception inner) : base(message, inner) { } - - /// - protected UnhandledHeaderTypeException(SerializationInfo info, StreamingContext context) : base(info, context) { } - - } -} \ No newline at end of file diff --git a/src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeExceptionWithRequeue.cs b/src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeExceptionWithRequeue.cs new file mode 100644 index 0000000..0cd4e94 --- /dev/null +++ b/src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeExceptionWithRequeue.cs @@ -0,0 +1,24 @@ +using System; +using System.Runtime.Serialization; +using EasyNetQ.HostedService.Abstractions; + +namespace EasyNetQ.HostedService.Internals +{ + public class UnhandledHeaderTypeExceptionWithRequeue : Exception, INackWithRequeueException + { + /// + public UnhandledHeaderTypeExceptionWithRequeue() { } + + /// + public UnhandledHeaderTypeExceptionWithRequeue(string message) : base(message) { } + + /// + public UnhandledHeaderTypeExceptionWithRequeue(string format, params object[] args) : base(string.Format(format, args)) { } + + /// + public UnhandledHeaderTypeExceptionWithRequeue(string message, Exception inner) : base(message, inner) { } + + /// + protected UnhandledHeaderTypeExceptionWithRequeue(SerializationInfo info, StreamingContext context) : base(info, context) { } + } +} \ No newline at end of file diff --git a/src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeExceptionWithoutRequeue.cs b/src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeExceptionWithoutRequeue.cs new file mode 100644 index 0000000..cfd1fa4 --- /dev/null +++ b/src/EasyNetQ.HostedService/Internals/UnhandledHeaderTypeExceptionWithoutRequeue.cs @@ -0,0 +1,24 @@ +using System; +using System.Runtime.Serialization; +using EasyNetQ.HostedService.Abstractions; + +namespace EasyNetQ.HostedService.Internals +{ + public class UnhandledHeaderTypeExceptionWithoutRequeue : Exception, INackWithoutRequeueException + { + /// + public UnhandledHeaderTypeExceptionWithoutRequeue() { } + + /// + public UnhandledHeaderTypeExceptionWithoutRequeue(string message) : base(message) { } + + /// + public UnhandledHeaderTypeExceptionWithoutRequeue(string format, params object[] args) : base(string.Format(format, args)) { } + + /// + public UnhandledHeaderTypeExceptionWithoutRequeue(string message, Exception inner) : base(message, inner) { } + + /// + protected UnhandledHeaderTypeExceptionWithoutRequeue(SerializationInfo info, StreamingContext context) : base(info, context) { } + } +} \ No newline at end of file