Saturday, May 2, 2015

Shared Topic Subscription in WSO2 Message Broker 3.0.0

This post focuses on how to configure shared topic subscriptions in WSO2 MB 3.0.0 and discusses a use case with a WSO2 ESB cluster acting as JMS consumer and publisher.

Enable shared topic subscription

  1. Extract the wso2mb-3.0.0-SNAPSHOT zip archive.
  2. Configure it with your preferred message store (e.g. MySQL, MSSQL, Oracle, Cassandra). By default it is distributed with H2. You can experiment with the default message store, but we do not recommend it for clustered or production setups. [1]
  3. Open wso2mb-3.0.0-SNAPSHOT/repository/conf/broker.xml.
  4. Set <allowSharedTopicSubscriptions> to true under <amqp enabled="true"> in <transports>.

You can now add multiple durable subscriptions with the same subscription ID. Refer to the JMS client samples [2] to see how to write a client.

How shared topic subscription works

mb_3_0_0_shared_subscription.png


  1. Subscriber 1 creates a durable subscription to myDurableTopic with id : subId-x
  2. Subscriber 2 creates a durable subscription to myDurableTopic with id : subId-x
  3. Subscriber 3 creates a durable subscription to myDurableTopic with id : subId-y
  4. The publisher sends messages to myDurableTopic
  5. Messages are delivered among the durable subscribers as below:
    1. Subscriber 1 and Subscriber 2, which share id : subId-x, receive the messages between them in round-robin order, so each message goes to only one of the two
    2. Subscriber 3, with id : subId-y, receives a copy of every message

The rationale behind shared subscription delivery is this: if multiple subscribers use the same subscription ID for a particular durable topic, each message is delivered to only one of them, chosen in round-robin order.
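
The delivery rule can be illustrated with a small in-memory model. This is only a sketch of the behaviour described above, not WSO2 MB code: the class SharedSubscriptionSketch and its methods are hypothetical names, and it assumes the round-robin order follows the order in which subscribers joined.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of shared durable subscription delivery; not WSO2 MB code. */
final class SharedSubscriptionSketch {

    /** Subscribers grouped by subscription ID, in the order they subscribed. */
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    /** Next round-robin position for each subscription ID. */
    private final Map<String, Integer> cursor = new LinkedHashMap<>();
    /** Messages received so far, per subscriber name. */
    private final Map<String, List<String>> inbox = new LinkedHashMap<>();

    void subscribe(String subscriber, String subscriptionId) {
        groups.computeIfAbsent(subscriptionId, id -> new ArrayList<>()).add(subscriber);
        cursor.putIfAbsent(subscriptionId, 0);
        inbox.put(subscriber, new ArrayList<>());
    }

    /** Delivers one copy of the message per subscription ID, rotating within each group. */
    void publish(String message) {
        for (Map.Entry<String, List<String>> group : groups.entrySet()) {
            List<String> members = group.getValue();
            int next = cursor.get(group.getKey());
            inbox.get(members.get(next)).add(message);
            cursor.put(group.getKey(), (next + 1) % members.size());
        }
    }

    List<String> received(String subscriber) {
        return inbox.get(subscriber);
    }

    public static void main(String[] args) {
        SharedSubscriptionSketch topic = new SharedSubscriptionSketch();
        topic.subscribe("Subscriber 1", "subId-x");
        topic.subscribe("Subscriber 2", "subId-x");
        topic.subscribe("Subscriber 3", "subId-y");
        for (int i = 1; i <= 4; i++) {
            topic.publish("m" + i);
        }
        // Prints {Subscriber 1=[m1, m3], Subscriber 2=[m2, m4], Subscriber 3=[m1, m2, m3, m4]}
        System.out.println(topic.inbox);
    }
}
```

With four published messages, the two subscribers sharing subId-x split them between themselves, while the subscriber on subId-y sees all four.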

Use Case - WSO2 MB shared subscription with WSO2 ESB cluster

mb_3_0_0_shared_subscription_use_case_ESB.png


Setting up WSO2 MB 3.0.0

The default H2-based message store is used as is.
Shared subscription is enabled in wso2mb-3.0.0-SNAPSHOT/repository/conf/broker.xml as below:

        <amqp enabled="true">
            <!-- most of the AMQP configurations reside in qpid-config.xml since we inherit the Qpid
            messaging model during AMQP.-->
            <port>5672</port>
            <sslPort>8672</sslPort>
            <sendExpiredMessagesToDLC>false</sendExpiredMessagesToDLC>
            <maximumRedeliveryAttempts>10</maximumRedeliveryAttempts>
            <allowSharedTopicSubscriptions>true</allowSharedTopicSubscriptions>
        </amqp>

Go to wso2mb-3.0.0-SNAPSHOT/bin and start the broker with wso2server.sh start
You can tail the logs in wso2mb-3.0.0-SNAPSHOT/repository/logs/wso2carbon.log

Setting up backend server - SimpleStockQuoteService

Go to wso2esb-4.8.1/samples/axis2Server/src/SimpleStockQuoteService
Run the ant command to build the SimpleStockQuoteService Axis2 service
Start SimpleStockQuoteService:
  • Go to wso2esb-4.8.1/samples/axis2Server/ and run ./axis2server.sh

Setting up WSO2 ESB 4.8.1

This guide shows how to set up a single ESB node. Refer to [3] to configure an ESB cluster.
The port offset is changed to 1 in wso2esb-4.8.1/repository/conf/carbon.xml:

        <!-- Ports offset. This entry will set the value of the ports defined below to
         the define value + Offset.
         e.g. Offset=2 and HTTPS port=9443 will set the effective HTTPS port to 9445
         -->
        <Offset>1</Offset>

The JMS transport is enabled in wso2esb-4.8.1/repository/conf/axis2/axis2.xml as below:

    <!--Uncomment this and configure as appropriate for JMS transport support with WSO2 MB 2.x.x -->
    <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
        <parameter name="myTopicConnectionFactory" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
            <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
        </parameter>

        <parameter name="myQueueConnectionFactory" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
            <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>

        <parameter name="default" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
            <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>
    </transportReceiver>

    <!-- uncomment this and configure to use connection pools for sending messages-->
    <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>

jndi.properties is configured for connection factory lookup in wso2esb-4.8.1/repository/conf/jndi.properties:

# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'
connectionfactory.TopicConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'

Copy the client libraries below from wso2mb-3.0.0-SNAPSHOT/client-lib/ to wso2esb-4.8.1/repository/components/lib:

andes-client-3.0.0-SNAPSHOT.jar
geronimo-jms_1.1_spec-1.1.0.wso2v1.jar

Go to wso2esb-4.8.1/bin and start the service bus with wso2server.sh start
You can tail the logs in wso2esb-4.8.1/repository/logs/wso2carbon.log


JMS consumer and publisher proxies

Log in to the ESB management console and create the proxy services below:

StockDataPersistProxy

<proxy xmlns="http://ws.apache.org/ns/synapse"
      name="StockDataPersistProxy"
      transports="http"
      statistics="disable"
      trace="disable"
      startOnLoad="true">
  <target>
     <inSequence>
        <property name="OUT_ONLY" value="true"/>
        <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
        <log level="custom">
           <property name="STATE" value="Stock data publishing..."/>
        </log>
        <send>
           <endpoint>
              <address uri="jms:/stockDataTopic?java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=repository/conf/jndi.properties&amp;transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;transport.jms.DestinationType=topic"/>
           </endpoint>
        </send>
     </inSequence>
     <outSequence/>
  </target>
  <description/>
</proxy>


StockDataDeliveryProxy

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
      name="StockDataDeliveryProxy"
      transports="jms"
      statistics="disable"
      trace="disable"
      startOnLoad="true">
  <target>
     <inSequence>
        <property name="OUT_ONLY" value="true"/>
        <log level="custom">
           <property name="STATE" value="Stock data delivering..."/>
        </log>
        <send>
           <endpoint>
              <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
           </endpoint>
        </send>
     </inSequence>
     <outSequence>
        <send/>
     </outSequence>
  </target>
  <parameter name="transport.jms.ContentType">
     <rules>
        <jmsProperty>contentType</jmsProperty>
        <default>text/xml</default>
     </rules>
  </parameter>
  <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
  <parameter name="transport.jms.DestinationType">topic</parameter>
  <parameter name="transport.jms.SubscriptionDurable">true</parameter>
  <parameter name="transport.jms.Destination">stockDataTopic</parameter>
  <parameter name="transport.jms.DurableSubscriberName">subId-x</parameter>
  <parameter name="transport.jms.CacheLevel">consumer</parameter>
  <parameter name="transport.jms.DurableSubscriberClientID">subId-x</parameter>
  <description/>
</proxy>

Test the setup

SoapUI is used to call StockDataPersistProxy, which publishes the request to stockDataTopic; StockDataDeliveryProxy then consumes it and forwards it to SimpleStockQuoteService.

Open SoapUI and create a new SOAP project with the WSDL URL of StockDataPersistProxy.
Call StockDataPersistProxy with the SOAP envelope below, and monitor the logs in the ESB console and the Axis2 server.

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples">
  <soapenv:Header/>
  <soapenv:Body>
     <ser:getSimpleQuote>
        <ser:symbol>IBM</ser:symbol>
     </ser:getSimpleQuote>
  </soapenv:Body>
</soapenv:Envelope>
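
If SoapUI is not at hand, the same envelope could be posted from a small Java program. The following is a rough sketch under several assumptions that do not come from the setup above: it needs Java 11 or later for java.net.http, it assumes the proxy is reachable on port 8281 (the default ESB HTTP port 8280 plus the offset of 1), and it assumes urn:getSimpleQuote as the SOAPAction. The class name StockDataPersistClient is hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Posts the getSimpleQuote envelope to StockDataPersistProxy (illustrative client only). */
final class StockDataPersistClient {

    // Assumption: default ESB HTTP port 8280 plus the offset of 1 set in carbon.xml.
    static final String PROXY_URL = "http://localhost:8281/services/StockDataPersistProxy";

    /** Builds the SOAP 1.1 envelope shown above; the symbol is assumed to need no XML escaping. */
    static String envelope(String symbol) {
        return "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\""
                + " xmlns:ser=\"http://services.samples\">"
                + "<soapenv:Header/>"
                + "<soapenv:Body>"
                + "<ser:getSimpleQuote><ser:symbol>" + symbol + "</ser:symbol></ser:getSimpleQuote>"
                + "</soapenv:Body>"
                + "</soapenv:Envelope>";
    }

    /** Builds the HTTP POST without sending it. */
    static HttpRequest request(String symbol) {
        return HttpRequest.newBuilder(URI.create(PROXY_URL))
                .header("Content-Type", "text/xml; charset=UTF-8")
                // Assumption: action name of the getSimpleQuote operation.
                .header("SOAPAction", "\"urn:getSimpleQuote\"")
                .POST(HttpRequest.BodyPublishers.ofString(envelope(symbol)))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request("IBM"), HttpResponse.BodyHandlers.ofString());
            System.out.println("HTTP status: " + response.statusCode());
        } catch (IOException e) {
            System.err.println("Could not reach the proxy: " + e.getMessage());
        }
    }
}
```

Because StockDataPersistProxy sets FORCE_SC_ACCEPTED, an HTTP 202 status with an empty body would be the expected outcome; the "Stock data publishing..." and "Stock data delivering..." log lines in the ESB console then show the message passing through the topic.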