
Saturday, May 2, 2015

Shared Topic Subscription in WSO2 Message Broker 3.0.0

This post focuses on how to configure shared topic subscription in WSO2 MB 3.0.0 and walks through a use case with a WSO2 ESB cluster acting as JMS consumer and publisher.

Enable shared topic subscription

  1. Extract the wso2mb-3.0.0-SNAPSHOT zip archive.
  2. Configure it with your preferred message store (e.g. MySQL, MSSQL, Oracle, Cassandra). By default it is distributed with H2, which is fine for trying things out, but we do not recommend it for clustered or production setups. [1]
  3. Open wso2mb-3.0.0-SNAPSHOT/repository/conf/broker.xml
  4. Set <allowSharedTopicSubscriptions> to true under <amqp enabled="true"> in <transports>.

Now you can add multiple durable subscriptions with the same subscription ID. Refer to [2] for how to write JMS client samples.

How does shared topic subscription work?

mb_3_0_0_shared_subscription.png


  1. Subscriber 1 creates a durable subscription to myDurableTopic with id : subId-x
  2. Subscriber 2 creates a durable subscription to myDurableTopic with id : subId-x
  3. Subscriber 3 creates a durable subscription to myDurableTopic with id : subId-y
  4. Publisher sends messages to myDurableTopic
  5. Messages are delivered among the durable subscribers as below:
    1. Subscriber 1 and Subscriber 2 with id : subId-x share one copy of each message, handed to one of them in round robin order
    2. Subscriber 3 with id : subId-y gets its own copy of every message

The rationale behind shared subscription delivery is this: if multiple subscribers use the same subscription ID on a particular durable topic, each message is delivered to only one of them, and the broker rotates among them in round robin order.
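The delivery rule can be sketched as a small simulation. This is only a model of the behaviour described in this post, not the broker's actual implementation; the function name `deliver` and the message labels are made up for illustration.

```python
from collections import defaultdict
from itertools import cycle


def deliver(messages, subscribers):
    """Simulate delivery of topic messages to durable subscribers.

    `subscribers` maps a subscriber name to its subscription id.
    Subscribers that share an id form one group; each message goes to
    exactly one member of every group, picked in round robin order.
    """
    groups = defaultdict(list)
    for name, sub_id in subscribers.items():
        groups[sub_id].append(name)

    # One independent round robin iterator per subscription id.
    pickers = {sub_id: cycle(members) for sub_id, members in groups.items()}

    received = defaultdict(list)
    for message in messages:
        for sub_id in groups:
            received[next(pickers[sub_id])].append(message)
    return dict(received)


subscribers = {
    "Subscriber 1": "subId-x",
    "Subscriber 2": "subId-x",
    "Subscriber 3": "subId-y",
}
result = deliver(["m1", "m2", "m3", "m4"], subscribers)
for name in subscribers:
    print(name, result[name])
```

In this model Subscriber 1 and Subscriber 2 split the four messages between them, while Subscriber 3 receives all four because nobody else shares subId-y.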

Use Case - WSO2 MB shared subscription with WSO2 ESB cluster

mb_3_0_0_shared_subscription_use_case_ESB.png


Setting up WSO2 MB 3.0.0

The default H2-based message store is used as is.
Shared subscription is enabled in wso2mb-3.0.0-SNAPSHOT/repository/conf/broker.xml as below:

        <amqp enabled="true">
            <!-- most of the AMQP configurations reside in qpid-config.xml since we inherit the Qpid
            messaging model during AMQP.-->
            <port>5672</port>
            <sslPort>8672</sslPort>
            <sendExpiredMessagesToDLC>false</sendExpiredMessagesToDLC>
            <maximumRedeliveryAttempts>10</maximumRedeliveryAttempts>
            <allowSharedTopicSubscriptions>true</allowSharedTopicSubscriptions>
        </amqp>

Go to wso2mb-3.0.0-SNAPSHOT/bin and start the broker: wso2server.sh start
You can tail the logs in wso2mb-3.0.0-SNAPSHOT/repository/logs/wso2carbon.log

Setting up backend server - SimpleStockQuoteService

Go to wso2esb-4.8.1/samples/axis2Server/src/SimpleStockQuoteService
Run the ant command there to build the SimpleStockQuoteService axis2 service.
Start SimpleStockQuoteService:
  • Go to wso2esb-4.8.1/samples/axis2Server/ and run ./axis2server.sh

Setting up WSO2 ESB 4.8.1

This guide shows how to set up a single ESB node. Refer to [3] to configure an ESB cluster.
The port offset is changed to 1 in wso2esb-4.8.1/repository/conf/carbon.xml so the ESB does not clash with the broker's ports on the same machine:

        <!-- Ports offset. This entry will set the value of the ports defined below to
         the define value + Offset.
         e.g. Offset=2 and HTTPS port=9443 will set the effective HTTPS port to 9445
         -->
        <Offset>1</Offset>

JMS transport is enabled in wso2esb-4.8.1/repository/conf/axis2/axis2.xml as below:

    <!--Uncomment this and configure as appropriate for JMS transport support with WSO2 MB 2.x.x -->
    <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
        <parameter name="myTopicConnectionFactory" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
            <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
        </parameter>

        <parameter name="myQueueConnectionFactory" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
            <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>

        <parameter name="default" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
            <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>
    </transportReceiver>

    <!-- uncomment this and configure to use connection pools for sending messages-->
    <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>

The JNDI connection lookup is configured in wso2esb-4.8.1/repository/conf/jndi.properties:

# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'
connectionfactory.TopicConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'

Copy the client libraries below from wso2mb-3.0.0-SNAPSHOT/client-lib/ to wso2esb-4.8.1/repository/components/lib

andes-client-3.0.0-SNAPSHOT.jar
geronimo-jms_1.1_spec-1.1.0.wso2v1.jar

Go to wso2esb-4.8.1/bin and start the service bus: wso2server.sh start
You can tail the logs in wso2esb-4.8.1/repository/logs/wso2carbon.log


JMS consumer and publisher proxies

Log in to the ESB management console and create the proxy services below:

StockDataPersistProxy

<proxy xmlns="http://ws.apache.org/ns/synapse"
      name="StockDataPersistProxy"
      transports="http"
      statistics="disable"
      trace="disable"
      startOnLoad="true">
  <target>
     <inSequence>
        <property name="OUT_ONLY" value="true"/>
        <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
        <log level="custom">
           <property name="STATE" value="Stock data publishing..."/>
        </log>
        <send>
           <endpoint>
              <address uri="jms:/stockDataTopic?&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=repository/conf/jndi.properties&amp;transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;transport.jms.DestinationType=topic"/>
           </endpoint>
        </send>
     </inSequence>
     <outSequence/>
  </target>
  <description/>
</proxy>


StockDataDeliveryProxy

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
      name="StockDataDeliveryProxy"
      transports="jms"
      statistics="disable"
      trace="disable"
      startOnLoad="true">
  <target>
     <inSequence>
        <property name="OUT_ONLY" value="true"/>
        <log level="custom">
           <property name="STATE" value="Stock data delivering..."/>
        </log>
        <send>
           <endpoint>
              <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
           </endpoint>
        </send>
     </inSequence>
     <outSequence>
        <send/>
     </outSequence>
  </target>
  <parameter name="transport.jms.ContentType">
     <rules>
        <jmsProperty>contentType</jmsProperty>
        <default>text/xml</default>
     </rules>
  </parameter>
  <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
  <parameter name="transport.jms.DestinationType">topic</parameter>
  <parameter name="transport.jms.SubscriptionDurable">true</parameter>
  <parameter name="transport.jms.Destination">stockDataTopic</parameter>
  <parameter name="transport.jms.DurableSubscriberName">subId-x</parameter>
  <parameter name="transport.jms.CacheLevel">consumer</parameter>
  <parameter name="transport.jms.DurableSubscriberClientID">subId-x</parameter>
  <description/>
</proxy>

Test the setup

SoapUI is used to call StockDataPersistProxy, which publishes the request to the topic; StockDataDeliveryProxy then consumes it and sends it on to SimpleStockQuoteService.

Open SoapUI and create a new SOAP project with the WSDL URL of StockDataPersistProxy.
Call StockDataPersistProxy with the SOAP envelope below and monitor the logs in the wso2esb console and the axis2 server.

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples">
  <soapenv:Header/>
  <soapenv:Body>
     <ser:getSimpleQuote>
        <ser:symbol>IBM</ser:symbol>
     </ser:getSimpleQuote>
  </soapenv:Body>
</soapenv:Envelope>


Monday, May 5, 2014

WSO2 Message Broker 2.2.0 cluster setup in a single machine

The title might sound a bit weird: how can a cluster be configured on a single machine? :) But this article will be helpful if you want to test the cluster setup and don't have more than one machine. Let's look at a new feature being introduced with WSO2 Message Broker 2.2.0 and try out a cluster setup.

WSO2 Message Broker - Carbon Profile

What is a carbon profile?

The WSO2 Carbon platform has a number of advanced features, and multi-profile support is one of them. You can specify the profile in which your server should start. If you have used Apache Maven profiles, you know that Maven builds the project with the given profile, so the relevant configuration is applied at build time. In Carbon, the profile configuration is applied at run time instead. The underlying mechanism is based on OSGi: the profile specifies which bundles need to be started. One product can consist of multiple profiles, each supporting a different type of standalone operation. Visit the following link for more implementation details about Carbon multi-profile support.
https://docs.wso2.org/display/Carbon420/Multiple+Profiles

Why carbon profiles in WSO2 Message Broker?

WSO2 Message Broker works with Apache Cassandra, and in a clustered environment it uses Apache Zookeeper. By default Message Broker ships with both as features, so starting it the default way starts both of them too. In production, running Cassandra and Zookeeper inside the Message Broker itself is neither scalable nor reliable. The recommended approach in production is to use external Cassandra and Zookeeper nodes.

WSO2 Message Broker supports external Cassandra and Zookeeper. The recommended versions for clustering are Cassandra 1.3 and Zookeeper 3.4.5. With a different version there are generally no errors, but configuration issues may surface when Message Broker runs under high concurrency. The purpose of supporting carbon profiles in Message Broker is to avoid these kinds of issues. You can easily configure a Message Broker cluster with profiles because they comply fully with the recommendation.

WSO2 Message Broker supports multiple profiles from version 2.2.0 onward. It has two profiles, namely cassandra and zookeeper. If you don't specify a profile at startup, the default profile is used. When Message Broker starts with a profile, only the features related to that profile are started; no Message Broker features run. This gives a clear separation between the default startup and a profile startup.

Set up the clustering environment:

3 WSO2 Message Broker nodes
1 WSO2 Message Broker node in the Cassandra profile
3 WSO2 Message Broker nodes in the Zookeeper profile

First of all, we need to create 3 IP aliases on the machine. Use the following commands to do it.

sudo ifconfig eth0:1 192.168.0.10
sudo ifconfig eth0:2 192.168.0.11
sudo ifconfig eth0:3 192.168.0.12

You can check that all the IP aliases above were created successfully by running the ifconfig command.

We need 7 Message Broker packs, and I used the following folder structure to copy them. Each Message Broker node is configured with a different port offset to avoid port conflicts. Please follow the link below if you are not familiar with port offsets in Carbon-based servers.
https://docs.wso2.org/display/MB210/Port+Offset+Configuration

Cluster

Type  Server        Offset
mb1   wso2mb-2.2.0  1
mb2   wso2mb-2.2.0  2
mb3   wso2mb-2.2.0  3
cs    wso2mb-2.2.0  4
zk1   wso2mb-2.2.0  5
zk2   wso2mb-2.2.0  6
zk3   wso2mb-2.2.0  7
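To see what an offset does to a node's ports, here is a rough sketch that adds each node's offset to a default port. The HTTPS default of 9443 is the usual Carbon value; the Cassandra Thrift default of 9160 is my assumption, although it lines up with the localhost:9164 connection string used for the cs node (offset 4) later in this post. The dictionary and function names are made up for illustration.

```python
# Default ports before any offset is applied.
# 9160 for Cassandra Thrift is an assumption; verify it for your pack.
DEFAULT_PORTS = {"https": 9443, "cassandra_thrift": 9160}

# Offsets from the cluster table in this post.
NODE_OFFSETS = {"mb1": 1, "mb2": 2, "mb3": 3, "cs": 4, "zk1": 5, "zk2": 6, "zk3": 7}


def effective_ports(offset):
    """Return each default port shifted by the node's port offset."""
    return {name: port + offset for name, port in DEFAULT_PORTS.items()}


for node, offset in NODE_OFFSETS.items():
    print(node, effective_ports(offset))
```

Because every node has a different offset, no two nodes end up on the same shifted port, which is what lets all 7 packs run on one machine.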

Zookeeper nodes configuration

Let's configure the Zookeeper nodes first. A few settings must be in place before starting the Message Broker in the zookeeper profile.

Copy and paste the following settings into wso2mb-2.2.0/repository/conf/etc/zoo.cfg in each Message Broker node that will be started in the Zookeeper profile (zk1, zk2, zk3).

Please REMEMBER to CHANGE the clientPort in each Message Broker node as follows, before you save zoo.cfg, to avoid port conflicts.

in zk1 change to clientPort=2181
in zk2 change to clientPort=2182
in zk3 change to clientPort=2183

# The number of milliseconds of each tick
tickTime=2000

# the directory where the snapshot is stored.
# Choose appropriately for your environment
dataDir=repository/data/zookeeper

# the port at which the clients will connect
clientPort=2181

# mb related configuration to handle zookeeper startup
start_zk_server=true

# additional authentication plugin
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

# optional: if it is set to any value, 
# it will only allow non-authenticated clients to ping, create session, close session, or sasl-authenticate
#requireClientAuthScheme=sasl

# renew server-side ticket once an hour. 1000*60*60 = 3600000 milliseconds
jaasLoginRenew=3600000

# The number of ticks that the initial synchronization phase can take
initLimit=5

# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=2

# ZooKeeper server and its port no.
# ZooKeeper ensemble should know about every other machine in the ensemble
# specify server id by creating 'myid' file in the dataDir
# use hostname instead of IP address for convenient maintenance
server.1=192.168.0.10:2888:3888
server.2=192.168.0.11:2889:3889
server.3=192.168.0.12:2890:3890

Next we have to create a file called myid, which defines the id of the Zookeeper server so it can be identified within the cluster.

Note that you have to create a folder called zookeeper inside wso2mb-2.2.0/repository/data/
Then go to wso2mb-2.2.0/repository/data/zookeeper
Create a file called myid, put the id of the Zookeeper server in it and save it. For the scenario above I defined the server ids in the myid files as below.

in zk1 the myid file value is 1
in zk2 the myid file value is 2
in zk3 the myid file value is 3

Now we are done with the Zookeeper cluster configuration. Let's start each Message Broker node in the Zookeeper profile with the commands below.
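The same myid steps can be scripted. This is a minimal sketch assuming the cluster/<node>/wso2mb-2.2.0 folder layout used in this post; the helper name `write_myid` is made up, and the demo writes into a temporary directory so it is safe to run as is. Point it at your real cluster folder instead.

```python
import tempfile
from pathlib import Path


def write_myid(cluster_root, node, server_id, pack="wso2mb-2.2.0"):
    """Create repository/data/zookeeper/myid for one node and return its path."""
    data_dir = Path(cluster_root) / node / pack / "repository" / "data" / "zookeeper"
    data_dir.mkdir(parents=True, exist_ok=True)  # the zookeeper folder may not exist yet
    myid = data_dir / "myid"
    myid.write_text(f"{server_id}\n")
    return myid


# Demo in a throwaway directory; replace `root` with your cluster folder.
with tempfile.TemporaryDirectory() as root:
    for node, server_id in {"zk1": 1, "zk2": 2, "zk3": 3}.items():
        path = write_myid(root, node, server_id)
        print(path.relative_to(root), "->", path.read_text().strip())
```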

cluster/zk1/wso2mb-2.2.0/bin$ ./wso2server.sh -Dprofile=zookeeper
cluster/zk2/wso2mb-2.2.0/bin$ ./wso2server.sh -Dprofile=zookeeper
cluster/zk3/wso2mb-2.2.0/bin$ ./wso2server.sh -Dprofile=zookeeper

You will see a few exceptions thrown while zk1 and zk2 start up, complaining that the remaining nodes in the Zookeeper cluster are not connected. These exceptions stop once all nodes have started successfully.

Cassandra node configuration

Next we'll start the Cassandra node. There is nothing to configure; just start it as below.

cluster/cs/wso2mb-2.2.0/bin$ ./wso2server.sh -Dprofile=cassandra

Broker nodes configuration

A few settings are needed in the 3 broker nodes. First, enable clustering in each broker node and configure the broker to use the external Cassandra and external Zookeeper. Change the settings below in each of the following files. Note that the configuration shown is only the part of andes-config.xml that needs to change.

cluster/mb1/wso2mb-2.2.0/repository/conf/advanced/andes-config.xml
cluster/mb2/wso2mb-2.2.0/repository/conf/advanced/andes-config.xml
cluster/mb3/wso2mb-2.2.0/repository/conf/advanced/andes-config.xml

<clustering>

        <enabled>true</enabled>
        <!--To enable External Cassandra server ? true|false-->
        <externalCassandraServerRequired>true</externalCassandraServerRequired>
        <!--To enable External Zookeeper server ? true|false -->
        <externalZookeeperServerRequired>true</externalZookeeperServerRequired>
        <GlobalQueueCount>10</GlobalQueueCount>
        <coordination>
            <!-- Apache Zookeeper Address -->
            <ZooKeeperConnection>192.168.0.10:2181,192.168.0.11:2182,192.168.0.12:2183</ZooKeeperConnection>
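The ZooKeeperConnection value is simply each Zookeeper node's IP alias paired with its clientPort, joined by commas. A tiny sketch, with a made-up helper name, that builds the string from the node list so the two stay in sync:

```python
# (IP alias, clientPort) for zk1, zk2 and zk3, as configured earlier in this post.
ZK_NODES = [
    ("192.168.0.10", 2181),
    ("192.168.0.11", 2182),
    ("192.168.0.12", 2183),
]


def zookeeper_connection(nodes):
    """Join host:port pairs into a comma-separated connection string."""
    return ",".join(f"{host}:{port}" for host, port in nodes)


print(zookeeper_connection(ZK_NODES))
```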

Finally, we have to configure the Cassandra node settings in the virtual host configuration at the locations below.

cluster/mb1/wso2mb-2.2.0/repository/conf/advanced/andes-virtualhosts.xml
cluster/mb2/wso2mb-2.2.0/repository/conf/advanced/andes-virtualhosts.xml
cluster/mb3/wso2mb-2.2.0/repository/conf/advanced/andes-virtualhosts.xml

<store>
                <class>org.wso2.andes.server.store.CassandraMessageStore</class>
                <username>admin</username>
                <password>admin</password>
                <cluster>ClusterOne</cluster>
                <idGenerator>org.wso2.andes.server.cluster.coordination.TimeStampBasedMessageIdGenerator</idGenerator>
                <connectionString>localhost:9164</connectionString>

We are done with the configuration of all broker nodes. Let's start each Message Broker node as below.

cluster/mb1/wso2mb-2.2.0/bin$ ./wso2server.sh
cluster/mb2/wso2mb-2.2.0/bin$ ./wso2server.sh
cluster/mb3/wso2mb-2.2.0/bin$ ./wso2server.sh

Now our Message Broker cluster is up and running. You can test any scenario you want against this cluster setup.

You can visit the following link for more information about WSO2 Message Broker clustering patterns.