Sunday, June 30, 2013

Prepare your Event Driven, Runtime Governance Platform POC in 30 minutes: WSO2 way


Prepare your Event Driven Runtime Governance platform POC to your management in 30 minutes.

This post will show you on how to achieve Runtime Governance in an Event Driven way using WSO2 Carbon Platform.

 Here I will be explaining how to achieve this using two key WSO2 products.
 - WSO2 Governance Registry (G-Reg)
 - WSO2 Complex Event Processing Server. (CEP)

 G-Reg is the governance registry and repository product for WSO2 while CEP will take care of processing complex events in high throughput.

 Pre requisits
- Download WSO2 G-Reg 4.5.3 [1].
- Download WSO2 CEP 2.1.0 [2].

 What is our POC.

 - I have an API metadata artifact created in GReg side. (for more information on what is an artifact[3])
 - It has a LifeCycle that has Development,Testing and Production states.
 - If any API comes to the Production state, and if the production API gets updated frequently, demote the API to Testing state. ( In this case, if API is getting updated more than 3 times within an 2 minutes I am demoting the API back to Testing state. Which means it should be tested more.

 NOTE: This use case is just a simple use case on how to achieve this.This might not be a ideal practical governance example.

 How it works:

 #1. G-Reg publishes all registry events to CEP.( through Apache Thrift protocol).
 #2. In CEP, a query is waiting for a pattern/condition based on the events decides what to do.
 #3. When CEP fires an event, it calls to G-Reg remotely and demote the API artifact from Staging state to Testing state.



 Lets see how to achieve above 3 steps in detail.

 Step #1

- Navigate to GREG_HOME/ samples/handler/src to find the source code of the Handler Sample.
- Add the following dependencies to your POM file:

    org.wso2.carbon
    org.wso2.carbon.databridge.agent.thrift
    4.0.1


    org.wso2.carbon
    org.wso2.carbon.databridge.commons
    4.0.0

- Comment-out the following exclusions in your POM file:


- Add the following plugin to your POM file:

    org.apache.felix
    maven-scr-plugin

- Add a new Java Class named StatisticsCollectorServiceComponent at GREG_HOME/samples/handler/src/src/main/java/org/wso2/carbon/registry/samples/statistics/StatisticsCollectorServiceComponent.java with the following source:
package org.wso2.carbon.registry.samples.statistics;
  
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.context.CarbonContext;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.agent.thrift.Agent;
import org.wso2.carbon.databridge.agent.thrift.DataPublisher;
import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.exception.NoStreamDefinitionExistException;
import org.wso2.carbon.registry.core.service.RegistryService;
import org.wso2.carbon.registry.core.statistics.StatisticsCollector;
import org.wso2.carbon.registry.core.utils.RegistryUtils;
import org.wso2.carbon.registry.samples.statistics.util.ArtifactUtil;
import org.wso2.carbon.utils.NetworkUtils;
  
/**
 * @scr.component name="org.wso2.carbon.registry.samples.statistics" immediate="true"
 * @scr.reference name="registry.service" interface="org.wso2.carbon.registry.core.service.RegistryService"
 * cardinality="1..1" policy="dynamic" bind="setRegistryService" unbind="unsetRegistryService"
 */
public class StatisticsCollectorServiceComponent {
  
    public static final String REGISTRY_ACTIVITY_STREAM = "govstream1";
    public static final String VERSION = "1.0.0";
    private RegistryService registryService;
  
    private ServiceRegistration serviceRegistration;
  
    protected void activate(ComponentContext context) {
        serviceRegistration = context.getBundleContext().registerService(
                StatisticsCollector.class.getName(), new StatisticsCollector() {
            public void collect(Object... objects) {
                try {
                    // Create Data Publisher
                    RegistryUtils.setTrustStoreSystemProperties();
                    DataPublisher dataPublisher = new DataPublisher(
                            "tcp://" + NetworkUtils.getLocalHostname() + ":7612", "admin", "admin",
                            new Agent(new AgentConfiguration()));
  
                    // Find Data Stream
                    String streamId;
                    try {
                        streamId = dataPublisher.findStream(REGISTRY_ACTIVITY_STREAM, VERSION);
                    } catch (NoStreamDefinitionExistException ignored) {
                        streamId = dataPublisher.defineStream("{" +
                                "  'name':'" + REGISTRY_ACTIVITY_STREAM + "'," +
                                "  'version':'" + VERSION + "'," +
                                "  'nickName': 'Registry_Activity'," +
                                "  'description': 'Registry Activities'," +
                                "  'metaData':[" +
                                "          {'name':'clientType','type':'STRING'}" +
                                "  ]," +
                                "  'payloadData':[" +
                                "          {'name':'operation','type':'STRING'}," +
                                "          {'name':'user','type':'STRING'}," +
                                "          {'name':'rpath','type':'STRING'}" +
                                "  ]" +
                                "}");
                    }
  
                    if (!streamId.isEmpty() && objects.length>=2) {
                        // Publish Event to Stream
                        Event event = new Event(
                                streamId, System.currentTimeMillis(),
                                new Object[]{"external"}, null,
                                new Object[]{
                                ArtifactUtil.isUpdate(objects),
                                ArtifactUtil.getLCState(registryService.getRegistry(CarbonContext.getCurrentContext().getUsername(), PrivilegedCarbonContext.getCurrentContext().getTenantId()),ArtifactUtil.getArtifactAttribute(objects[2] != null?objects[2].toString():"nil","name")),
                                ArtifactUtil.getAPIResourcePath(objects[2] != null?objects[2].toString():"nil",objects)
                                }
                        );
                        dataPublisher.publish(event);
                        dataPublisher.stop();
                        System.out.println("Successfully Published Event");
                    }
  
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, null);
    }
  
    protected void deactivate(ComponentContext context) {
        serviceRegistration.unregister();
    }
  
    protected void setRegistryService(RegistryService registryService) {
        this.registryService=registryService;
        // The Maven SCR Plugin Needs These

    }
  
    protected void unsetRegistryService(RegistryService registryService) {
        this.registryService=null;
        // The Maven SCR Plugin Needs These
    }
}

- Compile the source code by running the following command inside GREG_HOME/ samples/handler/src: mvn clean install. And you will get the built jar inside target folder.
 - Now copy the to GREG_HOME/repository/component/plugins folder.
 - Start the G-Reg server.

 Step #2.

 - Now we have to define a rule in CEP server which will listen to our pattern mentioned in the POC scenario. For that in CEP we can configure something called CEP bucket which defines a query to listens to events and fire events when query is satisfied.

Following is the bucket confuguration xml file.
 - Create a file called GovernanceAnalizer.xml with the following xml content.
 - Copy the file inside CEP_HOME/repository/deployment/server/cepbuckets folder

  testsss

  
    0
    false
  

  
    
      
      
      
    
  

  
    from govstream1[operation=='update' and user=='Production']#window.timeBatch( 1 min )
                         insert into outStream2 count(operation) as countProp ,user,rpath having countProp==3;
    
      
        
        
        
          
          
          
        
      
    
  


....................................Description of the cep bucket configuration.............................................

- "engineProviderConfiguration" element describes the CEP runtime engine details. This has nothing related to the flow we have, but some static configs.

- We define a broker to listen to the events send by G-Reg.
   
  
    
      
      
      
    
  
Here this configuration is defined to match with the event schema sent from the G-Reg side. In this G-Reg sends events in an stream called "govstream1" with the version "1.0.0" which has three inputs in the payload named "operation", "user" and "rpath". Now we define the CEP query [4].
from govstream1[operation=='update' and user=='Production']#window.timeBatch( 1 min )
                         insert into outStream2 count(operation) as countProp ,user,rpath having countProp==3; 

This means listen to events whose operation attribute value is 'update' and user attribute value is 'Production' with in a 2 minutes sliding time window and fire and event if 3 'update' operations comes within that time window.

Finally we define the outout stream as follows where you can do further processing in the call back method. Here we define a particular broker called "governance_agent_broker" who is a custom broker created specifically for this case which will remotely call G-Reg and demote the API artifact.

      
        
        
        
          
          
          
        
      
    
..................................End of Description of the CEP bucket configuration............................

 Step #3 

This step will call G-Reg remotely and demote API artifact from Production state to Testing. This is where we define a custom event broker named "governance_agent_broker" as explained in the description.

Please follow [5] on how to cretae a custom event broker. There what you have to do is write a cutom broker class and a corresponding factory class and deploy them. Please use following Custom Broker class and Factory class for this case.

public class GovernanceBroker implements BrokerType {

    public BrokerTypeDto getBrokerTypeDto() {
        BrokerTypeDto brokerTypeDto = new BrokerTypeDto();
        brokerTypeDto.setName("GovernanceAgent");
        return brokerTypeDto ;
    }

    public String subscribe(String s, BrokerListener brokerListener, BrokerConfiguration brokerConfiguration, AxisConfiguration axisConfiguration) throws BrokerEventProcessingException {
        return null;  //To change body of implemented methods use File | Settings | File Templates.
    }

    public void publish(String s, Object o, BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException {
        Event event = (Event) ((Object[]) o)[0];
       Object [] objects = event.getPayloadData();
       String operation= objects[0].toString();
        String user= objects[1].toString();
        String path= objects[2].toString();

         for(Object oo:objects){
           if(oo != null){
               System.out.println("+++"+oo.toString());
           }
         }

        String CARBON_HOME = "PATH TO CEP HOME";
        String username = "admin";
        String password = "admin";
        String serverURL = "https://localhost:9443/registry";
        String axis2Conf = ServerConfiguration.getInstance().getFirstProperty("Axis2Config.clientAxis2XmlLocation");
        String axis2Repo = CARBON_HOME + File.separator + "repository" +
                File.separator + "deployment" + File.separator + "client";
        System.setProperty("javax.net.ssl.trustStore", CARBON_HOME + File.separator + "repository" +
                File.separator + "resources" + File.separator + "security" + File.separator +
                "wso2carbon.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
        System.setProperty("javax.net.ssl.trustStoreType", "JKS");
        System.setProperty("carbon.repo.write.mode", "true");


        try {
            RemoteRegistry registry = new RemoteRegistry(new URL(serverURL), username,password);
            registry.invokeAspect(path,"SampleLifeCycle","Demote");
            System.out.println("Successfully Demoted the API at "+path);
        } catch (RegistryException e) {
            System.out.println("ERR" +e.getMessage());
        } catch (MalformedURLException e) {
            System.out.println("ERR" +e.getMessage());
        }

    }

    public void testConnection(BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException {
        //To change body of implemented methods use File | Settings | File Templates.
    }

    public void unsubscribe(String s, BrokerConfiguration brokerConfiguration, AxisConfiguration axisConfiguration, String s1) throws BrokerEventProcessingException {
        //To change body of implemented methods use File | Settings | File Templates.
    }
}

public class GovernanceBrokerFactoryImpl implements BrokerTypeFactory {

    public BrokerType getBrokerType() {
        return new GovernanceBroker();
    }
}
After Step 3 is completed we are all done with our POC.

Now open CEP_HOME/repository/conf/carbon.xml and set the value to 1 and start the CEP server. 

Lets test this.

 - Login to G-Reg management console https://localhost:9443/carbon/ with admin, admin as username and password.

- Goto Home > Extensions > Configure > Lifecycles. Click Add new Lifecycle and you will see a configuration xml file. Just click on save without doing anything. Now you have created a life cycle called "SampleLifecycle"

 - Goto Home > Metadata > Add > API and create and API by filling some dummy meta data values. enough to only fill the required fields.

 - Now attach the previouly creatd "SimpleLifycycle" to the API we just saved . Refer [6] on how to achieve this in more details.

 - Once a Lifecycle is attached to the API, you will see it is in Development state. Now click "promote" to move it to "Testing" and do the same to push it to "Production".

 - Once it is in Production, this is our scenario. If I edit the API artifact (which is now in Production state) 3 times, you will see that it will automatically demoted back to "Testing" state. The magic is done through CEP ;-).


And if you are further interested in extending this to a wider integrated platform, following is a better deployment, where you have a Business Activity Monitor to monitor and analyze all the business events.



That's it. The post is lengthy since I am explaining this in more details for your understanding.



[1] - http://wso2.com/products/governance-registry
[2] - http://wso2.com/products/complex-event-processor
[3] - http://docs.wso2.org/wiki/display/Governance453/Governance+Artifacts.
[4] - http://docs.wso2.org/wiki/display/CEP210/Queries
[5] - http://docs.wso2.org/wiki/display/CEP210/Writing+Custom+Broker
[6] - http://docs.wso2.org/wiki/display/Governance453/Managing+Lifecycle



No comments:

Post a Comment