Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions doc/EasyNetQ.HostedService.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ namespace EasyNetQ.HostedService.DependencyInjection
/// </example>
public sealed class RabbitMqServiceBuilder<T> where T : RabbitMqService<T>
{
private bool _configUseStronglyTypedMessages;
private MessageSerializationStrategy _configUseStronglyTypedMessages = MessageSerializationStrategy.UnTyped;
private bool _configUseCorrelationIds;
private HeaderTypeSerializationConfiguration _headerTypeSerializationConfiguration;
private IRabbitMqConfig _configRabbitMqConfig;
private readonly List<OnConnectedCallback> _configOnConnected = new List<OnConnectedCallback>();

Expand All @@ -45,11 +46,18 @@ public RabbitMqServiceBuilder<T> WithStronglyTypedMessages
{
get
{
_configUseStronglyTypedMessages = true;
_configUseStronglyTypedMessages = MessageSerializationStrategy.Typed;

return this;
}
}

public RabbitMqServiceBuilder<T> WithHeaderTypedMessages(HeaderTypeSerializationConfiguration configuration)
{
_configUseStronglyTypedMessages = MessageSerializationStrategy.Header;
_headerTypeSerializationConfiguration = configuration;
return this;
}

/// <summary>
/// When set, producers will send a correlation id along with each message.
Expand Down Expand Up @@ -173,7 +181,7 @@ public Func<IServiceProvider, T> Build(IServiceCollection serviceCollection)
.BuildServiceProvider();

var bus = RabbitMqService<T>.CreateLazyBus(
_configRabbitMqConfig, _configUseStronglyTypedMessages, _configUseCorrelationIds, serviceProvider);
_configRabbitMqConfig, _configUseStronglyTypedMessages, _configUseCorrelationIds, _headerTypeSerializationConfiguration, serviceProvider);

busProxy = new BusProxy(_configRabbitMqConfig.Id, bus);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using System;
using System.Linq;

namespace EasyNetQ.HostedService.Internals
{
public class HeaderMessageSerializationStrategy : IMessageSerializationStrategy
{
private readonly bool _useCorrelationIds;
private readonly ISerializer _serializer;
private readonly ICorrelationIdGenerationStrategy _correlationIdGenerator;
private readonly HeaderTypeSerializationConfiguration _headerTypeSerializationConfiguration;

public HeaderMessageSerializationStrategy(
bool useCorrelationIds,
HeaderTypeSerializationConfiguration headerTypeSerializationConfiguration,
ISerializer serializer,
ICorrelationIdGenerationStrategy correlationIdGenerator)
{
_useCorrelationIds = useCorrelationIds;
_serializer = serializer;
_correlationIdGenerator = correlationIdGenerator;
_headerTypeSerializationConfiguration = headerTypeSerializationConfiguration;
}

public SerializedMessage SerializeMessage(IMessage message)
{
var bytes = _serializer.MessageToBytes(message.MessageType, message.GetBody());
var properties = message.Properties;

var typeHeader = _headerTypeSerializationConfiguration
.TypeMappings
.SingleOrDefault(v => message.MessageType == v.Value)
.Key;

if (string.IsNullOrEmpty(typeHeader))
{
throw new EasyNetQException(
$"Did not find a unique mapping for the specified type {message.MessageType}");
}

properties.Headers.Add(_headerTypeSerializationConfiguration.TypeHeader, typeHeader);

if (_useCorrelationIds && string.IsNullOrEmpty(properties.CorrelationId))
{
properties.CorrelationId = _correlationIdGenerator.GetCorrelationId();
}

return new SerializedMessage(properties, bytes);
}

public IMessage DeserializeMessage(MessageProperties properties, byte[] body)
{
var typeHeaderValueExists = properties.Headers.TryGetValue(
_headerTypeSerializationConfiguration.TypeHeader,
out var typeFromHeader);

if (!typeHeaderValueExists)
{
if (_headerTypeSerializationConfiguration.ShouldRequeue)
{
throw new UnhandledHeaderTypeExceptionWithRequeue("Type header not present on message");
}

throw new UnhandledHeaderTypeExceptionWithoutRequeue("Type header not present on message");
}

var typeExistsInMapping = _headerTypeSerializationConfiguration.TypeMappings.TryGetValue(
System.Text.Encoding.Default.GetString((byte[])typeFromHeader),
out var messageType);

if (!typeExistsInMapping)
{
if (_headerTypeSerializationConfiguration.ShouldRequeue)
{
throw new UnhandledHeaderTypeExceptionWithRequeue("Message type from header not found in mapping");
}

throw new UnhandledHeaderTypeExceptionWithoutRequeue("Message type from header not found in mapping");
}

var message = _serializer.BytesToMessage(messageType, body);

return MessageFactory.CreateInstance(messageType, message, properties);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;

namespace EasyNetQ.HostedService.Internals
{
public class HeaderTypeSerializationConfiguration
{
public string TypeHeader { get; set; }

public Dictionary<string, Type> TypeMappings { get; set; }

public bool ShouldRequeue { get; set; } = true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace EasyNetQ.HostedService.Internals
{
public enum MessageSerializationStrategy
{
Typed,
UnTyped,
Header
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.Runtime.Serialization;
using EasyNetQ.HostedService.Abstractions;

namespace EasyNetQ.HostedService.Internals
{
public class UnhandledHeaderTypeExceptionWithRequeue : Exception, INackWithRequeueException
{
/// <inheritdoc />
public UnhandledHeaderTypeExceptionWithRequeue() { }

/// <inheritdoc />
public UnhandledHeaderTypeExceptionWithRequeue(string message) : base(message) { }

/// <inheritdoc />
public UnhandledHeaderTypeExceptionWithRequeue(string format, params object[] args) : base(string.Format(format, args)) { }

/// <inheritdoc />
public UnhandledHeaderTypeExceptionWithRequeue(string message, Exception inner) : base(message, inner) { }

/// <inheritdoc />
protected UnhandledHeaderTypeExceptionWithRequeue(SerializationInfo info, StreamingContext context) : base(info, context) { }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.Runtime.Serialization;
using EasyNetQ.HostedService.Abstractions;

namespace EasyNetQ.HostedService.Internals
{
public class UnhandledHeaderTypeExceptionWithoutRequeue : Exception, INackWithoutRequeueException
{
/// <inheritdoc />
public UnhandledHeaderTypeExceptionWithoutRequeue() { }

/// <inheritdoc />
public UnhandledHeaderTypeExceptionWithoutRequeue(string message) : base(message) { }

/// <inheritdoc />
public UnhandledHeaderTypeExceptionWithoutRequeue(string format, params object[] args) : base(string.Format(format, args)) { }

/// <inheritdoc />
public UnhandledHeaderTypeExceptionWithoutRequeue(string message, Exception inner) : base(message, inner) { }

/// <inheritdoc />
protected UnhandledHeaderTypeExceptionWithoutRequeue(SerializationInfo info, StreamingContext context) : base(info, context) { }
}
}
36 changes: 26 additions & 10 deletions src/EasyNetQ.HostedService/RabbitMqService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,15 @@ public static TDerived Create<TDerived>(
/// <see cref="RabbitMqServiceBuilder{T}"/>.
/// </summary>
/// <param name="rmqConfig"/>
/// <param name="useStronglyTypedMessages"/>
/// <param name="messageSerializationStrategy"/>
/// <param name="useCorrelationIds"/>
/// <param name="serviceProvider"/>
/// <returns/>
public static Lazy<IAdvancedBus> CreateLazyBus(
IRabbitMqConfig rmqConfig,
bool useStronglyTypedMessages,
MessageSerializationStrategy messageSerializationStrategy,
bool useCorrelationIds,
HeaderTypeSerializationConfiguration headerTypeSerializationConfiguration,
IServiceProvider serviceProvider) =>
new Lazy<IAdvancedBus>(() => RabbitHutch.CreateBus(new ConnectionConfiguration
{
Expand Down Expand Up @@ -215,16 +216,31 @@ public static Lazy<IAdvancedBus> CreateLazyBus(
var serializer = serviceResolver.Resolve<ISerializer>();
var correlationIdGenerationStrategy = serviceResolver.Resolve<ICorrelationIdGenerationStrategy>();

if (useStronglyTypedMessages)
switch (messageSerializationStrategy)
{
var typeNameSerializer = serviceResolver.Resolve<ITypeNameSerializer>();

return new TypedMessageSerializationStrategy(
useCorrelationIds, typeNameSerializer, serializer, correlationIdGenerationStrategy);
case MessageSerializationStrategy.Typed:
{
var typeNameSerializer = serviceResolver.Resolve<ITypeNameSerializer>();

return new TypedMessageSerializationStrategy(
useCorrelationIds,
typeNameSerializer,
serializer,
correlationIdGenerationStrategy);
}
case MessageSerializationStrategy.Header:
{
return new HeaderMessageSerializationStrategy(
useCorrelationIds,
headerTypeSerializationConfiguration,
serializer,
correlationIdGenerationStrategy);
}
case MessageSerializationStrategy.UnTyped:
default:
return new UntypedMessageSerializationStrategy(
useCorrelationIds, serializer, correlationIdGenerationStrategy);
}

return new UntypedMessageSerializationStrategy(
useCorrelationIds, serializer, correlationIdGenerationStrategy);
});
}).Advanced);

Expand Down