Wednesday, November 16, 2016

Distributed Transaction (XATransaction) For JMS Trasnport- WSO2 ESB 5.0.0


Distributed Transaction (XATransaction) For JMS Trasnport- WSO2 ESB 5.0.0

Distributed transactions plays a major role in today's business world. It adds a very important functionality to your system, which you will only see in a system crash. Having good transactional support in your system will definitely save your business in a system crash.

Distributed JMS transaction

An external transaction manager manages the coordination of the transaction. Designing and using JMS distributed transactions is more complex than using local JMS transactions.

The transaction manager is the primary component of the distributed transaction infrastructure and distributed JMS transactions are managed by the XAResource enabled transaction manager in the J2EE application server. Also note that you need to check if your message broker supports XA transactions.

XA two-phase commit process

The two-phase commit consists of the following steps:
  • Immediately after the transaction begins, the transaction manager invokes start() on the JMS XA resource, which indicates that the resource should initialize a new transaction. The JMS XA resource now generates a new transaction ID and sends it over the network to the remote broker.
  • The JMS XA resource now forwards all of the operations that arise during a JMS session (for example, messages, acknowledgments, and so on) to the remote broker.
  • On the broker side, the received operations are not performed immediately. Because the operations are happening in a transaction context and the transaction is not yet committed, the broker buffers all of the operations in a transaction store (held in memory, initially). Messages held in the transaction store are not forwarded to JMS consumers.
  • In a two-phase commit process, the first phase of completing the transaction is where the transaction manager invokes prepare() on all of the participating XA resources. At this stage, the JMS XA resource sends the prepare() operation to the remote broker.
  • On the broker side, when the transaction store receives the prepare() operation, it writes all of the buffered operations to disk. Hence, after the prepare phase, there is no longer any risk of losing data associated with this transaction branch.
  • The second phase of completing the transaction is where the transaction manager invokes commit() on all of the participating XA resources. The JMS XA resource sends the commit() operation to the remote broker.
  • On the broker side, the transaction store marks this transaction as complete. The pending operations are now executed and any pending messages can now be forwarded to JMS consumers.
Please follow documentation to configure WSO2 esb with activemq browser.
XA transaction example.

ESB listen to message queue and send that message to multiple queues.  if something went wrong in sending the message to one of those queues, the original message should be rollbacked to Listening queue and none of the queues should receive the message. The entire transaction should be rollbacked.



The following example code shows the configuration of ESB which listen to a JMS queue and consume messages as well as to send messages to multiple JMS queues in transactional manner.

<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="JMSListenerProxy"
       transports="https http jms"
       startOnLoad="true">
   <description/>
   <target>
      <inSequence>
         <property name="OUT_ONLY" value="true"/>
         <log level="custom">
            <property name="MESSAGE_ID_A" expression="get-property('MessageID')"/>
         </log>
         <log level="custom">
            <property name="BEFORE" expression="$body"/>
         </log>
         <property name="MESSAGE_ID_B"
                   expression="get-property('MessageID')"
                   scope="operation"
                   type="STRING"/>
         <property name="failureResultProperty"
                   scope="default"
                   description="FailureResultProperty">
            <result xmlns="">failure</result>
         </property>
         <enrich>
            <source clone="true" xpath="$ctx:failureResultProperty"/>
            <target type="body"/>
         </enrich>
         <log level="custom">
            <property name="AFTER" expression="$body"/>
         </log>
         <property name="BEFORE1" value="ABCD" scope="axis2" type="STRING"/>
         <callout serviceURL="jms:/ActiveMQPublisher1?transport.jms.ConnectionFactoryJNDIName=XAConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://localhost:61616&amp;transport.jms.DestinationType=queue;transport.jms.TransactionCommand=begin">
            <source type="envelope"/>
            <target xmlns:s12="http://www.w3.org/2003/05/soap-envelope"
                    xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/"
                    xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
         </callout>
         <callout serviceURL="jms:/ActiveMQPublisher2?transport.jms.ConnectionFactoryJNDIName=XAConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://localhost:61616&amp;transport.jms.DestinationType=queue">
            <source type="envelope"/>
            <target xmlns:s12="http://www.w3.org/2003/05/soap-envelope"
                    xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/"
                    xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
         </callout>
         <callout serviceURL="jms:/ActiveMQPublisher3?transport.jms.ConnectionFactoryJNDIName=XAConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://localhost:61616&amp;transport.jms.DestinationType=queue;transport.jms.TransactionCommand=end">
            <source type="envelope"/>
            <target xmlns:s12="http://www.w3.org/2003/05/soap-envelope"
                    xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/"
                    xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
         </callout>
         <drop/>
      </inSequence>
      <faultSequence>
         <log level="custom">
            <property name="Transaction Action" value="Rollbacked"/>
         </log>
         <callout serviceURL="jms:/ActiveMQPublisherFault?transport.jms.ConnectionFactoryJNDIName=XAConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://localhost:61616&amp;transport.jms.DestinationType=queue;transport.jms.TransactionCommand=rollback">
            <source type="envelope"/>
            <target xmlns:s12="http://www.w3.org/2003/05/soap-envelope"
                    xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/"
                    xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
         </callout>
      </faultSequence>
   </target>
   <parameter name="transport.jms.ContentType">
      <rules>
         <jmsProperty>contentType</jmsProperty>
         <default>application/xml</default>
      </rules>
   </parameter>
   <parameter name="transport.jms.Destination">MyJMSQueue</parameter>
</proxy>

To place a message into a JMS queue, execute following command from /samples/axis2Client directory.

ant stockquote -Dmode=placeorder -Dtrpurl="jms:/MyJMSQueue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://localhost:61616&transport.jms.ContentTypeProperty=Content-Type&transport.jms.DestinationType=queue"

You should be able to observe ESB consumes messages from MYJMSQueue and sends to multiple queues. To check rollback functionality provide unreachable host name to any destination queue and save the configurations. You should be able to observe ESB fault sequence getting invoked and failed message delivered to configured destination within fault sequence
Anyway,
  • The problem with XA is it can be a bit slow; as the XA protocol requires multiple syncs to disk to ensure it can always recover properly under every possible failure scenario. This adds significant cost (in terms of latency, performance, resources and complexity). Also quite a few EJB servers and databases don't actually properly support XA
  • So a good optimisation is to use regular JMS transactions - with no XA - and just perform some duplicate message detection in your code to check you have not already processed the message. (ref ActiveMQ)

No comments:

Post a Comment