Sunday, June 30, 2013

Prepare your Event Driven, Runtime Governance Platform POC in 30 minutes: WSO2 way


Prepare an Event-Driven Runtime Governance platform POC for your management in 30 minutes.

This post shows how to achieve runtime governance in an event-driven way using the WSO2 Carbon Platform.

 I will explain how to achieve this using two key WSO2 products:
 - WSO2 Governance Registry (G-Reg)
 - WSO2 Complex Event Processing Server (CEP)

 G-Reg is WSO2's governance registry and repository product, while CEP takes care of processing complex events at high throughput.

 Prerequisites
- Download WSO2 G-Reg 4.5.3 [1].
- Download WSO2 CEP 2.1.0 [2].

 What is our POC?

 - I have an API metadata artifact created in G-Reg (for more information on what an artifact is, see [3]).
 - It has a lifecycle with Development, Testing and Production states.
 - If an API reaches the Production state and is then updated frequently, it is demoted to the Testing state. (In this case, if the API is updated 3 times within the time window used by the CEP query in Step #2, which is 1 minute in the query shown there, I demote the API back to the Testing state, which means it should be tested more.)

 NOTE: This is just a simple use case to show how the mechanism works. It might not be an ideal practical governance example.
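
 To make the three states and the promote/demote moves concrete, here is a minimal sketch in plain Java. It is only a model for this post: ApiLifecycleState is a hypothetical name, and G-Reg itself defines lifecycles in XML configuration, not in Java enums.

```java
// Hypothetical stand-in for the three-state lifecycle described above.
// G-Reg defines lifecycles in XML; this enum only models the transitions.
enum ApiLifecycleState {
    DEVELOPMENT, TESTING, PRODUCTION;

    /** State reached by "Promote"; PRODUCTION has nowhere further to go. */
    ApiLifecycleState promote() {
        return this == PRODUCTION ? PRODUCTION : values()[ordinal() + 1];
    }

    /** State reached by "Demote"; DEVELOPMENT stays where it is. */
    ApiLifecycleState demote() {
        return this == DEVELOPMENT ? DEVELOPMENT : values()[ordinal() - 1];
    }
}

class LifecycleDemo {
    public static void main(String[] args) {
        ApiLifecycleState state = ApiLifecycleState.DEVELOPMENT;
        state = state.promote(); // TESTING
        state = state.promote(); // PRODUCTION
        state = state.demote();  // back to TESTING, which is what the POC rule triggers
        System.out.println(state);
    }
}
```

 The POC only ever uses the last transition automatically: everything else is done by hand from the management console.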

 How it works:

 #1. G-Reg publishes all registry events to CEP (through the Apache Thrift protocol).
 #2. In CEP, a query waits for a pattern/condition in the incoming events and decides what to do.
 #3. When CEP fires an event, it calls G-Reg remotely and demotes the API artifact from the Production state to the Testing state.



 Let's see how to achieve the above 3 steps in detail.

 Step #1

- Navigate to GREG_HOME/samples/handler/src to find the source code of the Handler Sample.
- Add the following dependencies to your POM file:

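    <dependency>
        <groupId>org.wso2.carbon</groupId>
        <artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
        <version>4.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.wso2.carbon</groupId>
        <artifactId>org.wso2.carbon.databridge.commons</artifactId>
        <version>4.0.0</version>
    </dependency>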

- Comment-out the following exclusions in your POM file:


- Add the following plugin to your POM file:

    <plugin>
        <groupId>org.apache.felix</groupId>
        <artifactId>maven-scr-plugin</artifactId>
    </plugin>

- Add a new Java Class named StatisticsCollectorServiceComponent at GREG_HOME/samples/handler/src/src/main/java/org/wso2/carbon/registry/samples/statistics/StatisticsCollectorServiceComponent.java with the following source:
package org.wso2.carbon.registry.samples.statistics;
  
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.context.CarbonContext;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.agent.thrift.Agent;
import org.wso2.carbon.databridge.agent.thrift.DataPublisher;
import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.exception.NoStreamDefinitionExistException;
import org.wso2.carbon.registry.core.service.RegistryService;
import org.wso2.carbon.registry.core.statistics.StatisticsCollector;
import org.wso2.carbon.registry.core.utils.RegistryUtils;
import org.wso2.carbon.registry.samples.statistics.util.ArtifactUtil;
import org.wso2.carbon.utils.NetworkUtils;
  
/**
 * @scr.component name="org.wso2.carbon.registry.samples.statistics" immediate="true"
 * @scr.reference name="registry.service" interface="org.wso2.carbon.registry.core.service.RegistryService"
 * cardinality="1..1" policy="dynamic" bind="setRegistryService" unbind="unsetRegistryService"
 */
public class StatisticsCollectorServiceComponent {
  
    public static final String REGISTRY_ACTIVITY_STREAM = "govstream1";
    public static final String VERSION = "1.0.0";
    private RegistryService registryService;
  
    private ServiceRegistration serviceRegistration;
  
    protected void activate(ComponentContext context) {
        serviceRegistration = context.getBundleContext().registerService(
                StatisticsCollector.class.getName(), new StatisticsCollector() {
            public void collect(Object... objects) {
                try {
                    // Create Data Publisher
                    RegistryUtils.setTrustStoreSystemProperties();
                    DataPublisher dataPublisher = new DataPublisher(
                            "tcp://" + NetworkUtils.getLocalHostname() + ":7612", "admin", "admin",
                            new Agent(new AgentConfiguration()));
  
                    // Find Data Stream
                    String streamId;
                    try {
                        streamId = dataPublisher.findStream(REGISTRY_ACTIVITY_STREAM, VERSION);
                    } catch (NoStreamDefinitionExistException ignored) {
                        streamId = dataPublisher.defineStream("{" +
                                "  'name':'" + REGISTRY_ACTIVITY_STREAM + "'," +
                                "  'version':'" + VERSION + "'," +
                                "  'nickName': 'Registry_Activity'," +
                                "  'description': 'Registry Activities'," +
                                "  'metaData':[" +
                                "          {'name':'clientType','type':'STRING'}" +
                                "  ]," +
                                "  'payloadData':[" +
                                "          {'name':'operation','type':'STRING'}," +
                                "          {'name':'user','type':'STRING'}," +
                                "          {'name':'rpath','type':'STRING'}" +
                                "  ]" +
                                "}");
                    }
  
                    // objects[2] is read below, so at least three arguments are required.
                    if (!streamId.isEmpty() && objects.length >= 3) {
                        String resource = objects[2] != null ? objects[2].toString() : "nil";
                        // Publish Event to Stream. Note how the payload slots are used:
                        // 'operation' holds the result of ArtifactUtil.isUpdate, 'user' holds the
                        // lifecycle state (the CEP query filters on it), 'rpath' holds the API path.
                        Event event = new Event(
                                streamId, System.currentTimeMillis(),
                                new Object[]{"external"}, null,
                                new Object[]{
                                        ArtifactUtil.isUpdate(objects),
                                        ArtifactUtil.getLCState(
                                                registryService.getRegistry(
                                                        CarbonContext.getCurrentContext().getUsername(),
                                                        PrivilegedCarbonContext.getCurrentContext().getTenantId()),
                                                ArtifactUtil.getArtifactAttribute(resource, "name")),
                                        ArtifactUtil.getAPIResourcePath(resource, objects)
                                }
                        );
                        dataPublisher.publish(event);
                        dataPublisher.stop();
                        System.out.println("Successfully Published Event");
                    }
  
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, null);
    }
  
    protected void deactivate(ComponentContext context) {
        serviceRegistration.unregister();
    }
  
    protected void setRegistryService(RegistryService registryService) {
        this.registryService=registryService;
        // The Maven SCR Plugin Needs These

    }
  
    protected void unsetRegistryService(RegistryService registryService) {
        this.registryService=null;
        // The Maven SCR Plugin Needs These
    }
}
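
The stream definition in the class above is assembled by concatenating a JSON-like string by hand, which is easy to get wrong when attributes are added. As a rough sketch, the same string could be produced by a small helper. StreamDefinitionJson and its methods are hypothetical names of my own, not part of the WSO2 data bridge API; the helper only builds the text that is passed to defineStream(...).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of the WSO2 data bridge API: it only builds the
// single-quoted definition string used by the component above.
class StreamDefinitionJson {

    /** Renders attribute name/type pairs as [{'name':..,'type':..},...] in insertion order. */
    static String attributes(Map<String, String> fields) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        for (Map.Entry<String, String> field : fields.entrySet()) {
            joiner.add("{'name':'" + field.getKey() + "','type':'" + field.getValue() + "'}");
        }
        return joiner.toString();
    }

    static String build(String name, String version, String nickName, String description,
                        Map<String, String> metaData, Map<String, String> payloadData) {
        return "{'name':'" + name + "','version':'" + version + "',"
                + "'nickName':'" + nickName + "','description':'" + description + "',"
                + "'metaData':" + attributes(metaData) + ","
                + "'payloadData':" + attributes(payloadData) + "}";
    }

    public static void main(String[] args) {
        Map<String, String> meta = new LinkedHashMap<>();
        meta.put("clientType", "STRING");
        Map<String, String> payload = new LinkedHashMap<>();
        payload.put("operation", "STRING");
        payload.put("user", "STRING");
        payload.put("rpath", "STRING");
        System.out.println(build("govstream1", "1.0.0", "Registry_Activity",
                "Registry Activities", meta, payload));
    }
}
```

The helper does no escaping, so it is only suitable for simple attribute names like the ones used in this post.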

- Compile the source code by running mvn clean install inside GREG_HOME/samples/handler/src. The built jar will be in the target folder.
 - Now copy the jar to the GREG_HOME/repository/components/plugins folder.
 - Start the G-Reg server.

 Step #2.

 - Now we have to define a rule in the CEP server that watches for the pattern described in the POC scenario. For that, CEP lets us configure a CEP bucket, which defines a query that listens to events and fires an event when the query is satisfied.

The following is the bucket configuration XML file.
 - Create a file called GovernanceAnalizer.xml with the following XML content.
 - Copy the file into the CEP_HOME/repository/deployment/server/cepbuckets folder.

  testsss

  
    0
    false
  

  
    
      
      
      
    
  

  
    from govstream1[operation=='update' and user=='Production']#window.timeBatch( 1 min )
                         insert into outStream2 count(operation) as countProp ,user,rpath having countProp==3;
    
      
        
        
        
          
          
          
        
      
    
  


....................................Description of the cep bucket configuration.............................................

- The "engineProviderConfiguration" element describes the CEP runtime engine details. It holds static configuration and is not specific to our flow.

- We define a broker to listen to the events sent by G-Reg.
   
  
    
      
      
      
    
  
This configuration is defined to match the event schema sent from the G-Reg side. G-Reg sends events in a stream called "govstream1" with the version "1.0.0", which has three payload attributes named "operation", "user" and "rpath". Now we define the CEP query [4].
from govstream1[operation=='update' and user=='Production']#window.timeBatch( 1 min )
                         insert into outStream2 count(operation) as countProp ,user,rpath having countProp==3; 

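This means: listen to events whose operation attribute is 'update' and whose user attribute is 'Production' within a 1-minute time-batch window, and fire an event if 3 'update' operations arrive within that window. (Note that the 'user' attribute actually carries the lifecycle state, which is what the publisher in Step #1 puts into that payload slot.)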
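
To show what the query is counting, here is a simplified model of it in plain Java. It is a sketch under my own assumptions, not how the CEP engine implements timeBatch: UpdateBurstDetector is a hypothetical name, each window starts at the first matching event seen after the previous one expired, and the method reports the event that brings the count to the threshold, whereas the real engine may only emit when the batch closes.

```java
// Simplified stand-in for the CEP rule: count matching events in fixed,
// non-overlapping windows and report when the count reaches the threshold.
// For illustration only; this is not the CEP engine's implementation.
class UpdateBurstDetector {
    private final long windowMillis;
    private final int threshold;
    private boolean windowOpen;
    private long windowStart;
    private int count;

    UpdateBurstDetector(long windowMillis, int threshold) {
        this.windowMillis = windowMillis;
        this.threshold = threshold;
    }

    /** Returns true when this event brings the current window's count to the threshold. */
    boolean onEvent(String operation, String lifecycleState, long timestampMillis) {
        // Same filter as the query: operation=='update' and user=='Production'.
        if (!"update".equals(operation) || !"Production".equals(lifecycleState)) {
            return false;
        }
        // Open a new window if none is open or the current one has expired.
        if (!windowOpen || timestampMillis - windowStart >= windowMillis) {
            windowOpen = true;
            windowStart = timestampMillis;
            count = 0;
        }
        count++;
        return count == threshold;
    }

    public static void main(String[] args) {
        UpdateBurstDetector detector = new UpdateBurstDetector(60_000L, 3);
        System.out.println(detector.onEvent("update", "Production", 0L));      // false
        System.out.println(detector.onEvent("update", "Production", 10_000L)); // false
        System.out.println(detector.onEvent("update", "Production", 20_000L)); // true
    }
}
```

Events in other lifecycle states, or operations other than 'update', never touch the counter, which is why editing an API in Testing does not trigger anything.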

Finally we define the output stream as follows, where you can do further processing in the callback method. Here we use a broker called "governance_agent_broker", a custom broker created specifically for this case, which remotely calls G-Reg and demotes the API artifact.

      
        
        
        
          
          
          
        
      
    
..................................End of Description of the CEP bucket configuration............................

 Step #3 

This step calls G-Reg remotely and demotes the API artifact from the Production state to Testing. This is where we define the custom event broker named "governance_agent_broker" mentioned in the description above.

Please follow [5] on how to create a custom event broker. You have to write a custom broker class and a corresponding factory class and deploy them. Use the following custom broker class and factory class for this case.

public class GovernanceBroker implements BrokerType {

    public BrokerTypeDto getBrokerTypeDto() {
        BrokerTypeDto brokerTypeDto = new BrokerTypeDto();
        brokerTypeDto.setName("GovernanceAgent");
        return brokerTypeDto ;
    }

    public String subscribe(String s, BrokerListener brokerListener, BrokerConfiguration brokerConfiguration, AxisConfiguration axisConfiguration) throws BrokerEventProcessingException {
        // Not used: this broker only publishes.
        return null;
    }

    public void publish(String s, Object o, BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException {
        Event event = (Event) ((Object[]) o)[0];
        Object[] objects = event.getPayloadData();
        // Only the resource path (third payload value) is needed for the demotion below.
        String operation = objects[0].toString();
        String user = objects[1].toString();
        String path = objects[2].toString();

        // Print the received payload for debugging.
        for (Object oo : objects) {
            if (oo != null) {
                System.out.println("+++" + oo.toString());
            }
        }

        String CARBON_HOME = "PATH TO CEP HOME";
        String username = "admin";
        String password = "admin";
        String serverURL = "https://localhost:9443/registry";
        String axis2Conf = ServerConfiguration.getInstance().getFirstProperty("Axis2Config.clientAxis2XmlLocation");
        String axis2Repo = CARBON_HOME + File.separator + "repository" +
                File.separator + "deployment" + File.separator + "client";
        System.setProperty("javax.net.ssl.trustStore", CARBON_HOME + File.separator + "repository" +
                File.separator + "resources" + File.separator + "security" + File.separator +
                "wso2carbon.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
        System.setProperty("javax.net.ssl.trustStoreType", "JKS");
        System.setProperty("carbon.repo.write.mode", "true");


        try {
            RemoteRegistry registry = new RemoteRegistry(new URL(serverURL), username,password);
            registry.invokeAspect(path,"SampleLifeCycle","Demote");
            System.out.println("Successfully Demoted the API at "+path);
        } catch (RegistryException e) {
            System.out.println("ERR" +e.getMessage());
        } catch (MalformedURLException e) {
            System.out.println("ERR" +e.getMessage());
        }

    }

    public void testConnection(BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException {
        // Nothing to test for this broker.
    }

    public void unsubscribe(String s, BrokerConfiguration brokerConfiguration, AxisConfiguration axisConfiguration, String s1) throws BrokerEventProcessingException {
        // Not used: this broker never subscribes.
    }
}

public class GovernanceBrokerFactoryImpl implements BrokerTypeFactory {

    public BrokerType getBrokerType() {
        return new GovernanceBroker();
    }
}
After Step 3 is completed we are all done with our POC.

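Now open CEP_HOME/repository/conf/carbon.xml, set the port Offset value to 1, and start the CEP server. (With an offset of 1, CEP's Thrift port becomes 7612, which is the port the publisher in Step #1 connects to.)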
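
Assuming the setting in question is Carbon's port offset (the publisher in Step #1 targets port 7612, one above the default Thrift port 7611), the effect on the ports is plain addition. A tiny sketch, with PortOffset as a hypothetical name used only for illustration:

```java
// Illustrative arithmetic only: the configured offset is added to each default port.
class PortOffset {
    static int effectivePort(int defaultPort, int offset) {
        return defaultPort + offset;
    }

    public static void main(String[] args) {
        int offset = 1;
        // Thrift receiver port that the G-Reg publisher must target
        System.out.println(effectivePort(7611, offset)); // 7612
        // HTTPS management console port of the offset server
        System.out.println(effectivePort(9443, offset)); // 9444
    }
}
```

This is also why G-Reg (offset 0) and CEP (offset 1) can run side by side on the same machine without port clashes.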

Let's test this.

 - Log in to the G-Reg management console at https://localhost:9443/carbon/ with admin/admin as the username and password.

- Go to Home > Extensions > Configure > Lifecycles. Click Add new Lifecycle and you will see a configuration XML file. Just click Save without changing anything. You have now created a lifecycle called "SampleLifeCycle".

 - Go to Home > Metadata > Add > API and create an API by filling in some dummy metadata values. It is enough to fill in only the required fields.

 - Now attach the previously created "SampleLifeCycle" to the API we just saved. Refer to [6] for more details on how to do this.

 - Once a lifecycle is attached to the API, you will see that it is in the Development state. Now click "Promote" to move it to "Testing", and do the same to push it to "Production".

 - Once it is in Production, we have our scenario. If I edit the API artifact (which is now in the Production state) 3 times, you will see that it is automatically demoted back to the "Testing" state. The magic is done through CEP ;-).


And if you are interested in extending this to a wider integrated platform, the following is a better deployment, where you have a Business Activity Monitor to monitor and analyze all the business events.



That's it. The post is lengthy because I explained everything in detail to make it easier to follow.



[1] - http://wso2.com/products/governance-registry
[2] - http://wso2.com/products/complex-event-processor
[3] - http://docs.wso2.org/wiki/display/Governance453/Governance+Artifacts
[4] - http://docs.wso2.org/wiki/display/CEP210/Queries
[5] - http://docs.wso2.org/wiki/display/CEP210/Writing+Custom+Broker
[6] - http://docs.wso2.org/wiki/display/Governance453/Managing+Lifecycle



Thursday, June 20, 2013

10 Benefits of having a Proper Governance Platform for your Enterprise


The content of this post is based only on my personal views on enterprise governance and the latest industry trends. In the latter part I will discuss a single platform from which you can get all these benefits.

                                                                                 

1. Attracts investors. Investors tend to put their money behind companies that enforce high internal governance standards. A proper governance layer on top of your enterprise assets helps attract more investors.

2. Ensures your enterprise assets behave exactly as you expect them to. It helps identify hot spots among your enterprise assets. When things are not going your way, a proper governance layer helps you make accurate decisions quickly (it helps you easily locate the red spots: which assets perform best and which do not, which assets consume the most resources, which need more resources, and so on).

3. Change management. When every single asset of your enterprise is under your control and only a couple of clicks away, rapid changes to your enterprise are no longer a nightmare. Governance can facilitate fast, effective decision making across both business and IT.

4. The best way to govern non-electronic, physical assets. Governance isn't exclusive to IT; it's applied throughout the industry. You can create a virtual representation of any enterprise asset and govern it in a meaningful way.

5. Ability to govern enterprise assets with human interaction involved. The system does not need to be entirely automated: where required, you can add break points at which people step in, make decisions, and let the process continue.

6. Decide the lifecycle of each enterprise asset or group of enterprise assets. You can define and develop the entire lifecycle of an enterprise asset and apply policies at design time as well as at runtime.

7. Let your legacy software systems take part in governing your assets in a smooth and controllable manner. Standard interfaces let existing legacy systems participate in governing your enterprise assets alongside the main governance platform layer.

8. Restructure your unstructured assets, start reusing common services, and save time. It helps you identify the common business services across your enterprise and reuse them.

9. Impact analysis. Lets you analyze the impact of the absence of any enterprise asset.

10. Event-driven governance. Govern your enterprise assets based on business events. It allows your enterprise to make automated, real-time decisions based on the thousands of business event streams you have, including very sensitive decisions that can have a huge impact on your enterprise.

So now you should have a better understanding of how a proper enterprise governance layer can lift you above the rest.

Have you ever tried to find a single product that delivers all these benefits?

The award-winning WSO2 [1] Carbon Platform provides a comprehensive set of features covering the entire spectrum of current enterprise governance requirements. Yes, it is that extensible and flexible.

If you know your governance requirements in detail, you will most likely find a matching WSO2 Carbon feature that accomplishes them.

So, to build a proper governance platform based on your requirements, you can mainly use the following set of well-known products/features from WSO2.

WSO2 Governance Registry [2]
WSO2 Business Activity Monitor [3]
WSO2 Complex Event Processing Server [4]

Yes, coming back to the question: thanks to WSO2 Carbon, it will do the magic for you.

You can combine all three products into one product that becomes your governance layer. It is as simple as that. Depending on the traffic you have, you can even separate them out into different VMs/clusters. You can have your governance layer on premise as well as in the cloud.

Even if you use these as separate products, you get a similar experience and complexity as when you have them all in one binary, because the integration among WSO2 products is quite smooth.

[1]-  http://wso2.com/
[2] - http://wso2.com/products/governance-registry
[3] - http://wso2.com/products/business-activity-monitor
[4] - http://wso2.com/products/complex-event-processor

Wednesday, May 22, 2013

Intercept your JPA (OpenJPA) calls with Entity Listeners


This is quite a simple thing to achieve, but also quite useful.

1. You can configure listeners per entity. One entity can have multiple callbacks.
2. You can register a default listener that intercepts all JPA calls.


The callback methods supported by JPA are as follows. Refer to [1] for more info.

  •  PrePersist: Methods marked with this annotation will be invoked before an object is persisted. 
  •  PostPersist: Methods marked with this annotation will be invoked after an object has transitioned to the persistent state.
  •  PostLoad: Methods marked with this annotation will be invoked after all eagerly fetched fields of your class have been loaded from the datastore. No other persistent fields can be accessed in this method. 
  •  PreUpdate: Methods marked with this annotation will be invoked just before the persistent values in your objects are flushed to the datastore. 
  •  PostUpdate: Methods marked with this annotation will be invoked after changes to a given instance have been stored to the datastore. 
  •  PreRemove: Methods marked with this annotation will be invoked before an object transitions to the deleted state. Access to persistent fields is valid within this method. 
  •  PostRemove: Methods marked with this annotation will be invoked after an object has been marked as to be deleted. This is equivalent to the XML element tag post-remove.

So in your Entity class you can add an annotation as follows and register multiple listener classes.

@EntityListeners({ MyListenerOne.class, ... })

Once those are defined as above, OpenJPA reads the configuration file META-INF/persistence.xml from your class path, where you define the persistence provider and so on. There you can also point to a mapping file that holds the details of the entity listeners and the callbacks they should listen to.

That file is META-INF/orm.xml. So orm.xml is where you define the mapping of the entity listeners and callback methods.

For example, in orm.xml you can add the following.

    
        
        
        addData
        removeData
        
    
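So, as you would expect, the listener class "MyListener" should be as follows.
    import javax.persistence.PostPersist;
    import javax.persistence.PreRemove;

    public class MyListener {

        @PostPersist
        public void logAddition(Object pc) {
            // react to the newly persisted entity here
        }

        @PreRemove
        public void logDeletion(Object pc) {
            // react to the entity that is about to be removed here
        }
    }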
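
To see the callback mechanics without a JPA provider on the class path, here is a toy dispatcher in plain Java. Everything in it is a stand-in of my own: the two annotations are declared locally so the sketch compiles alone (in real code they come from javax.persistence), and ToyCallbackDispatcher only imitates, very roughly, what a provider does when an entity event occurs.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Stand-in annotations so the sketch compiles without a JPA jar.
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface PostPersist {}
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface PreRemove {}

// A listener shaped like the one in this post, recording what it was told.
class AuditListener {
    final List<String> log = new ArrayList<>();

    @PostPersist
    public void logAddition(Object entity) {
        log.add("added " + entity);
    }

    @PreRemove
    public void logDeletion(Object entity) {
        log.add("removing " + entity);
    }
}

// Toy dispatcher: calls every listener method annotated for the given event.
class ToyCallbackDispatcher {
    static void fire(Class<? extends Annotation> event, Object listener, Object entity) {
        for (Method method : listener.getClass().getMethods()) {
            if (method.isAnnotationPresent(event)) {
                try {
                    method.invoke(listener, entity);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Callback failed: " + method.getName(), e);
                }
            }
        }
    }

    public static void main(String[] args) {
        AuditListener listener = new AuditListener();
        fire(PostPersist.class, listener, "magazine-1");
        fire(PreRemove.class, listener, "magazine-1");
        System.out.println(listener.log);
    }
}
```

The point of the sketch is that the listener is an ordinary class: the annotations alone decide which method runs for which event.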
If you want to register a default listener that listens to all JPA calls, add the following to your orm.xml.
    
        
            
                
                    
                        
                            
                            
                        
                    
                
            
        
    
Download the OpenJPA runtime and try it yourself. ;-)

Wednesday, April 17, 2013

java.lang.UnsupportedOperationException: The new addressing based sytanx is not supported for AMQP 0-8/0-9 versions at WSO2 ESB and MB integration scenario


Hi all,
I just want to mention a couple of facts that can cause $subject, since this error occurs quite frequently for users. The reason behind it is a bit tricky: the exception says one thing, but what actually happened can be something else.

1. If you have missed any configuration related to the WSO2 ESB and MB integration (e.g. adding the queue info to the JNDI properties file), ESB will give this error, since ESB is the JMS client. Please refer to [1].

2. If you are sure that all the configuration is done but you still get this error, there is one other possible reason. When you enable the Axis2 JMS transport in ESB for the MB/ESB integration scenario as per the docs, the JMS transport is by default enabled for all the services hosted there. For instance, you might have Axis2 services or data services deployed. Those are then expected to be exposed over the JMS transport as well, even though your already deployed Axis2 services might not need it.

The first solution that comes to mind is to add the JMS transport configuration to all the other deployed services, which does work. But it is not practical to change existing services just because of the MB integration.

So the best possible solution is as follows.
For each such service, create an Axis2 services.xml file that specifically states that the service needs only the HTTP/HTTPS transports.

i.e.


    
        
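<!-- "YourServiceName" is a placeholder for the name of your own service -->
<serviceGroup>
    <service name="YourServiceName">
        <transports>
            <transport>https</transport>
            <transport>http</transport>
        </transports>
    </service>
</serviceGroup>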
             
    



[1] - http://docs.wso2.org/wiki/display/MB201/Integrating+WSO2+ESB

Wednesday, January 9, 2013

WSO2 ESB mediator to publish data to WSO2 BAM

This post explains another Carbon OOTB (out-of-the-box) functionality, related to Carbon data publishing and monitoring.





Prerequisites: 

- Download and install WSO2 ESB 4.5.1 (the ESB version must be >= 4.5.1)
- Download and install WSO2 BAM 2.0.1

What we're going to do:

Figure out a way to publish WSO2 Enterprise Service Bus (ESB) mediation data to WSO2 Business Activity Monitor (BAM). In this particular case we capture the mediation data of WSO2 ESB and publish it to BAM.

How to:

 1. Through WSO2 ESB, we create a WSO2 BAM server profile (you can create multiple profiles), and there we define all the required WSO2 BAM server details, including stream definitions.
 2.1 We use a pre-built WSO2 ESB mediator that is specifically designed to publish data to BAM. It stores the mediation data in WSO2 BAM's default secondary storage (a Cassandra database). Inside this BAM mediator, we point to the corresponding server profile.
 2.2 Once step 1 is done, add this mediator to the in-sequence of an ESB proxy service (Simple_Stock_Quote_Service_Proxy in the example below).
 3. When the proxy service is invoked, the BAM mediator stores the mediation data in the WSO2 BAM Cassandra database.

Steps in detail :

Step-1. To create a WSO2 BAM server profile in WSO2 ESB, we need to install the BAM Mediator Aggregate feature in WSO2 ESB.

To do so,
- Start the WSO2 ESB server with a port offset of 1, log in as admin, and go to Home > Configure > Features > Feature Management. There, under the Repository Management tab, click Add Repository.
- Provide the corresponding p2 repository and save it. In this case the repository URL is http://dist.wso2.org/p2/carbon/releases/4.0.2. 
- Once done, move to the Available Features tab, then find and install the feature called "BAM Mediator Aggregate".

Now you can go to Home > Configure > BAM Server Profile in the WSO2 ESB management console and click Add Profile. Fill in the configuration as follows, then update and save it.
i.e
  • Profile Name: Name for the BAM server profile (e.g. profile1).
  • Server Credential: admin, admin
  • Server Transport: Thrift, the default protocol.
  • Enable Security: Select this option if message confidentiality is required between the ESB server and the BAM server.
  • IP Address: IP of the BAM server's Thrift server, e.g. localhost.
  • Receiver Port: Required if security is not enabled. Enter 7611, the default Thrift server port.
  • Authentication Port: This port number is 100 greater than the Receiver Port, if the latter exists. The default Authentication Port is 7711.
Now in your profile, you can define a stream. In this case, for example:

Name - stream_1
Version - 1.0.0
Nick Name - my stream
Description - test stream definition

Now click Edit Stream on the created stream. You will be asked to fill in the stream payload and stream properties.

When editing a stream, check "Dump header" or "Dump body" to record the SOAP header or SOAP body of messages respectively. Using "Stream Properties", the user can extract several types of properties from the incoming message.
  • Value: A constant alphanumeric string entered in the Value field is set as the property.
  • Expression: The entry is treated as an expression and evaluated against the message to get the property. XPath properties and functions available in ESB are valid in the expression. 
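
The difference between a Value property and an Expression property can be sketched in plain Java. This is only a model under my own assumptions: StreamPropertySketch is a hypothetical class, and it uses the JDK's javax.xml.xpath engine on a raw XML string, whereas the BAM mediator evaluates expressions with the ESB's own XPath support against the message in flight.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical model of a stream property: either a constant value or an
// XPath expression that is evaluated against the incoming message.
class StreamPropertySketch {
    private final String name;
    private final String valueOrExpression;
    private final boolean expression;

    StreamPropertySketch(String name, String valueOrExpression, boolean expression) {
        this.name = name;
        this.valueOrExpression = valueOrExpression;
        this.expression = expression;
    }

    /** Returns the constant as-is, or the string result of the XPath expression. */
    String resolve(String messageXml) {
        if (!expression) {
            return valueOrExpression;
        }
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            return xpath.evaluate(valueOrExpression, new InputSource(new StringReader(messageXml)));
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Bad expression for property " + name, e);
        }
    }

    public static void main(String[] args) {
        String message = "<getQuote><request><symbol>IBM</symbol></request></getQuote>";
        System.out.println(new StreamPropertySketch("env", "production", false).resolve(message));
        System.out.println(new StreamPropertySketch("symbol", "//symbol", true).resolve(message));
    }
}
```

The sample message has no XML namespaces, which keeps the expression short; a real SOAP payload would need namespace-aware expressions.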
i.e



Now we are done with the BAM server profile configuration.

Step-2: We simply need to create a proxy service that contains the BAM mediator, as follows, and refer to the created server profile in it. If you do not like XML, you can create the BAM mediator from the UI as well. For that, please refer to [1].

In this particular case I am creating a WSDL proxy service from the SimpleStockQuoteService sample in WSO2 ESB. (If you are not familiar with WSO2 ESB proxy services, refer to the WSO2 ESB wiki docs [2] for more information.)

To host the SimpleStockQuoteService, 
  - Go to $ESB_HOME/samples/axis2Server/src/SimpleStockQuoteService and run ant. 
  - Move to $ESB_HOME/samples/axis2Server and run ./axis2server.sh. This starts the Axis2 server with the SimpleStockQuoteService. You can find the WSDL at
http://localhost:9000/services/SimpleStockQuoteService?wsdl. 

The final proxy service created from the above WSDL:

   
      
         
            
               
            
         
      
      
         
      
      
         
      
   
                              
Now we are all done and ready to test.

- Start the WSO2 BAM server.
- Invoke the proxy service through the sample Axis2 client. Go to $ESB_HOME/samples/axis2Client 
and run the following command, where https://localhost:8244/services/Simple_Stock_Quote_Service_Proxy is the endpoint URL of the created proxy service. You can pass any string for symbol.

ant stockquote -Daddurl=https://localhost:8244/services/Simple_Stock_Quote_Service_Proxy -Dmode=fullquote -Dsymbol=testString

Once done, the data is stored in the Cassandra database by our BAM mediator. The BAM mediator in the proxy service Simple_Stock_Quote_Service_Proxy should have dumped the data extracted from the ESB into the EVENT_KS keyspace in the Cassandra database, in a column family with the same name as the stream. The data can be viewed in the Cassandra Explorer of the BAM server.

[1] - http://docs.wso2.org/wiki/display/BAM200/Setting+up+BAM+Mediator
[2] - http://docs.wso2.org/wiki/display/ESB451/Enterprise+Service+Bus+Documentation

Wednesday, January 2, 2013

Publish WSO2 Governance Registry Data to WSO2 BAM


This post will explain how to publish WSO2 G-Reg generated events to WSO2 BAM server. In other words, this is a custom data publisher for G-Reg/BAM.


   

Pre-requisites

- Install WSO2 Business Activity Monitor (BAM 2.0.0)
- Install WSO2 Governance Registry (G-Reg 4.5.3)

What needs to be done:
a) Configure WSO2 BAM to accept events generated by WSO2 G-Reg.
b) Publish events generated by G-Reg to the BAM event listeners.

How to:
- To achieve (a), we need to install a relevant Toolbox in BAM.
(A BAM Toolbox is an installable archive that contains stream definitions, dashboard components and analytics for WSO2 Business Activity Monitor. KPI_Registry_Activity.tbox is one such pre-built BAM Toolbox, based on the KPI Monitoring sample of WSO2 BAM. You can read the KPI Monitoring sample of WSO2 BAM to learn how to change this BAM Toolbox or create your own.)

 - To achieve (b), the WSO2 Carbon Kernel gives us an extension point to collect statistics from a running Carbon server. To use it, we write a class that implements the StatisticsCollector interface and overrides its collect method so that it publishes the collected event to WSO2 BAM.
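
The shape of such a collector can be sketched in plain Java before touching any WSO2 classes. Everything here is a stand-in of my own: StatisticsCollectorStandIn mirrors only the collect(Object...) method of the real org.wso2.carbon.registry.core.statistics.StatisticsCollector interface, and EventSink is a hypothetical replacement for the Thrift data publisher used later in this post.

```java
// Stand-in for the real StatisticsCollector interface, declared here only so
// that the sketch compiles without the Carbon jars.
interface StatisticsCollectorStandIn {
    void collect(Object... objects);
}

// Hypothetical sink; in the real bundle this role is played by the data publisher.
interface EventSink {
    void publish(String summary);
}

// Simple sink that remembers the last published summary.
class RecordingSink implements EventSink {
    String last;

    @Override
    public void publish(String summary) {
        last = summary;
    }
}

// Collector that turns whatever the registry hands over into one event.
class PublishingCollector implements StatisticsCollectorStandIn {
    private final EventSink sink;

    PublishingCollector(EventSink sink) {
        this.sink = sink;
    }

    @Override
    public void collect(Object... objects) {
        if (objects == null || objects.length == 0) {
            return; // nothing to report
        }
        StringBuilder summary = new StringBuilder();
        for (Object value : objects) {
            if (summary.length() > 0) {
                summary.append('|');
            }
            // Null arguments are replaced by a marker instead of failing.
            summary.append(value == null ? "nil" : value.toString());
        }
        sink.publish(summary.toString());
    }

    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink();
        new PublishingCollector(sink).collect("put", null, "/apis/foo");
        System.out.println(sink.last);
    }
}
```

Keeping the publishing part behind a small interface like EventSink also makes the collector easy to try out without a running BAM server.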

I hope you now have a brief understanding of how to achieve this goal. So let's figure out in detail what needs to be done to achieve (a) and (b).


a) Configure WSO2 BAM to accept events generated from WSO2 G-Reg

- Download the KPI_Registry_Activity.tbox .
- Start WSO2 BAM server and login as admin (username - admin password -admin).
- In BAM management console go to BAM Toolbox > Custom Toolbox   select the downloaded toolbox from the file system and click Install.

Now you have the tool box installed which awaits and listens to the events coming through its tcp event listener port.

b) To add a stat collector that publishes the collected stats to WSO2 BAM, we need to write a custom OSGi bundle that registers the statistics collector service at bundle startup.

To write a new OSGi bundle, we can reuse the G-Reg handler sample code. Find the handler sample in
GREG_HOME/samples/handler.

There, add a new class called RegistryStatCollectorServiceComponent under the package org.wso2.carbon.registry.samples.statistics.

In the sample pom file, make the following changes so that the code builds.
Add the following dependencies to the pom.


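    <dependency>
        <groupId>org.wso2.carbon</groupId>
        <artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
        <version>4.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.wso2.carbon</groupId>
        <artifactId>org.wso2.carbon.databridge.commons</artifactId>
        <version>4.0.0</version>
    </dependency>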


Add the following plugin.

    <plugin>
        <groupId>org.apache.felix</groupId>
        <artifactId>maven-scr-plugin</artifactId>
    </plugin>


Comment out the following exclusions in the pom file.



Now edit the previously created RegistryStatCollectorServiceComponent class as follows.
package org.wso2.carbon.registry.samples.statistics;

import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.context.CarbonContext;
import org.wso2.carbon.databridge.agent.thrift.Agent;
import org.wso2.carbon.databridge.agent.thrift.DataPublisher;
import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.exception.NoStreamDefinitionExistException;
import org.wso2.carbon.registry.core.service.RegistryService;
import org.wso2.carbon.registry.core.statistics.StatisticsCollector;
import org.wso2.carbon.registry.core.utils.RegistryUtils;
import org.wso2.carbon.utils.NetworkUtils;

/**
 * @scr.component name="org.wso2.carbon.registry.samples.statistics" immediate="true"
 * @scr.reference name="registry.service" interface="org.wso2.carbon.registry.core.service.RegistryService"
 * cardinality="1..1" policy="dynamic" bind="setRegistryService" unbind="unsetRegistryService"
 */
public class RegistryStatCollectorServiceComponent {

    public static final String REGISTRY_ACTIVITY_STREAM = "org.wso2.bam.registry.activity.kpi";
    public static final String VERSION = "1.0.0";

    private ServiceRegistration serviceRegistration;

    protected void activate(ComponentContext context) {
        serviceRegistration = context.getBundleContext().registerService(
                StatisticsCollector.class.getName(), new StatisticsCollector() {
            public void collect(Object... objects) {
                try {
                    // Create Data Publisher
                    RegistryUtils.setTrustStoreSystemProperties();
                    DataPublisher dataPublisher = new DataPublisher(
                            "tcp://" + NetworkUtils.getLocalHostname() + ":7612", "admin", "admin",
                            new Agent(new AgentConfiguration()));

                    // Find Data Stream
                    String streamId;
                    try {
                        streamId = dataPublisher.findStream(REGISTRY_ACTIVITY_STREAM, VERSION);
                    } catch (NoStreamDefinitionExistException ignored) {
                        streamId = dataPublisher.defineStream("{" +
                                "  'name':'" + REGISTRY_ACTIVITY_STREAM + "'," +
                                "  'version':'" + VERSION + "'," +
                                "  'nickName': 'Registry_Activity'," +
                                "  'description': 'Registry Activities'," +
                                "  'metaData':[" +
                                "          {'name':'clientType','type':'STRING'}" +
                                "  ]," +
                                "  'payloadData':[" +
                                "          {'name':'operation','type':'STRING'}," +
                                "          {'name':'user','type':'STRING'}" +
                                "  ]" +
                                "}");
                    }

                    if (!streamId.isEmpty()) {
                        // Publish Event to Stream
                        dataPublisher.publish(new Event(
                                streamId, System.currentTimeMillis(),
                                new Object[]{"external"}, null, new Object[]{
                                Thread.currentThread().getStackTrace()[3].getMethodName(),
                                CarbonContext.getCurrentContext().getUsername()}));
                        dataPublisher.stop();
                        System.out.println("Successfully Published Event");
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, null);
    }

    protected void deactivate(ComponentContext context) {
        serviceRegistration.unregister();
    }

    protected void setRegistryService(RegistryService registryService) {
        // The Maven SCR Plugin Needs These
    }

    protected void unsetRegistryService(RegistryService registryService) {
        // The Maven SCR Plugin Needs These
    }
}
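The operation name in the published event is read from a fixed position in the current stack trace (getStackTrace()[3] inside collect). The sketch below uses only the JDK and shows which frame that position refers to. The method names notifyCollectors and put are hypothetical stand-ins for the registry internals, and the helper looks up the collect frame by name rather than relying on an absolute index, since the JVM is allowed to omit frames.

```java
class CallerNameDemo {

    // Finds the frame of anchorMethod and returns the name of the method
    // that sits depth frames above it on the call stack, or null if absent.
    static String methodNameAbove(String anchorMethod, int depth) {
        StackTraceElement[] frames = Thread.currentThread().getStackTrace();
        for (int i = 0; i < frames.length; i++) {
            if (frames[i].getMethodName().equals(anchorMethod)) {
                int target = i + depth;
                return target < frames.length ? frames[target].getMethodName() : null;
            }
        }
        return null;
    }

    // Stand-in for StatisticsCollector.collect: asks for the caller's caller.
    static String collect() {
        return methodNameAbove("collect", 2);
    }

    // Hypothetical internal method that invokes the collectors.
    static String notifyCollectors() {
        return collect();
    }

    // Hypothetical registry operation whose name we want to report.
    static String put() {
        return notifyCollectors();
    }

    public static void main(String[] args) {
        System.out.println("operation = " + put());
    }
}
```

A fixed index only reports the right method while the call depth between the registry operation and collect stays the same, so it is worth re-checking after upgrading G-Reg.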

- Now go to GREG_HOME/samples/handler/src and run a Maven clean install to build the project. Once the build finishes, you can find the updated jar file inside the target folder of the sample.

- Copy the jar org.wso2.carbon.registry.samples.handler-4.5.3.jar into the GREG_HOME/repository/components/dropins folder.

- Go to GREG_HOME/repository/conf/carbon.xml and change the port offset value (the Offset element under Ports) to 1.
- Now start the server and perform registry operations such as adding WSDLs, services and resources. These actions create events on the WSO2 G-Reg side and publish them to WSO2 BAM. Then check the WSO2 BAM Gadget Portal, where the operations you performed appear as events on the WSO2 BAM dashboard.
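A Carbon port offset is added to each of a server's default ports, which is what lets two Carbon servers run on the same machine. The sketch below shows the arithmetic. The default values in it (9443 for the management console and 7611 for the Thrift event receiver) are assumptions based on the usual Carbon defaults, and the class and method names are made up for illustration. The port in the data publisher URL has to match the effective Thrift port of the server that receives the events.

```java
class PortOffsetDemo {

    // Assumed defaults; verify them against your own carbon.xml and receiver configuration.
    static final int DEFAULT_CONSOLE_HTTPS_PORT = 9443;
    static final int DEFAULT_THRIFT_RECEIVER_PORT = 7611;

    // Effective port = default port + offset configured in carbon.xml.
    static int effectivePort(int defaultPort, int offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must not be negative");
        }
        return defaultPort + offset;
    }

    // Builds the receiver URL a data publisher would connect to.
    static String thriftUrl(String host, int receiverOffset) {
        return "tcp://" + host + ":" + effectivePort(DEFAULT_THRIFT_RECEIVER_PORT, receiverOffset);
    }

    public static void main(String[] args) {
        System.out.println("Console port with offset 1: "
                + effectivePort(DEFAULT_CONSOLE_HTTPS_PORT, 1));
        System.out.println("Receiver URL with offset 0: " + thriftUrl("localhost", 0));
        System.out.println("Receiver URL with offset 1: " + thriftUrl("localhost", 1));
    }
}
```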

For more information, visit the WSO2 G-Reg wiki docs: http://docs.wso2.org/wiki/display/Governance453/WSO2+Governance+Registry+Documentation