Tuesday, January 29, 2013


Integrating WSO2 ESB and WSO2 Message Broker

We can integrate wso2 enterprise service bus and the wso2 message broker to satisfy the
message brokering needs of wso2 esb. Basically wso2 esb can use wso2 mb to implement
store and forward messaging pattern.

Using ESB for implementing store and forward message pattern can be done in two ways;

1. Using JMS endpoints and JMS proxy services
2. Using message stores and processors

Before implementing both of the above, we need to set up wso2 ESB and wso2 MB in our
deployment environment. When we setting up multiple wso2 products in the same host
it is not possible to start up all those products with their default configuration at the same time,
since with default configuration they are trying to use the same ports. Eg : HTTP, HTTPS..
So when setting up multiple wso2 products in the same machine we need to configure them to
run in different ports. We can easily configure it by changing the port offset value.

In this guide we are setting up WSO2 Message Broker with port offset 1 and WSO2 Enterprise
Service Bus with port offset 0.

Setting up WSO2 Message Broker

  • Download the MB 2.0.1 binary from following URL http://wso2.com/products/message-
  • broker/Unzip wso2mb-2.0.1.zip pack. We consider this unzipped folder as MB_HOME.
  • To configure the port offset, open <MB_HOME>/repository/conf/carbon.xml file and change the offset value to 1.

<Ports>
<!-- Ports offset. This entry will set the value of the ports defined below to
the define value + Offset.
e.g. Offset=2 and HTTPS port=9443 will set the effective HTTPS port to 9445
-->
<Offset>1</Offset>

In WSO2 Message Broker we are using cassandra server as the storage. We have
bundled the cassandra server with MB pack and by default MB uses that one. However
in production setups we recommend to use an external cassandra server. Users can find
information on setting up cassandra server externally from following URL.
http://docs.wso2.org/wiki/display/MB201/Installing+Apache+Cassandra


  • But here, we are going to use the embedded cassandra server. As we have change the

port offset by 1 in the above step , we will have to point MB to correct Cassandra port.

Open <MB_HOME>/repository/conf/advanced/qpid-virtualhosts.xml and edit it to the
correct port. In our case it will be 9161 ( Default 9160 + 1)

<store>
<class>org.wso2.andes.server.store.CassandraMessageStore</class>
<username>admin</username>
<password>admin</password>
<cluster>ClusterOne</cluster>
<idGenerator>org.wso2.andes.server.cluster.coordination.TimeStampBasedMessageId
Generator</idGenerator>
<connectionString>localhost:9161</connectionString>
</store>

Note:
If you are using an external Cassandra server, you need to point MB to it by editing the
value of the connection string above as required.


  • By default , the message batch size for browser subscriptions of MB 2.0.1 has

configured to a very low value like 2. We need to increase it to a large value by
changing configuration parameter. For that, you need to add the following parameter to
<MB_HOME>/repository/conf/advanced/qpid-config.xml

<messageBatchSizeForBrowserSubscriptions>100000</
messageBatchSizeForBrowserSubscriptions>

Note:- You can add the above parameter as a child of <broker> element in qpid-config.xml


  • Now navigate to <MB_HOME>/bin and run wso2server.sh file, and start WSO2 MB.


Setting up WSO2 ESB

Download the ESB 4.5.1 binary from following URL

  • http://wso2.com/products/enterprise-service-bus/
  • Unzip wso2esb-4.5.1.zip pack. We consider this unzipped folder as ESB_HOME.
  • We need to enable JMS transport of ESB to communicate with MB. For that you need to edit the axis2.xml file located in ESB_HOME/repository/conf/axis2 folder.


In this axis2.xml file , you can find a commented out transport receiver block for MB
2.x.x. You need to uncomment that block to integrate MB and ESB.

<!--Uncomment this and configure as appropriate for JMS transport support with WSO2
MB 2.x.x -->
<!--transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
…..
</parameter>
</transportReceiver-->

Apart from that you need to uncomment transport sender block for JMS also. It is in the
same axis2.xml file

<!-- uncomment this and configure to use connection pools for sending messages>
<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/--
>


  • Then copy required client libraries from MB to ESB. For that copy the following two jars

(from <MB_HOME>/clent-lib folder) to <ESB_HOME>/repository/components/lib folder

-andes-client-0.13.wso2v4
-geronimo-jms_1.1_spec-1.1.0.wso2v1

Open JNDI.properties file in <ESB_HOME>/repository/conf folder and point to the MB
broker being run. Also use ‘carbon’ as the virtualhost. Define a queue called ‘JMSMS’
there. No need of a topic here, so comment it out. But in order to avoid
getting ‘javax.naming.NameNotFoundException: TopicConnectionFactory’ during server
startup, point TopicConnectionFactory also to MB.


  • # register some connection factories

# connectionfactory.[jndiname] = [ConnectionURL]
connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?
brokerlist='tcp://localhost:5673'
connectionfactory.TopicConnectionFactory = amqp://admin:admin@clientID/carbon?
brokerlist='tcp://localhost:5673'

# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.JMSMS=JMSMS
queue.StockQuotesQueue=StockQuotesQueue


  • Start WSO2 ESB by running <ESB_HOME>/bin/wso2server.sh file.


Now we have completed the setup of ESB. However we need to have some back-end services
to be available for testing the above mentioned scenarios. For that we need to run an ant task
for deploying the “SimpleStockQuoteService” in the simple axis2server we have.

Setting up back-end web service

Navigate to <ESB_HOME>/samples/axis2Server/src/SimpleStockQuoteService
and run ‘ant’ to build.
Come back to <ESB_HOME>/samples/axis2server folder and run
axis2Server.sh file.
Point your browser to http://127.0.0.1:9000/services/SimpleStockQuoteService?
wsdl and verify that the service is running.


  • Using JMS endpoints and JMS proxies


As we have explained before, we can integrate wso2 esb and wso2 mb with two ways. Here we
are explaining on how we can integrate them as JMS endpoints.

Use case :
Store a message received to a http proxy of esb in a jms queue. Consume that queue and get
the message and send to the actual endpoint.
We need to create the consumer first to make a subscription for the queue that we are going to
use in this scenario. For that we need to have a JMS proxy with the queue name we defined in
the jndi.properties file above.

There are multiple ways to create proxy services, sequences, endpoints, message stores and
processors etc.. in ESB management console. You can either use graphical wizards or just
copy and paste the following XML configurations into the source view. You can find the source
view under “Manage → Service Bus → Source View” in the left navigation pane in WSO2 ESB
management console.


  • Create a JMS Proxy with the name “StockQuotesQueue”. Synapse configuration for the

proxy would be as follows.

<proxy xmlns="http://ws.apache.org/ns/synapse" name="StockQuotesQueue"
transports="jms" statistics="disable" trace="disable" startOnLoad="true">
<target>
<inSequence>
<log level="full"/>
<send>
<endpoint>
<address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
</endpoint>
</send>
</inSequence>
</target>
<description></description>
</proxy>

Here we are logging the consumed message from the JMS queue “StockQuotesQueue”
and send it to the endpoint SimpleStockQuoteService.


  • Create the HTTP proxy to send message to the JMS Queue. Synapse configuration for this http proxy is as follows. Since this is a one way message we add the property “OUT_ONLY” true and we need to have “FORCE_SC_ACCEPTED” property to send a 202 response to the client who invoked this proxy.


<proxy xmlns="http://ws.apache.org/ns/synapse" name="StockQuoteProxy"
transports="https,http" statistics="disable" trace="disable" startOnLoad="true">
<target>
<inSequence>
<property name="OUT_ONLY" value="true"/>
<property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
<send>
<endpoint>

<address uri="jms:/StockQuotesQueue?
transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.fact

ory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&java.naming.provider.
url=repository/conf/jndi.properties&transport.jms.DestinationType=queue"/>
</endpoint>
</send>
</inSequence>
</target>
<description></description>
</proxy>

Note:- Due to a bug in the ESB source view, you need to replace ‘&’ character of the
above JMS endpoint url by ‘&amp;’

Now we have completed the message flow paths. When the “StockQuoteProxy” is invoked,
it will send the message to the queue. Then that queue will be consumed by the JMS proxy
“StockQuotesQueue” and send it to the actual end point.

Testing


  • With soap UI send the following soap message to the StockQuoteProxy. The endpoint URL of the StockQuoteProxy can be found out in the service dashboard of StockQuoteProxy service. By clicking on “Manage → Services → List” in the left navigation pane, you can access the “Deployed Services” page which shows all proxy services deployed in ESB. You can access the service dashboard of StockQuoteProxy by clicking on the service name. In there, you will find the endpoints of proxy service.

e.g:- http://localhost:8280/services/StockQuoteProxy

<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope"
xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
<soap:Header/>
<soap:Body>
<ser:placeOrder>
<!--Optional:-->
<ser:order>
<!--Optional:-->
<xsd:price>10</xsd:price>
<!--Optional:-->
<xsd:quantity>100</xsd:quantity>
<!--Optional:-->
<xsd:symbol>IBM</xsd:symbol>
</ser:order>
</ser:placeOrder>
</soap:Body>

</soap:Envelope>

As the out put, this message will be logged at ESB console and SimpleAxis2Server’s log will be
as follows;

Tue Jan 15 15:31:28 IST 2013 samples.services.SimpleStockQuoteService :: Accepted order
#2 for : 100 stocks of IBM at $ 10.0

Using message stores and processors

Define a message Store by copy-pasting following config to the source view
(Alternatively you can use message-store UI).

<messageStore name="JMSMS"
class="org.wso2.carbon.message.store.persistence.jms.JMSMessageStore"
xmlns="http://ws.apache.org/ns/synapse">

<parameter
name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextF
actory</parameter>
<parameter name="java.naming.provider.url">repository/conf/jndi.properties</
parameter>
<parameter name="store.jms.destination">JMSMS</parameter>
<parameter name="store.jms.JMSSpecVersion">1.1</parameter>
<parameter name="store.jms.cache.connection">false</parameter>
</messageStore>


  • Define an end-point to send the message. In this example we use above backend we just set-up.


<endpoint name="SimpleStockQuoteService">
<address uri="http://127.0.0.1:9000/services/SimpleStockQuoteService"/>
</endpoint>


  • Define a message forwarding processor as below (by copy-pasting or using the Management Console UI).


<messageProcessor
class="org.apache.synapse.message.processors.forward.ScheduledMessageForwardin
gProcessor"
name="Processor1"

messageStore="JMSMS">
<parameter name="max.delivery.attempts">4</parameter>
<parameter name="interval">4000</parameter>
</messageProcessor>


  • Now to test the scenario use a proxy service like below.


<proxy name="InOnlyProxy"
transports="https http"
startOnLoad="true"
trace="disable">
<target>
<inSequence>
<property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
<property name="OUT_ONLY" value="true"/>
<property name="target.endpoint" value="SimpleStockQuoteService"/>
<log level="full"/>
<store messageStore="JMSMS"/>
</inSequence>
</target>
</proxy>

Note the highlighted parts, which are related.
Now whenever a message comes to this proxy service, it will be stored in JMS message store
(which is in MB, JMSMS queue. If you disable message processor and send messages you will
notice in MB management console message count of JMSMS queue being increased.)
This is “In Only” service invocation with Message Forwarding Processor.

Similarly, you can follow the following artile to implement the others.

http://wso2.org/library/articles/2011/12/implementing-store-forward-messaging-patterns-wso2esb-part-2

Testing


  • With soap UI send the following soap message to the InOnlyProxy.


<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
<soapenv:Header/>

<soapenv:Body>
<ser:getQuote>
<!--Optional:-->
<ser:request>
<!--Optional:-->
<xsd:symbol>IBM</xsd:symbol>
</ser:request>
</ser:getQuote>
</soapenv:Body>
</soapenv:Envelope>


  • Message will be logged at ESB console, SOAP UI will get 202 Accepted message, and at axis2Server console following will be logged. 
                samples.services.SimpleStockQuoteService :: Generating quote for : IBM

Adapt to Your Environment


  • replace

<endpoint name="SimpleStockQuoteService">
<address uri="http://test"/>
</endpoint>
and use a suitable name.

So using both of the above ways , It is possible to integrate wso2 Message Broker and wso2
Enterprise Service Bus.

1 comment:

  1. Hi,

    I followed the steps but my esb starts at port 8243

    It throws error "Can not create the event broker "

    I had posted problem here also but I couldnt make it work

    http://buddhimawijeweera.wordpress.com/2012/10/27/connect-wso2-esb-with-wso2-mb/

    please help me in this regard.

    Thanks and Regards,
    Naveen

    ReplyDelete