Last week, while researching an issue with one of our systems, I discovered that six days' worth of messages were missing from a queue. After the initial panic we started digging. One of my colleagues discovered that the Transactional Dead-Letter queue had a large number of messages piled up in it. I had read a little about that queue but had never really dug into it. This gem of an article got me started on a path that ultimately led to a best-of-breed solution.
The solution from Billy Dunlop's article has you change the ServiceBehavior AddressFilterMode to "Any". This allows your queued service to receive messages destined for any endpoint, so long as the message body is what it expects. I have used this in the past for poison queues and it made perfect sense here. However, we had one other problem. Some of the messages in the dead-letter queue were for other queued services, so simply pointing one of our services at the dead-letter queue as its endpoint would not work. The solution is to use a more generic contract, similar to the WCF Message Router covered in this MSDN article.
I took the router approach and created a router service and a console application to host it. For work I did all this in VB.NET. As a tribute to my very good friend David Risko I spent the weekend rewriting the whole thing in C#. This was my first shot at writing something from scratch in C#, so if you see something that could be condensed or written more clearly, please do let me know.
The default time-to-live on the NetMsmqBinding is 1 day. This can be adjusted in configuration or in code when you create the binding. As a simple test, you can create a NetMsmq service client and set the binding's time-to-live to something ridiculous like 30 seconds (it's a TimeSpan, so you can do this with TimeSpan.FromSeconds(30)). Watch the message hit your transactional queue, keep refreshing, and after 30 seconds, *poof*, the message is gone and you can find it in your Transactional Dead-Letter queue.
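The test described above could look roughly like the sketch below. The ITtlTestService contract and the TtlTestQueue queue address are made up for illustration and are not part of the router solution; it also assumes the queue already exists, is transactional, and that nothing is listening on it. Security is turned off to match the configuration used later in this post.

```csharp
using System;
using System.ServiceModel;

// Hypothetical contract used only for this time-to-live test.
[ServiceContract]
public interface ITtlTestService
{
    [OperationContract(IsOneWay = true)]
    void Ping(string text);
}

public static class TtlTestClient
{
    public static void Main()
    {
        // Messages not received within 30 seconds expire and are moved
        // to the dead-letter queue (the default is one day).
        NetMsmqBinding binding = new NetMsmqBinding(NetMsmqSecurityMode.None);
        binding.TimeToLive = TimeSpan.FromSeconds(30);

        // Assumed address: replace with a transactional private queue on your machine.
        EndpointAddress address =
            new EndpointAddress("net.msmq://localhost/private/TtlTestQueue");

        ChannelFactory<ITtlTestService> factory =
            new ChannelFactory<ITtlTestService>(binding, address);
        try
        {
            ITtlTestService client = factory.CreateChannel();
            client.Ping("expire me");
            ((IClientChannel)client).Close();
            factory.Close();
        }
        catch
        {
            // Abort instead of Close when something went wrong.
            factory.Abort();
            throw;
        }
    }
}
```

With no service reading from the queue, the message should sit there until the 30 seconds are up and then show up in the Transactional Dead-Letter queue.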
Feel free to follow the code below or download the .ZIP file with the solution. The solution was created in Visual Studio 2008 SP1.
On to the code!
The contracts: included are the service contract, which is used by the router service, and the channel contract, which is used by the MessageRouter class to resend the message to its intended destination. The service contract uses System.ServiceModel.Channels.Message as its input message. This allows the router to receive any WCF message for any service without having to understand the body. This is a hugely powerful ability that should be used with great care.
namespace CD.ServiceModel.NetMsmqRouter.Contracts
{
    /// <summary>
    /// Service contract interface for the message router service.
    /// </summary>
    [System.ServiceModel.ServiceContract]
    public interface IMessageRouterService
    {
        // Action = "*" makes this the catch-all operation for incoming one-way messages.
        [System.ServiceModel.OperationContract(IsOneWay = true, Action = "*")]
        void Route(System.ServiceModel.Channels.Message message);
    }

    /// <summary>
    /// Channel contract used by the MessageRouter class to resend a message.
    /// </summary>
    public interface IMessageRouterChannel : System.ServiceModel.IClientChannel, IMessageRouterService
    {
    }
}
The biggest piece I had to get my head around when learning WCF was the notion of the contract. The same interface will be implemented by both the service and client but each one implements it differently. The service implements it from the perspective of receiving the message and the client implements it from the perspective of sending the message.
Next up is the service implementation. It is pretty straightforward and does two things: it uses the MessageWriter class to save the message to disk (in case something really bad happens) and then uses the MessageRouter class to send the message to its original intended destination.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace CD.ServiceModel.NetMsmqRouter.Service
{
    [System.ServiceModel.ServiceBehavior(AddressFilterMode = System.ServiceModel.AddressFilterMode.Any,
        ValidateMustUnderstand = false)]
    public class RouterService : Contracts.IMessageRouterService
    {
        private MessageWriter _writer;
        private MessageRouter _router;

        /// <summary>
        /// Initializes a new instance of the RouterService class.
        /// </summary>
        public RouterService()
        {
            _writer = new MessageWriter(Properties.Settings.Default.MessageWriterPath);
            _router = new MessageRouter(Properties.Settings.Default.NetMsmqBindingConfigurationName);
        }

        #region IMessageRouterService Members

        [System.ServiceModel.OperationBehavior(TransactionAutoComplete = true, TransactionScopeRequired = true)]
        public void Route(System.ServiceModel.Channels.Message message)
        {
            try
            {
                System.Diagnostics.Trace.WriteLine("----- Message Received -----");
                // A Message can only be read once, so buffer it and create a copy for each use.
                System.ServiceModel.Channels.MessageBuffer messageCopyToWrite = message.CreateBufferedCopy(Int32.MaxValue);
                _writer.Write(messageCopyToWrite);
                _router.RouteMessage(messageCopyToWrite.CreateMessage());
                System.Diagnostics.Trace.WriteLine("----- Message Processing Complete -----");
            }
            catch (Exception ex)
            {
                System.Diagnostics.Trace.WriteLine(ex.ToString());
                // Rethrow so the receive transaction aborts and the message stays in the queue.
                throw;
            }
        }

        #endregion
    }
}
The service is geared to work with the Transactional Dead-Letter queue, which is why the Route method is marked with the TransactionAutoComplete and TransactionScopeRequired attributes. The catch block logs the exception and rethrows it rather than handling it, so that the transaction scope the message was received under is aborted; this is the recommended pattern when you use TransactionAutoComplete = true. If we did not rethrow here, the scope would complete and a message that had not been routed to its original queue would be lost.
The message writer class has one responsibility: write the message to the path supplied in its constructor. The message router class is the one we'll focus on next, as it is responsible for taking the original message and routing it to the message's original destination:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.ServiceModel;
using System.Text;
using System.Transactions;

namespace CD.ServiceModel.NetMsmqRouter.Service
{
    /// <summary>
    /// Routes a message to its original destination
    /// </summary>
    internal class MessageRouter
    {
        private string _bindingConfigurationName;

        /// <summary>
        /// Creates a new instance of the message router class.
        /// </summary>
        /// <param name="bindingConfigurationName"></param>
        public MessageRouter(string bindingConfigurationName)
        {
            _bindingConfigurationName = bindingConfigurationName;
        }

        /// <summary>
        /// Gets an instance of the NetMsmqBinding
        /// </summary>
        /// <returns></returns>
        /// <remarks>Returns the default binding if no configuration name was specified.</remarks>
        private NetMsmqBinding GetBinding()
        {
            if (!string.IsNullOrEmpty(_bindingConfigurationName))
            {
                return new NetMsmqBinding(_bindingConfigurationName);
            }
            else
            {
                return new NetMsmqBinding();
            }
        }

        /// <summary>
        /// Returns an instance of a Contracts.IMessageRouterChannel
        /// </summary>
        /// <param name="binding"></param>
        /// <param name="address"></param>
        /// <returns></returns>
        private Contracts.IMessageRouterChannel GetChannel(NetMsmqBinding binding, string address)
        {
            return new ChannelFactory<Contracts.IMessageRouterChannel>(binding, address).CreateChannel();
        }

        /// <summary>
        /// Routes the message to its destination uri.
        /// </summary>
        /// <param name="message">The message to send</param>
        public void RouteMessage(System.ServiceModel.Channels.Message message)
        {
            System.Diagnostics.Trace.WriteLine(String.Format("Message destination: {0}", message.Headers.To.ToString()));
            // The To header still holds the address of the queue the message was originally sent to.
            Uri destinationUri = message.Headers.To;
            System.ServiceModel.NetMsmqBinding binding = GetBinding();
            using (Contracts.IMessageRouterChannel channel = GetChannel(binding, destinationUri.ToString()))
            {
                using (System.Transactions.TransactionScope scope =
                    new System.Transactions.TransactionScope(TransactionScopeOption.RequiresNew))
                {
                    try
                    {
                        channel.Route(message);
                        scope.Complete();
                    }
                    catch (Exception ex)
                    {
                        Trace.WriteLine(String.Format("**** Error ****\n{0}", ex.ToString()));
                        channel.Abort();
                        throw;
                    }
                }
            }
        }
    }
}
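The MessageWriter class is not listed in this post. As a rough idea of what such a class could look like, here is a minimal sketch that matches the way RouterService calls it (a constructor taking a path and a Write method taking a MessageBuffer). The file naming, the XML output format and the directory creation are assumptions of this sketch, not necessarily what the downloadable solution does.

```csharp
using System;
using System.IO;
using System.ServiceModel.Channels;
using System.Xml;

namespace CD.ServiceModel.NetMsmqRouter.Service
{
    /// <summary>
    /// Hypothetical sketch: writes a copy of a message to disk as XML.
    /// </summary>
    internal class MessageWriter
    {
        private readonly string _path;

        public MessageWriter(string path)
        {
            _path = path;
        }

        public void Write(MessageBuffer buffer)
        {
            // Assumption: create the target folder if it does not exist yet.
            Directory.CreateDirectory(_path);

            // Assumption: one file per message, named with a fresh GUID.
            string fileName = Path.Combine(_path, Guid.NewGuid().ToString("N") + ".xml");

            XmlWriterSettings settings = new XmlWriterSettings();
            settings.Indent = true;

            // CreateMessage returns a new copy each time, so the buffer
            // can still be used afterwards to create the message to route.
            using (Message copy = buffer.CreateMessage())
            using (XmlWriter writer = XmlWriter.Create(fileName, settings))
            {
                // Writes the whole SOAP envelope, headers included.
                copy.WriteMessage(writer);
            }
        }
    }
}
```

Because the write happens before the routing call, a file should exist on disk for every message the router attempted, whether or not the resend succeeded.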
All that is left is to wire up the service in the console host and then add the proper configuration elements.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using CD.ServiceModel.NetMsmqRouter;

namespace RouterServiceConsoleHost
{
    class Program
    {
        static void Main(string[] args)
        {
            System.Diagnostics.Trace.Listeners.Add(new System.Diagnostics.TextWriterTraceListener(Console.Out));
            System.Diagnostics.Trace.AutoFlush = true;
            System.Diagnostics.Trace.WriteLine("Starting service.");
            StartServiceHost(typeof(CD.ServiceModel.NetMsmqRouter.Service.RouterService));
        }

        public static void StartServiceHost(Type serviceType)
        {
            using (System.ServiceModel.ServiceHost serviceHost = new System.ServiceModel.ServiceHost(serviceType))
            {
                serviceHost.Open();
                Trace.WriteLine("Service started...press <Enter> to exit");
                Trace.WriteLine("");
                Trace.WriteLine("");
                Console.ReadLine();
                // A faulted host cannot be closed gracefully, so abort it instead.
                if (serviceHost.State == System.ServiceModel.CommunicationState.Faulted)
                {
                    serviceHost.Abort();
                }
                else
                {
                    serviceHost.Close();
                }
            }
        }
    }
}
And the configuration:
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <configSections>
        <sectionGroup name="applicationSettings" type="System.Configuration.ApplicationSettingsGroup,
            System, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" >
            <section name="CD.ServiceModel.NetMsmqRouter.Service.Properties.Settings"
                type="System.Configuration.ClientSettingsSection, System, Version=2.0.0.0, Culture=neutral,
                PublicKeyToken=b77a5c561934e089" requirePermission="false" />
        </sectionGroup>
    </configSections>
    <applicationSettings>
        <CD.ServiceModel.NetMsmqRouter.Service.Properties.Settings>
            <setting name="MessageWriterPath" serializeAs="String">
                <value>C:\Users\Chris\Desktop\RecoveryRouter\</value>
            </setting>
            <setting name="NetMsmqBindingConfigurationName" serializeAs="String">
                <value>RecoveryRouterNetMsmqBinding</value>
            </setting>
        </CD.ServiceModel.NetMsmqRouter.Service.Properties.Settings>
    </applicationSettings>
    <system.serviceModel>
        <bindings>
            <netMsmqBinding>
                <binding name="RecoveryRouterNetMsmqBinding" maxReceivedMessageSize="4194304"
                    maxRetryCycles="2" receiveRetryCount="1" timeToLive="1">
                    <readerQuotas maxStringContentLength="4194304"/>
                    <security mode="None">
                        <transport msmqProtectionLevel="None"/>
                    </security>
                </binding>
            </netMsmqBinding>
        </bindings>
        <services>
            <service name="CD.ServiceModel.NetMsmqRouter.Service.RouterService">
                <endpoint address="net.msmq://localhost/system$;DeadXact" binding="netMsmqBinding"
                    bindingConfiguration="RecoveryRouterNetMsmqBinding" name="RecoveryRouterService"
                    contract="CD.ServiceModel.NetMsmqRouter.Contracts.IMessageRouterService"/>
            </service>
        </services>
    </system.serviceModel>
</configuration>