Wednesday, January 9, 2013

WSO2 ESB mediator to publish data to WSO2 BAM

This post will explain, using another carbon OOTB(Out Of The Box) functionality related to carbon data publishing and monitoring.





Prerequisites: 

- Download and install WSO2 ESB 4.5.1(it is required to use the ESB version as >= 4.5.1)
- Download and install WSO2 BAM 2.0.1

What we're going to do:

Figure out way to send/publish WSO2 Enterprise Service Bus (ESB) mediation data to WSO2 Business Activity Monitor (BAM). In this particular case we will consider capturing mediation data of the WSO2 ESB and publish them to BAM.

How to:

 1. Through WSO2 ESB, we create a WSO2 BAM server profile(you can create multiple profiles), and there we define WSO2 BAM server related all required details including stream definitions and etc.
 2.1 We use a pre built WSO2 ESB mediator which is specifically designed to publish data to BAM. It stores the mediation data in to WSO2 BAM default secondary storage(Cassandra database). Inside the aforementioned pre defined bam-mediator, we point to the corresponding server profile.
 2.2. Once step 1 is done, add this mediator to an in-sequence of a ESB proxy service called FooProxy.
 3. When invoking the above created proxy service, the bam mediator will store the mediation data on WSO2 BAM Cassandra database.

Steps in detail :

Step -1. To create a WSO2 BAM server profile in WSO2 ESB, we need to install  BAM Mediator Aggregate feature in WSO2 ESB.

To do so,
- Start WSO2 ESB server with port off set as 1 and log in as admin, and go to Home > Configure > Features > Feature Management. There under the Repository Management tab, click on Add Repository.
- Provide the corresponding p2 repository and save the repository. In this case the corresponding repository url is http://dist.wso2.org/p2/carbon/releases/4.0.2. 
- Once done, move to Available Features tab and find and install the feature called "BAM Mediator Aggregate".

Now you can go to  Home > Configure > BAM Server Profile in WSO2 ESB management console and Add Profile. Once done as follows update and save the configuration.
i.e
  • Profile Name: Name for BAM server profile. (i.e profile1)
  • Server Credential: admin,admin
  • Server Transport: Thrift as the default Protocol.
  • Enable Security: If message confidentiality is required from ESB server to BAM server, select this option.
  • IP Address: IP of the BAM server's Thrift server. i.e localhost
  • Receiver Port:  If security is not enabled, this option will have to be given. Enter 7611 by default, which is the Thrift server port.
  • Authentication Port: Port number is 100 times greater than the Receiver Port, if the latter exists. Default Authentication Port number is 7711.
Now in your profile, you can define a stream. In this case i.e.

Name - stream_1
Version - 1.0.0
Nick Name - my stream
Description - test stream definition

Now click on edit stream on the created stream. You will see some fields that you are asked to fill Stream payload and stream properties.

i.e
When editing a stream, check on "Dump header" or "Dump body," to record SOAP header or SOAP body of messages respectively. Using "Stream Properties" user can extract several types of properties from the incoming message.
  • Value: A constant alpha-numeric string value entered to Value field is set as the property.
  • Expression: Is considered as an expression and executed on the message to get the property. XPath properties and functions available in ESB are valid in the expression. 
i.e



Now we are done with the BAM server profile configuration.

Step-2 : We need to simple create a proxy service that has the BAM mediator as follows. And refer the created server profile in it. If you do not like XML, you can create the BAM mediator from the UI as well. For that please refer [1].

In this particular case I am creating the a WSDL proxy service from the SimpleStockQuoteService sample in WSO2 ESB. (If you are not familiar with WSO2 ESB proxy services refer WSO2 ESB wiki docs[2] for more information.)

To host the SimpleStockQuoteService, 
  - Go to  $ESB_HOME/samples/axis2Server/src/SimpleStockQuoteService and run ant. 
  -  Move to  $ESB_HOME/samples/axis2Server and run  ./axis2server.sh. Now this will start the axis2 service with the SimpleStockQuoteService. You can find the wsdl url at
http://localhost:9000/services/SimpleStockQuoteService?wsdl. 

Final proxy service created out from the above WSDL.

   
      
         
            
               
            
         
      
      
         
      
      
         
      
   
                              
Now we are all done and good to test.

- Start the WSO2 BAM server.
- Invoke the proxy service through sample axis2 client. Go to  $ESB_HOME/samples/axis2Client 
and run the following command . i.e https://localhost:8244/services/Simple_Stock_Quote_Service_Proxy is the created proxy service endpoint url. You can pass any String for symbol.

ant stockquote -Daddurl=https://localhost:8244/services/Simple_Stock_Quote_Service_Proxy -Dmode=fullquote -Dsymbol=testString

Once done the data will be stored in Cassandra database from our BAM mediator. Now BAM mediator in the proxy service, Simple_Stock_Quote_Service_Proxy should have dumped data extracted from the ESB to the key-space, EVENT_KS in the Cassandra database, with column family name same as the Stream Name. Data in the Cassandra database can be seen from the Cassandra Explorer in the BAM server.

[1] - http://docs.wso2.org/wiki/display/BAM200/Setting+up+BAM+Mediator
[2] - http://docs.wso2.org/wiki/display/ESB451/Enterprise+Service+Bus+Documentation

Wednesday, January 2, 2013

Publish WSO2 Governance Registry Data to WSO2 BAM


This post will explain how to publish WSO2 G-Reg generated events to WSO2 BAM server. In other words, this is a custom data publisher for G-Reg/BAM.


   

Pre-requisites

- Install WSO2 Business Activity Monitor (BAM 2.0.0)
- Install WSO2 Governance Registry(G-Reg 4.5.3)

What needs to done:
a) Configure WSO2 BAM to accept events generated from WSO2 G-Reg
b) Publish events generated from G-Reg to the BAM event listeners.

How to:
- To achieve (a), we need to install a relevant Toolbox to BAM.
(A BAM Toolbox is an installable archive which contains stream definitions, dashboard components and analytics for WSO2 Business Activity Monitor. KPI_Registry_Activity.tbox is a such pre-built BAM Toolbox based on the KPI Monitoring sample of WSO2 BAM. Further you can read the KPI Monitoring sample of WSO2 BAM to learn how to make changes to this BAM Toolbox or create your own.)

 - To achieve (b), in WSO2 Carbon Kernel, we are given an extension point to collect stats from a running carbon server. To accomplish this, we need to write a class that extends StatisticsCollector interface and override the collect method such that it will publish the collected event to WSO2 BAM.  

I hope now you got a brief understanding of how to achieve this goal. So lets figure out what needs to be done to achieve (a) and (b)  in detail.


a) Configure WSO2 BAM to accept events generated from WSO2 G-Reg

- Download the KPI_Registry_Activity.tbox .
- Start WSO2 BAM server and login as admin (username - admin password -admin).
- In BAM management console go to BAM Toolbox > Custom Toolbox   select the downloaded toolbox from the file system and click Install.

Now you have the tool box installed which awaits and listens to the events coming through its tcp event listener port.

b)  To add a stat collector which publish the collected stats to WSO2 BAM, we need to write a custom OSGi bundle which register the statistic collector service at bundle startup.

To write a new OSGi bundle we can reuse the greg handler sample code. Find the handler sample in
GREG_HOME/samples/handler.

There add a new class called RegistryStatCollectorServiceComponent under the package  org.wso2.carbon.registry.samples.statistics.

In the sample pom file, you need to do the following changes to make the code work.
Add following dependencies to the pom.


    org.wso2.carbon
    org.wso2.carbon.databridge.agent.thrift
    4.0.1


    org.wso2.carbon
    org.wso2.carbon.databridge.commons
    4.0.0


Add the following plugin.

    org.apache.felix
    maven-scr-plugin


Commet-out the following exclusions in the pom file.



Now edit the previously created RegistryStatCollectorServiceComponent class as follows.
package org.wso2.carbon.registry.samples.statistics;

 

import org.osgi.framework.ServiceRegistration;

import org.osgi.service.component.ComponentContext;

import org.wso2.carbon.context.CarbonContext;
import org.wso2.carbon.databridge.agent.thrift.Agent;
import org.wso2.carbon.databridge.agent.thrift.DataPublisher;
import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.exception.NoStreamDefinitionExistException;
import org.wso2.carbon.registry.core.service.RegistryService;
import org.wso2.carbon.registry.core.statistics.StatisticsCollector;
import org.wso2.carbon.registry.core.utils.RegistryUtils;
import org.wso2.carbon.utils.NetworkUtils;
 

/**

 * @scr.component name="org.wso2.carbon.registry.samples.statistics" immediate="true"

 * @scr.reference name="registry.service" interface="org.wso2.carbon.registry.core.service.RegistryService"

 * cardinality="1..1" policy="dynamic" bind="setRegistryService" unbind="unsetRegistryService"

 */

public class RegistryStatCollectorServiceComponent {

 

    public static final String REGISTRY_ACTIVITY_STREAM = "org.wso2.bam.registry.activity.kpi";

    public static final String VERSION = "1.0.0";

 

    private ServiceRegistration serviceRegistration;

 

    protected void activate(ComponentContext context) {

        serviceRegistration = context.getBundleContext().registerService(

                StatisticsCollector.class.getName(), new StatisticsCollector() {

            public void collect(Object... objects) {

                try {

                    // Create Data Publisher

                    RegistryUtils.setTrustStoreSystemProperties();

                    DataPublisher dataPublisher = new DataPublisher(

                            "tcp://" + NetworkUtils.getLocalHostname() + ":7612", "admin", "admin",

                            new Agent(new AgentConfiguration()));

 

                    // Find Data Stream

                    String streamId;

                    try {

                        streamId = dataPublisher.findStream(REGISTRY_ACTIVITY_STREAM, VERSION);

                    } catch (NoStreamDefinitionExistException ignored) {

                        streamId = dataPublisher.defineStream("{" +

                                "  'name':'" + REGISTRY_ACTIVITY_STREAM + "'," +

                                "  'version':'" + VERSION + "'," +

                                "  'nickName': 'Registry_Activity'," +

                                "  'description': 'Registry Activities'," +

                                "  'metaData':[" +

                                "          {'name':'clientType','type':'STRING'}" +

                                "  ]," +

                                "  'payloadData':[" +

                                "          {'name':'operation','type':'STRING'}," +

                                "          {'name':'user','type':'STRING'}" +

                                "  ]" +

                                "}");

                    }

 

                    if (!streamId.isEmpty()) {

                        // Publish Event to Stream

                        dataPublisher.publish(new Event(

                                streamId, System.currentTimeMillis(),

                                new Object[]{"external"}, null, new Object[]{

                                Thread.currentThread().getStackTrace()[3].getMethodName(),

                                CarbonContext.getCurrentContext().getUsername()}));

                        dataPublisher.stop();

                        System.out.println("Successfully Published Event");

                    }

 

                } catch (Exception e) {

                    e.printStackTrace();

                }

            }

        }, null);

    }

 

    protected void deactivate(ComponentContext context) {

        serviceRegistration.unregister();

    }

 

    protected void setRegistryService(RegistryService registryService) {

        // The Maven SCR Plugin Needs These

    }

 

    protected void unsetRegistryService(RegistryService registryService) {

        // The Maven SCR Plugin Needs These

    }

}

- Now go to  REG_HOME/ samples/handler/src and do a maven clean install to build the project. Once done inside the target folder of the sample, you can find the updated jar file.

- Copy the jar  org.wso2.carbon.registry.samples.handler-4.5.3.jar in to  REG_HOME/repository/components/dropins folder.

- Go to REG_HOME/repository/conf/carbon.xml and change the offset value to 1 as follows.
- Now start the server and do registry operations such as adding WSDLs/Services /Resources and etc (these actions will create events from WSO2 G-Reg side and publish them to WSO2 BAM) and check the WSO2 BAM  Gadget Portal and you will find the operations you did published as events on WSO2 BAM dashboard.

For more info:visit WSO2 G-Reg wiki docs - http://docs.wso2.org/wiki/display/Governance453/WSO2+Governance+Registry+Documentation