The goal of Spring's MSMQ 3.0 messaging support is to raise the
level of abstraction when writing MSMQ applications. The System.Messaging
API is a low-level API that provides the basis for creating a messaging
application. However, out of the box, System.Messaging leaves the creation
of sophisticated multi-threaded messaging servers and clients as an
infrastructure activity for the developer. Spring fills this gap by
providing easy-to-use helper classes that simplify creating an enterprise
messaging application. These helper classes take into account the
nuances of the System.Messaging API, such as its lack of thread-safety in
many cases, the handling of so-called 'poison messages' (messages that are
endlessly redelivered due to an unrecoverable exception during message
processing), and combining database transactions with message
transactions. Another goal of Spring's MSMQ messaging support is to
support messaging best practices, in particular encouraging a clean
architectural layering that separates the messaging middleware specifics
from the core business processing.
Spring's approach to distributed computing has always been to
promote a plain old CLR object (POCO) programming model. In this
approach, plain CLR objects are those that are devoid of any reference
to a particular middleware technology. Spring provides the 'adapter'
classes that convert between the middleware world, in this case MSMQ,
and the object-oriented world of your business processing. This is done
through the use of Spring's MessageListenerAdapter class and
IMessageConverters.
The namespace Spring.Messaging provides the core functionality for
messaging. It contains the class MessageQueueTemplate, which simplifies
the use of System.Messaging.MessageQueue by handling the lack of
thread-safety in most of System.Messaging.MessageQueue's methods (for
example, Send). A single instance of MessageQueueTemplate can be used
throughout your application, and Spring will ensure that a different
instance of the MessageQueue class is used per thread when using
MessageQueueTemplate's methods. This per-thread instance of
System.Messaging.MessageQueue is also available via the template's
MessageQueue property. The MessageQueueTemplate class is also aware of
the presence of either an 'ambient' System.Transactions transaction or a
local System.Messaging.MessageQueueTransaction. As such, if you use
MessageQueueTemplate's send and receive methods, unlike with plain use of
System.Messaging.MessageQueue, you do not need to keep track of this
information yourself and call the correct overloaded
System.Messaging.MessageQueue method for a specific transaction
environment. When using a System.Messaging.MessageQueueTransaction, this
would usually require you as a developer to come up with your own
mechanism for passing a MessageQueueTransaction around to multiple
classes and layers in your application. MessageQueueTemplate manages this
for you. These resource management and transaction features of
MessageQueueTemplate are analogous to the transactional features of
Spring's AdoTemplate, in case you are already familiar with that
functionality.
For asynchronous reception Spring provides several multi-threaded message listener containers. You can pick the container that matches your transactional message processing needs and configure poison-message handling policies. The message listener containers leverage Spring's support for managing transactions. DTC transactions, local messaging transactions, and local database transactions are all supported. In particular, you can easily coordinate the commit and rollback of a local MessageQueueTransaction and a local database transaction when they are used together.
From a programming perspective, Spring's MSMQ support involves
configuring message listener containers and writing a callback function
for message processing. On the sending side, it involves learning how to
use MessageQueueTemplate. In both cases you will quite likely want to
take advantage of IMessageConverter implementations so you can better
structure the translation from the System.Messaging.Message data
structure to your business objects. After the initial learning hurdle,
you should find that you are much more productive leveraging Spring's
helper classes to write enterprise MSMQ applications than rolling your
own infrastructure. Feedback and new feature requests are always welcome.
The Spring.MsmqQuickstart application located in the examples directory of the distribution shows this functionality in action.
Here is a quick example of how to use Spring's MSMQ support to create a client that sends a message and a multi-threaded server application that receives the message. (The client code could also be used as-is in a multi-threaded environment but this is not demonstrated).
On the client side you create an instance of the MessageQueueTemplate
class and configure it to use a MessageQueue. This can be done
programmatically, but it is common to use dependency injection and
Spring's XML configuration file to configure your client class, as shown
below.
<object id='questionTxQueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
  <property name='Path' value='.\Private$\questionTxQueue'/>
  <property name='MessageReadPropertyFilterSetAll' value='true'/>
</object>

<object id="messageQueueTemplate" type="Spring.Messaging.Core.MessageQueueTemplate, Spring.Messaging">
  <property name="MessageQueueObjectName" value="questionTxQueue"/>
</object>

<!-- Class you write -->
<object id="questionService" type="MyNamespace.QuestionService, MyAssembly">
  <property name="MessageQueueTemplate" ref="messageQueueTemplate"/>
</object>
The MessageQueue object is created via an instance of
MessageQueueFactoryObject, and the MessageQueueTemplate refers to this
factory object by name and not by reference. The QuestionService class
looks like this:
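using Spring.Messaging.Core;

public class QuestionService : IQuestionService
{
    private MessageQueueTemplate messageQueueTemplate;

    // Injected by the Spring container (see the XML configuration above)
    public MessageQueueTemplate MessageQueueTemplate
    {
        get { return messageQueueTemplate; }
        set { messageQueueTemplate = value; }
    }

    public void SendQuestion(string question)
    {
        // Converts the string to a Message and sends it to the default queue
        MessageQueueTemplate.ConvertAndSend(question);
    }
}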
This class can be shared across multiple threads, and the
MessageQueueTemplate will take care of managing thread-local access to a
System.Messaging.MessageQueue as well as any
System.Messaging.IMessageFormatter instances. Furthermore, since this is
a transactional queue (only the name gives it away), the message will be
sent using a single local messaging transaction. The conversion from the
string to the underlying message is managed by an implementation of the
IMessageConverter interface. By default, an implementation that uses an
XmlMessageFormatter with a TargetType of System.String is used. You can
configure the MessageQueueTemplate to use other IMessageConverter
implementations that do conversions above and beyond what the 'stock'
IMessageFormatters do. See the section on MessageConverters for more
details.
On the receiving side we would like to consume the messages
transactionally from the queue. Since no other database operations are
being performed in our server-side processing, we select the
TransactionalMessageListenerContainer and configure it to use the
MessageQueueTransactionManager. The MessageQueueTransactionManager is an
implementation of Spring's IPlatformTransactionManager abstraction, which
provides a uniform API on top of various transaction managers (ADO.NET,
NHibernate, MSMQ, etc.). Spring's MessageQueueTransactionManager is
responsible for creating, committing, and rolling back an MSMQ
MessageQueueTransaction.
While you can create the message listener container programmatically, we will show the declarative configuration approach below.
<!-- Queue to receive from -->
<object id='questionTxQueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
  <property name='Path' value='.\Private$\questionTxQueue'/>
  <property name='MessageReadPropertyFilterSetAll' value='true'/>
</object>

<!-- MSMQ Transaction Manager -->
<object id="messageQueueTransactionManager" type="Spring.Messaging.Core.MessageQueueTransactionManager, Spring.Messaging"/>

<!-- Message Listener Container that uses MSMQ transactions for receives -->
<object id="transactionalMessageListenerContainer" type="Spring.Messaging.Listener.TransactionalMessageListenerContainer, Spring.Messaging">
  <property name="MessageQueueObjectName" value="questionTxQueue"/>
  <property name="PlatformTransactionManager" ref="messageQueueTransactionManager"/>
  <property name="MaxConcurrentListeners" value="10"/>
  <property name="MessageListener" ref="messageListenerAdapter"/>
</object>

<!-- Adapter to call a POCO as a messaging callback -->
<object id="messageListenerAdapter" type="Spring.Messaging.Listener.MessageListenerAdapter, Spring.Messaging">
  <property name="HandlerObject" ref="questionHandler"/>
</object>

<!-- The POCO class that you write -->
<object id="questionHandler" type="MyNamespace.QuestionHandler, MyAssembly"/>
We have specified the queue to listen on, that we want to consume the
messages transactionally, that messages from the queue are processed
using 10 threads, and that the plain object that will handle the business
processing is of the type QuestionHandler. The only class you need to
write, QuestionHandler, looks like this:
public class QuestionHandler : IQuestionHandler
{
    public void HandleObject(string question)
    {
        // perform message processing here
        Console.WriteLine("Received question: " + question);

        // use an instance of MessageQueueTemplate and have other MSMQ send operations
        // partake in the same local message transaction used to receive
    }
}
That is the general idea. You write the sender class using
MessageQueueTemplate and the consumer class, which does not refer to any
messaging-specific class. The rest is configuration of Spring-provided
helper classes.
Note that if the HandleObject method returned a string value, a reply
message would be sent to a response queue. The response queue is taken
from the Message's own ResponseQueue property, or it can be specified
explicitly using MessageListenerAdapter's DefaultResponseQueueName
property.
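
As a minimal sketch of the reply behavior just described, the handler below returns a string instead of void. The class name AnsweringQuestionHandler and the reply text are hypothetical; the method name HandleObject follows the earlier example.

```csharp
using System;

// Hypothetical handler: per the text above, a non-void return value
// is sent as a reply to the response queue by MessageListenerAdapter.
public class AnsweringQuestionHandler
{
    public string HandleObject(string question)
    {
        Console.WriteLine("Received question: " + question);

        // The returned string becomes the body of the reply message
        return "Answer to: " + question;
    }
}
```

The handler still has no reference to any messaging class; where the reply goes is decided entirely by the message's ResponseQueue property or the adapter's DefaultResponseQueueName setting.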
If an exception is thrown inside the QuestionHandler, then the MSMQ transaction is rolled back, putting the message back on the queue for redelivery. If the exception is not due to a transient error in the system but to a logical processing error, the message would be redelivered endlessly, which is clearly not a desirable situation. These messages are so-called 'poison messages', and a strategy needs to be developed to deal with them. This is left as a development task when you use the System.Messaging APIs directly, but Spring provides a strategy for handling poison messages, both for DTC-based message reception and for local messaging transactions.
In the last part of this 'quick tour' we will configure the message
listener container to handle poison messages. This is done by creating an
instance of SendToQueueExceptionHandler and setting the property MaxRetry
to the number of exceptions or retry attempts we are willing to tolerate
before taking corrective action. In this case, the corrective action is
to send the message to another queue. We can then create other message
listener containers to read from those queues and handle the messages
appropriately, or you may prefer to avoid automated processing of these
messages and take manual corrective action instead.
<!-- The 'error' queue to send poison messages -->
<object id='errorQuestionTxQueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
  <property name='Path' value='.\Private$\errorQuestionTxQueue'/>
  <property name='MessageReadPropertyFilterSetAll' value='true'/>
</object>

<!-- Message Listener Container that uses MSMQ transactions for receives -->
<object id="transactionalMessageListenerContainer" type="Spring.Messaging.Listener.TransactionalMessageListenerContainer, Spring.Messaging">
  <!-- as before but adding -->
  <property name="MessageTransactionExceptionHandler" ref="messageTransactionExceptionHandler"/>
</object>

<!-- Poison message handling policy -->
<object id="messageTransactionExceptionHandler" type="Spring.Messaging.Listener.SendToQueueExceptionHandler, Spring.Messaging">
  <property name="MaxRetry" value="5"/>
  <property name="MessageQueueObjectName" value="errorQuestionTxQueue"/>
</object>
In the event of an exception while processing the message, the
message transaction will be rolled back (putting the message back on the
queue questionTxQueue for redelivery). If the same message causes an
exception in processing 5 times, then it will be sent transactionally to
the errorQuestionTxQueue and the message transaction will commit
(removing it from the queue questionTxQueue). You can also specify that
certain exceptions should commit the transaction (removing the message
from the queue), but this is not shown here; see below for more
information on this functionality. The SendToQueueExceptionHandler
implements the interface IMessageTransactionExceptionHandler (discussed
below), so you can write your own implementations should the provided
ones not meet your needs.
That's the quick tour folks. Hopefully you got a general feel for how things work, what requires configuration, and what is the code you need to write. The following sections describe each of Spring's helper classes in more detail. The sample application that ships with Spring is also a good place to get started.
The MessageQueueTemplate is used for synchronously sending and
receiving messages. A single instance can be shared across multiple
threads, unlike the standard System.Messaging.MessageQueue class. (One
less resource management issue to worry about!) A thread-local instance
of the MessageQueue class is available via MessageQueueTemplate's
property MessageQueue. A MessageQueueTemplate is created by passing it
the name of a MessageQueueFactoryObject. You can think of this name as a
friendly name for your MessageQueue together with the recipe for how to
create an instance of it. See the following section on
MessageQueueFactoryObject for more information.
The MessageQueueTemplate also provides several convenience methods
for sending and receiving messages. A family of overloaded ConvertAndSend
and ReceiveAndConvert methods allows you to send and receive an object.
The default message queue to send to and receive from is specified using
MessageQueueTemplate's property MessageQueueObjectName. The
responsibility of converting the object to a Message and vice versa is
given to the template's associated IMessageConverter implementation. This
can be set using the property MessageConverter. The default
implementation, XmlMessageConverter, uses an XmlMessageFormatter with its
TargetType set to System.String.
Note that System.Messaging.IMessageFormatter classes are also not
thread-safe, so MessageQueueTemplate ensures that thread-local instances
of IMessageConverter are used (as they generally wrap IMessageFormatters
that are not thread-safe). You can use the MessageQueueTemplate to send
messages to other MessageQueues by specifying their queue 'object name',
that is, the name of the MessageQueueFactoryObject.
The family of overloaded ConvertAndSend and ReceiveAndConvert methods
is shown below.
void ConvertAndSend(object obj);
void ConvertAndSend(object obj, MessagePostProcessorDelegate messagePostProcessorDelegate);
void ConvertAndSend(string messageQueueObjectName, object message);
void ConvertAndSend(string messageQueueObjectName, object obj, MessagePostProcessorDelegate messagePostProcessorDelegate);

object ReceiveAndConvert();
object ReceiveAndConvert(string messageQueueObjectName);
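
The sketch below shows how these overloads might be called from application code. The class QuestionClient is hypothetical, the queue object name errorQuestionTxQueue is borrowed from the quick tour configuration, and the cast to string assumes the default converter with a TargetType of System.String.

```csharp
using Spring.Messaging.Core;

// Hypothetical client class; the template is expected to be injected by Spring
public class QuestionClient
{
    private readonly MessageQueueTemplate template;

    public QuestionClient(MessageQueueTemplate template)
    {
        this.template = template;
    }

    public void Ask(string question)
    {
        // Sends to the template's default queue
        template.ConvertAndSend(question);

        // Sends to another queue, identified by its MessageQueueFactoryObject name
        template.ConvertAndSend("errorQuestionTxQueue", question);
    }

    public string NextQuestion()
    {
        // Receives from the default queue and converts the message body.
        // The cast assumes the default string-based converter is in use.
        return (string) template.ReceiveAndConvert();
    }
}
```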
The transactional settings passed to the underlying overloaded
System.Messaging.MessageQueue Send method are chosen using the following
algorithm.
If the message queue is transactional and there is an ambient
MessageQueueTransaction in thread-local storage (put there via the use of
Spring's MessageQueueTransactionManager or
TransactionalMessageListenerContainer), the message will be sent
transactionally using the MessageQueueTransaction object in thread-local
storage.
Note | |
---|---|
This lets you group together multiple messaging operations
within the same transaction without having to explicitly pass
around the |
If the message queue is transactional but there is no ambient
MessageQueueTransaction, then a single message transaction is created on
each messaging operation (MessageQueueTransactionType = Single).
If there is an ambient System.Transactions transaction then that transaction will be used (MessageQueueTransactionType = Automatic).
If the queue is not transactional, then a non-transactional send (MessageQueueTransactionType = None) is used.
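
The rules above can be sketched directly against the System.Messaging API. This is an illustration only, not Spring's implementation: the class SendRules is hypothetical, the ambientMsmqTx parameter stands in for the MessageQueueTransaction that Spring keeps in thread-local storage, and the order in which the System.Transactions check and the Single case are evaluated is an assumption.

```csharp
using System.Messaging;
using System.Transactions;

// Hypothetical helper illustrating the selection rules described above
public static class SendRules
{
    public static void Send(MessageQueue queue, Message message,
                            MessageQueueTransaction ambientMsmqTx)
    {
        if (queue.Transactional && ambientMsmqTx != null)
        {
            // Join the ambient local MSMQ transaction
            queue.Send(message, ambientMsmqTx);
        }
        else if (Transaction.Current != null)
        {
            // Enlist in the ambient System.Transactions transaction
            queue.Send(message, MessageQueueTransactionType.Automatic);
        }
        else if (queue.Transactional)
        {
            // No ambient transaction: one transaction per operation
            queue.Send(message, MessageQueueTransactionType.Single);
        }
        else
        {
            // Non-transactional queue
            queue.Send(message, MessageQueueTransactionType.None);
        }
    }
}
```

Writing this selection by hand at every call site is the bookkeeping that MessageQueueTemplate takes over.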
The delegate MessagePostProcessorDelegate has the following signature:
public delegate Message MessagePostProcessorDelegate(Message message);
This lets you modify the message after it has been converted from an
object to a message using the IMessageConverter but before it is sent.
This is useful for setting Message properties (e.g. CorrelationId,
AppSpecific, TimeToReachQueue). Using anonymous delegates in .NET 2.0
makes this a very succinct coding task. If you have elaborate properties
that need to be set, creating a custom IMessageConverter may be more
appropriate.
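
A short sketch of the anonymous delegate style follows. The class TaggedSender and the property values are hypothetical; the ConvertAndSend overload and the delegate signature are the ones listed above.

```csharp
using System;
using System.Messaging;
using Spring.Messaging.Core;

// Hypothetical sender that decorates each message before it is sent
public class TaggedSender
{
    private readonly MessageQueueTemplate template;

    public TaggedSender(MessageQueueTemplate template)
    {
        this.template = template;
    }

    public void SendTagged(string text)
    {
        // The anonymous delegate runs after conversion and before the send
        template.ConvertAndSend(text, delegate(Message message)
        {
            message.AppSpecific = 42;                            // illustrative value
            message.TimeToReachQueue = TimeSpan.FromMinutes(5);  // illustrative value
            return message;
        });
    }
}
```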
Overloaded Send and Receive operations that use the algorithm listed
above to set transactional delivery options are also available. These are
listed below.
Message Receive();
Message Receive(string messageQueueObjectName);

void Send(Message message);
void Send(string messageQueueObjectName, Message message);
void Send(MessageQueue messageQueue, Message message);
Note that for the last Send method, which takes a MessageQueue
instance, it is the caller's responsibility to ensure that this instance
is not accessed from multiple threads. This Send method is commonly used
when getting the MessageQueue from the ResponseQueue property of a
Message
during an asynchronous receive process. The receive timeout of the
Receive operations is set using the ReceiveTimeout property of
MessageQueueTemplate. The default value is MessageQueue.InfiniteTimeout
(which is actually a finite value of roughly 49.7 days).
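
As a sketch of the reply scenario mentioned above, the hypothetical class below sends a response to the queue named in a received message's ResponseQueue property using the Send(MessageQueue, Message) overload. Copying the request's Id into CorrelationId is an illustrative convention, not something the source prescribes.

```csharp
using System.Messaging;
using Spring.Messaging.Core;

// Hypothetical helper that replies to the sender of a received message
public class Responder
{
    private readonly MessageQueueTemplate template;

    public Responder(MessageQueueTemplate template)
    {
        this.template = template;
    }

    public void Reply(Message request, string text)
    {
        MessageQueue responseQueue = request.ResponseQueue;
        if (responseQueue == null)
        {
            return; // the sender did not ask for a reply
        }

        Message reply = new Message(text);
        reply.CorrelationId = request.Id; // lets the sender match reply to request

        // The caller must make sure responseQueue is used by this thread only
        template.Send(responseQueue, reply);
    }
}
```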
The XML configuration snippet for defining a MessageQueueTemplate is shown in the previous section and is also located in the MSMQ quickstart application configuration file Messaging.xml.
The MessageQueueFactoryObject is responsible for creating
MessageQueue instances. You configure the factory with some basic
information, namely the constructor parameters you are already familiar
with from creating a standard MessageQueue instance, and then set
MessageQueue properties, such as Label. Some configuration tasks of a
MessageQueue involve calling methods, for example to set which properties
of the message to read. These are available as properties to set on the
MessageQueueFactoryObject. An example declarative configuration is shown
below.
<object id='testqueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>

  <!-- properties passed to the MessageQueue constructor -->
  <property name='Path' value='.\Private$\testqueue'/>
  <property name='DenySharedReceive' value='true'/>
  <property name='AccessMode' value='Receive'/>
  <property name='EnableCache' value='true'/>

  <!-- properties that call configuration methods on the MessageQueue -->
  <property name='MessageReadPropertyFilterSetAll' value='true'/>

  <property name='ProductTemplate'>
    <object>
      <property name='Label' value='MyLabel'/>
      <!-- other MessageQueue properties can be set here -->
    </object>
  </property>
</object>
Whenever an object reference is made to 'testqueue', a new instance
of the MessageQueue class is created. This is Spring's so-called
'prototype' mode, which differs from 'singleton' mode. In the singleton
creation mode, whenever an object reference is made to 'testqueue' the
same MessageQueue instance would be used. So that a new instance can be
retrieved as needed, the message listener containers take as an argument
the name of the MessageQueueFactoryObject and not a reference (i.e. use
of 'value' instead of 'ref' in the XML).
Note | |
---|---|
The <mq:messageQueue id="testqueue" path=".\Private$\testqueue" MessageReadPropertyFilterSetAll="true"> <mq:properties label="MyLabel"/> </mq:messageQueue> |
MessageQueues and IMessageFormatters (commonly used in
IMessageConverter implementations) are not thread-safe. For example, only
the following methods on MessageQueue are thread-safe: BeginPeek,
BeginReceive, EndPeek, EndReceive, GetAllMessages, Peek, and Receive.
To isolate the creation logic of these classes, the factory interface
IMessageQueueFactory is used. The interface is shown below.
public interface IMessageQueueFactory
{
    MessageQueue CreateMessageQueue(string messageQueueObjectName);

    IMessageConverter CreateMessageConverter(string messageConverterObjectName);
}
A provided implementation, DefaultMessageQueueFactory, will create an
instance of each class per thread. It delegates the creation of the
MessageQueue instance to the Spring container. The argument
messageQueueObjectName must be the id/name of a MessageQueueFactoryObject
defined in the Spring container. DefaultMessageQueueFactory leverages
Spring's thread-local storage support, so it will work correctly in
stand-alone and web applications.
You can use the DefaultMessageQueueFactory independently of the rest
of Spring's MSMQ support should you need only the functionality it
offers. MessageQueueTemplate and the listener containers create an
instance of DefaultMessageQueueFactory by default. Should you want to
share the same instance across these two classes, or provide your own
custom implementation, use the property MessageQueueFactory on either
MessageQueueTemplate or the message listener classes.
One of the most common uses of MSMQ is to concurrently process
messages delivered asynchronously. This support is provided in Spring by
message listener containers. A message listener container is the
intermediary between an IMessageListener and a MessageQueue. (Note that
message listener containers are conceptually different from Spring's
Inversion of Control container, though they integrate with and leverage
the IoC container.) The message listener container takes care of
registering to receive messages, participating in transactions, resource
acquisition and release, exception conversion, and the like. This allows
you as an application developer to write the (possibly complex) business
logic associated with receiving a message (and possibly responding to
it), and to delegate boilerplate MSMQ infrastructure concerns to the
framework.
A subclass of AbstractMessageListenerContainer is used to receive
messages from a MessageQueue. Which subclass you pick depends on your
transaction processing requirements. The following subclasses are
available in the namespace Spring.Messaging.Listener:
NonTransactionalMessageListenerContainer - does not surround the receive operation with a transaction.
TransactionalMessageListenerContainer - surrounds the receive operation with local (non-DTC) transaction(s).
DistributedTxMessageListenerContainer - surrounds the receive operation with a distributed (DTC) transaction.
Each of these containers uses an implementation that is based on
peeking for messages on a MessageQueue. Peeking is the only
resource-efficient approach that allows MessageQueue receipt to be
combined with transactions, whether local MSMQ transactions, local
ADO.NET based transactions, or DTC transactions. Each container can
specify, via the property MaxConcurrentListeners, the number of threads
that will be created for processing messages after the Peek occurs. Each
processing thread will continue to listen for messages until the timeout
value specified by ListenerTimeLimit elapses or until there are no more
messages on the queue (whichever comes first). The default value of
ListenerTimeLimit is TimeSpan.Zero, meaning that only one attempt to
receive a message from the queue will be performed by each listener
thread. The current implementation uses the standard .NET thread pool.
Future implementations will use a custom (and pluggable) thread pool.
The NonTransactionalMessageListenerContainer performs a Receive
operation on the MessageQueue without any transactional settings. As
such, messages will not be redelivered if an exception is thrown during
message processing. Exceptions during message processing can be handled
via an implementation of the interface IExceptionHandler. This can be set
via the property ExceptionHandler on the listener container. The
IExceptionHandler interface is shown below.
public interface IExceptionHandler
{
    void OnException(Exception exception, Message message);
}
An example of configuring a NonTransactionalMessageListenerContainer
with an IExceptionHandler is shown below.
<!-- Queue to receive from -->
<object id='msmqTestQueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
  <property name='Path' value='.\Private$\testqueue'/>
  <property name='MessageReadPropertyFilterSetAll' value='true'/>
  <property name='ProductTemplate'>
    <object>
      <property name='Label' value='MyTestQueueLabel'/>
    </object>
  </property>
</object>

<!-- Queue to respond to -->
<object id='msmqTestResponseQueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
  <property name='Path' value='.\Private$\testresponsequeue'/>
  <property name='MessageReadPropertyFilterSetAll' value='true'/>
  <property name='ProductTemplate'>
    <object>
      <property name='Label' value='MyTestResponseQueueLabel'/>
    </object>
  </property>
</object>

<!-- Listener container -->
<object id="nonTransactionalMessageListenerContainer" type="Spring.Messaging.Listener.NonTransactionalMessageListenerContainer, Spring.Messaging">
  <property name="MessageQueueObjectName" value="msmqTestQueue"/>
  <property name="MaxConcurrentListeners" value="2"/>
  <property name="ListenerTimeLimit" value="20s"/> <!-- 20 seconds -->
  <property name="MessageListener" ref="messageListenerAdapter"/>
  <property name="ExceptionHandler" ref="exceptionHandler"/>
</object>

<!-- Delegate to plain CLR object for message handling -->
<object id="messageListenerAdapter" type="Spring.Messaging.Listener.MessageListenerAdapter, Spring.Messaging">
  <property name="DefaultResponseQueueName" value="msmqTestResponseQueue"/>
  <property name="HandlerObject" ref="simpleHandler"/>
</object>

<!-- Classes you need to write -->
<object id="simpleHandler" type="MyNamespace.SimpleHandler, MyAssembly"/>
<object id="exceptionHandler" type="MyNamespace.SimpleExceptionHandler, MyAssembly"/>
The SimpleHandler class would look something like this:
public class SimpleHandler : ISimpleHandler
{
    public void HandleObject(string txt)
    {
        // perform message processing...
        Console.WriteLine("Received text: " + txt);
    }
}
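
The configuration above also references a SimpleExceptionHandler class that you write. A minimal sketch, assuming that IExceptionHandler lives in the Spring.Messaging.Listener namespace and that logging to the console is an acceptable reaction, might look like this:

```csharp
using System;
using System.Messaging;
using Spring.Messaging.Listener; // assumed namespace of IExceptionHandler

namespace MyNamespace
{
    // Sketch of the exception handler referenced in the configuration above
    public class SimpleExceptionHandler : IExceptionHandler
    {
        public void OnException(Exception exception, Message message)
        {
            // The message has already been removed from the queue at this point,
            // so record enough detail to investigate or replay it later.
            Console.Error.WriteLine("Failed to process message {0}: {1}",
                                    message.Id, exception.Message);
        }
    }
}
```

Because the non-transactional container does not redeliver, this handler is the only place where a failed message can be recorded or forwarded.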
The TransactionalMessageListenerContainer performs receive operations
within the context of a local transaction. This class requires an
instance of Spring's IPlatformTransactionManager, either
AdoPlatformTransactionManager, HibernateTransactionManager, or
MessageQueueTransactionManager.
If you specify a MessageQueueTransactionManager, then a
MessageQueueTransaction will be started before receiving the message and
used as part of the container's receive operation. As with other
IPlatformTransactionManager implementations, the transactional resource
(in this case an instance of the MessageQueueTransaction class) is bound
to thread-local storage. MessageQueueTemplate will look in thread-local
storage and use this 'ambient' transaction, if found, for its send and
receive operations. The message listener is invoked and, if no exception
occurs, the MessageQueueTransactionManager will commit the
MessageQueueTransaction.
The message listener implementation can call into service layer
classes that are made transactional using standard Spring declarative
transactional techniques. In case of exceptions in the service layer,
the database operation will be rolled back (nothing new here), and the
TransactionalMessageListenerContainer will call its
IMessageTransactionExceptionHandler implementation to determine whether
the MessageQueueTransaction should commit (removing the message from the
queue) or roll back (leaving the message on the queue for redelivery).
Note | |
---|---|
The use of a transactional service layer in combination with
a The additional programming logic needed to achieve this is
to keep track of the |
An example of configuring the TransactionalMessageListenerContainer
using a MessageQueueTransactionManager is shown below.
<!-- Queue to receive from -->
<object id='msmqTestQueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
  <property name='Path' value='.\Private$\testqueue'/>
  <property name='MessageReadPropertyFilterSetAll' value='true'/>
  <property name='ProductTemplate'>
    <object>
      <property name='Label' value='MyTestQueueLabel'/>
    </object>
  </property>
</object>

<!-- Queue to respond to -->
<object id='msmqTestResponseQueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
  <property name='Path' value='.\Private$\testresponsequeue'/>
  <property name='MessageReadPropertyFilterSetAll' value='true'/>
  <property name='ProductTemplate'>
    <object>
      <property name='Label' value='MyTestResponseQueueLabel'/>
    </object>
  </property>
</object>

<!-- Transaction Manager for MSMQ Messaging -->
<object id="messageQueueTransactionManager" type="Spring.Messaging.Core.MessageQueueTransactionManager, Spring.Messaging"/>

<!-- The transaction message listener container -->
<object id="transactionalMessageListenerContainer" type="Spring.Messaging.Listener.TransactionalMessageListenerContainer, Spring.Messaging">
  <property name="MessageQueueObjectName" value="msmqTestQueue"/>
  <property name="PlatformTransactionManager" ref="messageQueueTransactionManager"/>
  <property name="MaxConcurrentListeners" value="5"/>
  <property name="ListenerTimeLimit" value="20s"/>
  <property name="MessageListener" ref="messageListenerAdapter"/>
  <property name="MessageTransactionExceptionHandler" ref="messageTransactionExceptionHandler"/>
</object>

<!-- Delegate to plain CLR object for message handling -->
<object id="messageListenerAdapter" type="Spring.Messaging.Listener.MessageListenerAdapter, Spring.Messaging">
  <property name="DefaultResponseQueueName" value="msmqTestResponseQueue"/>
  <property name="HandlerObject" ref="simpleHandler"/>
</object>

<!-- Poison message handling -->
<object id="messageTransactionExceptionHandler" type="Spring.Messaging.Listener.SendToQueueExceptionHandler, Spring.Messaging">
  <property name="MaxRetry" value="5"/>
  <property name="MessageQueueObjectName" value="testTxErrorQueue"/>
</object>

<!-- Classes you need to write -->
<object id="simpleHandler" type="MyNamespace.SimpleHandler, MyAssembly"/>
If you specify either AdoPlatformTransactionManager or
HibernateTransactionManager, then a local database transaction will be
started before receiving the message. By default, the container will
also start a local MessageQueueTransaction after the local database
transaction has started but before receiving the message. This
MessageQueueTransaction will be used to receive the message. By default,
the MessageQueueTransaction will be bound to thread-local storage so that
any MessageQueueTemplate send or receive operations will participate
transparently in the same MessageQueueTransaction. If you do not want
this behavior, set the property
ExposeContainerManagedMessageQueueTransaction to false.
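
A configuration fragment for this combination might look like the following sketch. The type name Spring.Data.Core.AdoPlatformTransactionManager, its DbProvider property, and the dbProvider object are assumptions about the rest of your Spring configuration and are not shown in this chapter; the container properties are the ones described above.

```xml
<!-- Assumed: a 'dbProvider' object is defined elsewhere in the configuration -->
<object id="adoTransactionManager" type="Spring.Data.Core.AdoPlatformTransactionManager, Spring.Data">
  <property name="DbProvider" ref="dbProvider"/>
</object>

<object id="transactionalMessageListenerContainer" type="Spring.Messaging.Listener.TransactionalMessageListenerContainer, Spring.Messaging">
  <property name="MessageQueueObjectName" value="msmqTestQueue"/>
  <property name="PlatformTransactionManager" ref="adoTransactionManager"/>
  <!-- Keep the container's MessageQueueTransaction out of thread-local storage -->
  <property name="ExposeContainerManagedMessageQueueTransaction" value="false"/>
  <property name="MessageListener" ref="messageListenerAdapter"/>
</object>
```

With the property set to false, MessageQueueTemplate operations performed inside the listener no longer join the container's receive transaction.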
In case of exceptions during IMessageListener processing when using
either AdoPlatformTransactionManager or HibernateTransactionManager, the
container's IMessageTransactionExceptionHandler will determine whether
the MessageQueueTransaction should commit (removing the message from the
queue) or roll back (placing it back on the queue for redelivery). The
listener exception will always trigger a rollback of the 'outer' database
transaction.
Poison messages, that is, messages that are endlessly redelivered due to exceptions during processing, can be detected using implementations of the IMessageTransactionExceptionHandler interface. This interface is shown below:
public interface IMessageTransactionExceptionHandler
{
    TransactionAction OnException(Exception exception, Message message,
                                  MessageQueueTransaction messageQueueTransaction);
}
The return value is an enumeration with the values Commit and Rollback. A specific implementation is provided that will move the poison message to another queue after a maximum number of redelivery attempts. See SendToQueueExceptionHandler described below. You can set a specific implementation by setting TransactionalMessageListenerContainer's property MessageTransactionExceptionHandler.
The IMessageTransactionExceptionHandler implementation SendToQueueExceptionHandler keeps track of the Message's Id property in memory with a count of how many times an exception has occurred. If that count is greater than the handler's MaxRetry count, the message will be sent to another queue using the provided MessageQueueTransaction. The queue to send the message to is specified via the property MessageQueueObjectName.
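The earlier listing refers to an error queue named testTxErrorQueue without defining it. A minimal sketch of such a definition, modeled on the other queue definitions in this chapter, might look like the following. The queue path is hypothetical; since the message is sent under a MessageQueueTransaction, the underlying MSMQ queue would need to be transactional.

```xml
<!-- Queue that receives poison messages; the path is a hypothetical example -->
<object id='testTxErrorQueue' type='Spring.Messaging.Support.MessageQueueFactoryObject, Spring.Messaging'>
  <property name='Path' value='.\Private$\testtxerrorqueue'/>
  <property name='MessageReadPropertyFilterSetAll' value='true'/>
</object>
```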
This message listener container performs receive operations within the context of a distributed transaction. A distributed transaction is started before a message is received. The receive operation participates in this transaction by specifying MessageQueueTransactionType = Automatic. The transaction that is started is automatically promoted to two-phase commit, bypassing the default transaction promotion behavior, since the only reason to use this container is to coordinate two different resource managers (typically messaging and a database).
The commit and rollback semantics are simple: if the message listener does not throw an exception the transaction is committed, otherwise it is rolled back.
Exceptions in message listener processing are handled by implementations of the IDistributedTransactionExceptionHandler interface. This interface is shown below:
public interface IDistributedTransactionExceptionHandler
{
    bool IsPoisonMessage(Message message);

    void HandlePoisonMessage(Message poisonMessage);

    void OnException(Exception exception, Message message);
}
The IsPoisonMessage method determines whether the incoming message is a poison message. This method is called before the IMessageListener is invoked. The container will call HandlePoisonMessage if IsPoisonMessage returns true and will then commit the distributed transaction (removing the message from the queue). Typical implementations of HandlePoisonMessage will move the poison message to another queue (under the same distributed transaction used to receive the message). The class SendToQueueDistributedTransactionExceptionHandler detects poison messages by tracking the Message Id property in memory with a count of how many times an exception has occurred. If that count is greater than the handler's MaxRetry count, the message will be sent to another queue. The queue to send the message to is specified via the property MessageQueueObjectName.
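A configuration for the distributed transaction case might be sketched along the same lines as the local transaction example earlier in this chapter. Only the handler class name and its MaxRetry and MessageQueueObjectName properties are taken from the text above. The container type name (DistributedTxMessageListenerContainer), its DistributedTransactionExceptionHandler property, and the use of TxScopeTransactionManager are assumptions that should be checked against the API documentation, and the object ids are hypothetical.

```xml
<!-- Assumption: a System.Transactions based transaction manager from Spring.Data -->
<object id="txScopeTransactionManager"
        type="Spring.Data.Core.TxScopeTransactionManager, Spring.Data"/>

<!-- Assumption: container type and handler property name are not given in this section -->
<object id="distributedTxMessageListenerContainer"
        type="Spring.Messaging.Listener.DistributedTxMessageListenerContainer, Spring.Messaging">
  <property name="MessageQueueObjectName" value="msmqTestQueue"/>
  <property name="PlatformTransactionManager" ref="txScopeTransactionManager"/>
  <property name="MessageListener" ref="messageListenerAdapter"/>
  <property name="DistributedTransactionExceptionHandler" ref="distributedTxExceptionHandler"/>
</object>

<!-- Poison message handling under the distributed transaction -->
<object id="distributedTxExceptionHandler"
        type="Spring.Messaging.Listener.SendToQueueDistributedTransactionExceptionHandler, Spring.Messaging">
  <property name="MaxRetry" value="5"/>
  <property name="MessageQueueObjectName" value="testTxErrorQueue"/>
</object>
```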
In order to facilitate the sending of business model objects, the MessageQueueTemplate has various send methods that take a .NET object as an argument for a message's data content. The overloaded methods ConvertAndSend and ReceiveAndConvert in MessageQueueTemplate delegate the conversion process to an instance of the IMessageConverter interface. This interface defines a simple contract to convert between .NET objects and MSMQ messages. The interface is shown below:
public interface IMessageConverter : ICloneable
{
    Message ToMessage(object obj);

    object FromMessage(Message message);
}
Standard implementations are provided that simply wrap existing IMessageFormatter implementations.
XmlMessageConverter - uses an XmlMessageFormatter
BinaryMessageConverter - uses a BinaryMessageFormatter
ActiveXMessageConverter - uses an ActiveXMessageFormatter
The default implementation used in MessageQueueTemplate and the message listener containers is an instance of XmlMessageConverter configured with a TargetType of System.String. You specify the types that the XmlMessageConverter can convert through either the array property TargetTypes or TargetTypeNames.
Here is an example taken from the QuickStart application:
<object id="xmlMessageConverter" singleton="false"
        type="Spring.Messaging.Support.Converters.XmlMessageConverter, Spring.Messaging">
  <property name="TargetTypes">
    <list>
      <value>Spring.MsmqQuickStart.Common.Data.TradeRequest, Spring.MsmqQuickStart.Common</value>
      <value>Spring.MsmqQuickStart.Common.Data.TradeResponse, Spring.MsmqQuickStart.Common</value>
      <value>System.String, mscorlib</value>
    </list>
  </property>
</object>
You can specify other IMessageConverter
implementations using the MessageConverterObjectName
property on the MessageQueueTemplate
and
MessageListenerAdapter
.
Note: The scope of the object definition is set to singleton="false", meaning that a new instance of the MessageConverter will be created each time you ask the container for an object of the name 'xmlMessageConverter'. This is important to ensure that a new instance will be used for each thread. If you forget, a warning will be logged and IMessageConverter's Clone() method will be called to create an independent instance.
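As an illustration of the MessageConverterObjectName property mentioned above, the converter can be referenced by object name from both the template and the adapter. This is a sketch only: the id messageQueueTemplate is hypothetical, and the MessageQueueTemplate type name and its DefaultMessageQueueObjectName property are assumptions not stated in this section.

```xml
<!-- Assumption: type name and DefaultMessageQueueObjectName are not given in this section -->
<object id="messageQueueTemplate"
        type="Spring.Messaging.Core.MessageQueueTemplate, Spring.Messaging">
  <property name="DefaultMessageQueueObjectName" value="msmqTestQueue"/>
  <!-- Refers to the non-singleton converter definition by name -->
  <property name="MessageConverterObjectName" value="xmlMessageConverter"/>
</object>

<object id="messageListenerAdapter"
        type="Spring.Messaging.Listener.MessageListenerAdapter, Spring.Messaging">
  <property name="MessageConverterObjectName" value="xmlMessageConverter"/>
  <property name="HandlerObject" ref="simpleHandler"/>
</object>
```

Passing the object name rather than a direct reference appears to be what lets a fresh converter instance be obtained from the container for each thread.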
Other implementations provided are:
XmlDocumentConverter - loads and saves an XmlDocument to the message BodyStream. This lets you manipulate the XML data directly, independent of type serialization issues. This is quite useful if you use XPath expressions to pick out the relevant information to construct your business objects.
Other potential implementations:
RawBytesMessageConverter - writes raw bytes directly to the message stream
CompressedMessageConverter - compresses the message payload
EncryptedMessageConverter - encrypts the message (standard MSMQ encryption has several limitations)
SoapMessageConverter - uses SOAP formatting
The MessageListenerAdapter allows methods of a class that does not implement the IMessageListener interface to be invoked upon message delivery. Let's call this class the 'message handler' class. To achieve this goal the MessageListenerAdapter implements the standard IMessageListener interface to receive a message and then delegates the processing to the message handler class. Since the message handler class does not contain methods that refer to MSMQ artifacts such as Message, the MessageListenerAdapter uses an IMessageConverter to bridge the MSMQ and 'plain object' worlds. As a reminder, the default XmlMessageConverter used in MessageQueueTemplate and the message listener containers converts from Message to string. Once the incoming message is converted to an object (a string, for example), a method with the name 'HandleMessage' is invoked via reflection, passing in the string as an argument.
Using the default configuration of XmlMessageConverter in the message listeners, a simple string based message handler would look like this.
public class MyHandler
{
    public void HandleMessage(string text)
    {
        ...
    }
}
The next example has a similar method signature, but the name of the handler method has been changed to "DoWork" by setting the adapter's property DefaultHandlerMethod.
public interface IMyHandler
{
    void DoWork(string text);
}
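The corresponding adapter configuration for the renamed handler method could be sketched as follows. The DefaultHandlerMethod and HandlerObject property names come from the text; the myHandler id and the MyNamespace.MyHandler type are hypothetical stand-ins for a class that implements IMyHandler.

```xml
<object id="messageListenerAdapter"
        type="Spring.Messaging.Listener.MessageListenerAdapter, Spring.Messaging">
  <!-- Invoke DoWork instead of the default HandleMessage -->
  <property name="DefaultHandlerMethod" value="DoWork"/>
  <property name="HandlerObject" ref="myHandler"/>
</object>

<!-- Hypothetical class implementing IMyHandler -->
<object id="myHandler" type="MyNamespace.MyHandler, MyAssembly"/>
```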
If your IMessageConverter implementation will return multiple object types, overloading the handler method is perfectly acceptable; the most specific matching method will be used. A method with an object signature would be considered a 'catch-all' method of last resort.
public interface IMyHandler
{
    void DoWork(string text);

    void DoWork(OrderRequest orderRequest);

    void DoWork(InvoiceRequest invoiceRequest);

    void DoWork(object obj);
}
Another capability of the MessageListenerAdapter class is the ability to automatically send back a response Message if a handler method returns a non-void value. Any non-null value that is returned from the execution of the handler method will (in the default configuration) be converted to a string. The resulting string will then be sent to the queue defined in the ResponseQueue property of the original Message, or else to the queue named by DefaultResponseQueueName on the MessageListenerAdapter (if one has been configured). If no ResponseQueue is found then a Spring MessagingException will be thrown. Please note that this exception will not be swallowed and will propagate up the call stack.
Here is an example of Handler signatures that have various return types.
public interface IMyHandler
{
    string DoWork(string text);

    OrderResponse DoWork(OrderRequest orderRequest);

    InvoiceResponse DoWork(InvoiceRequest invoiceRequest);

    void DoWork(object obj);
}
The following configuration shows how to hook up the adapter to process incoming MSMQ messages using the default message converter.
<!-- Delegate to plain CLR object for message handling -->
<object id="messageListenerAdapter" type="Spring.Messaging.Listener.MessageListenerAdapter, Spring.Messaging">
  <property name="DefaultResponseQueueName" value="msmqTestResponseQueue"/>
  <property name="HandlerObject" ref="myHandler"/>
</object>
The goals of Spring's MSMQ messaging support are quite similar to those of WCF with its MSMQ related bindings, inasmuch as a WCF service contract is a POCO (minus the attributes, if you are really picky about what you call a POCO). Spring's messaging support can give you the programming convenience of dealing with POCO contracts for message receiving but does not (at the moment) provide a similar POCO contract for sending, instead relying on explicit use of the MessageQueueTemplate class. This feature exists - some question whether it should for messaging - in the Java version of the Spring framework; see JmsInvokerServiceExporter and JmsInvokerProxyFactoryBean.
The good news is that if and when it comes time to move from a Spring MSMQ solution to WCF, you will be in a great position, as the POCO interface used for business processing when receiving in a Spring based MSMQ application can easily be adapted to a WCF environment. There may also be some features unique to MSMQ and/or Spring's MSMQ support that you may find appealing over WCF. Many messaging applications still need to be 'closer to the metal' and this is not possible using the WCF bindings, for example peeking, the Label and AppSpecific properties, and multicast. An interesting recent quote by Yoel Arnon (MSMQ guru): "With all the respect to WCF, System.Messaging is still the major programming model for MSMQ programmers, and is probably going to remain significant for the foreseeable future. The message-oriented programming model is different from the service-oriented model of WCF, and many real-world solutions would always prefer it."