Skip to content

Super Size EMF Fast Food Demo: Add Complex Event Processing (Part 2)

Where were we?

In the last post we considered the problem of the Integrated fast food management. In step 1, we designed the models and in step 2 we weaved them. As a result the application were developed just by weaving live-models. We finished the last post by asking ourselves how could we answer the Fast Food manager’s issues:  the boss wanted to control the burger cooking rate according to current demand. In addition, soon, he will ask to include other contextual parameters, such as the number of available seats, or the location of the truck which delivers salads and burgers.  How could we define a flexible, maintainable, and intelligent system? We proposed to use CEP concepts.

What the hell is the CEP?

CEP stands for Complex Event Processing, I will let you read the really good introduction given by Tim Bass from TIBCO in [1], then  browse [2,3] to get a good idea of what it can bring. Basically, the idea is that we can perceive the real-time current state of the world by listening all the events the world emits.  For instance, we could improve the airport schedule by taking into account in real time and in each airport: the petrol delivery, the existing delays, the weather forecast, information from airlines companies, etc. All those systems emit events, if we could collect all of them in a central backbone, we could process this “cloud of events” in order to recognize patterns. Once a pattern is recognized, it means that a particular situation has been achieved, and then, we can take action. That is an Event-Driven Architecture (EDA). Formal definition: “Complex Event Processing is the subject of processing data in an event-driven application. The idea is to have a central processor, the CEP engine, which will be given the task to handle every occurring event of the system in order to have a better view of that system, which in turn, takes actions. It is called complex because it’s purpose is to be able to recognize complicated patterns among events and correlate them in order to infer new knowledge. Any kind of relations between events could be used, in particular temporal relationships. Due to the goal of being able to take action, CEP are meant to process and analyse data in real-time. In order to ease the creation of such processors, CEP engines have emerged as to provide an efficient way of developing and deploying applications requiring complex event processing”.

Where are you going? Starting with live-models is OK, but what the hell is the point with CEP?

Well, we were asking ourselves how we could bring intelligence on our application. Actually the CEP paradigm is an elegant solution and suits well. Indeed, the weaver enables to listen model changes, let’s then sending all modification events on a collector. Therefore, we only need to analyze all of them and recognize some patterns such as “more than 3 burgers has been ordered within 2 mins”. That is a new paradigm in live-models, by the notification system of the weaver we are able to take decisions according to contextual information, in this case, the model states.

Step 3: Collecting Events

OK, so, the next step is then to collect events. The collector could be a simple pipe, list or something more complex such as Enterprise Service Bus, JMS queue, etc. In our case, let’s use directly the CEP flow entry without using any median such as a queue.

collector
Collecting modification event in a collector

The aim of the event collector is to collect all modification events from Orders. Notice that later on, we can add any other events which could give us a better view of the real time context.

Step 4: Inferring information from real-time data (the context)

The last step is to put in place a CEP which will analyze all notification events from Orders. In our case , when a notification about a burger arrives, we will have to check if we have received more than 3 burgers within the 2 mins. Then, we need to select all events received within this time frame and counting each quantity ordered for each event.

[sourcecode language='sql']
#Pattern definition select irstream sum(quantity) as totalQuantity from OrderBurgerEvent(type='superburger').win:time(2 min) having sum(quantity) > 3
[/sourcecode]

 

This rule is a pattern definition in ESPER, the open source CEP. Therefore, we are able to listen the event cloud and to infer that “the conditions are met” for increasing the burger rate.

Where are the snippets?

Oh, yes, we lack of code here ! So, let’s code all those things.

Weaving Order and Bill

First of all we need to see how we can weave the bill and order. Each time a new order is created, we create the corresponding Bill and we weave them each other. This means that the attribute burgerPriceTotal must be calculated each time an OrderEntry is added or removed from an Order. Therefore, we need a Weaving Process which listens to ADDED and REMOVED events of the burgerEntries Collection of the Order Object. Let’s see some code:

[sourcecode language='JAVA']
//Creatiion of Order
Order order = BurgerFactory.eInstance().createOrder();
Bill bill = BurgerFactory.eInstance().createBill();

// create one event for each case : add and remove in burgerEntries
ChangedEvent addEvent = ProcessFactory.eINSTANCE.createChangedEvent();
addEvent.setPath(“burgerEntries”);
addEvent.setEventType(EventType.ADD);
ChangedEvent removeEvent = ProcessFactory.eINSTANCE.createChangedEvent();
removeEvent.setPath(“burgerEntries”);
removeEvent.setEventType(EventType.REMOVE);

// create the activity processed by the event
command.setcommand (“platform:/plugin/burgerFactory/org.euranova.burgerfactory.handlers.ReCalculateBurgerPriceActivity”);
//Reference the Bill model that will be used in runtime
command.getParameterLocations ().put (Bill.class, bill);

// attach activity to event through sequence flows
SequenceFlow sequence = ProcessFactory.eINSTANCE.createSequenceFlow();
sequence.setSource(addEvent);
sequence.setTarget(command);
sequence.setSource(removeEvent);
sequence.setTarget(command);

// create the process
WeavingProcess wp = ProcessFactory.eINSTANCE.createBindingProcess();
wp.getElements().add(addEvent);
wp.getElements().add(removeEvent);
wp.getElements().add(command);
wp.getElements().add(sequence);

// create a weaver and activate it
Weaver w = new Weaver();
w.getProcesses().add(wp);
// set the model on which we attach the weaving process
w.getModels().add(Order);
w.getModels().add(bill);
w.activate();
[/sourcecode]

A weaving process is composed by sequences of commands, on each command we have to define an Activity, which could be JAVA, ATL or any other custom types. We set the Bill model in the context and as parameter of the command. This mechanism enables to re-use a model in several commands.
Now we will be notified in the AddBurgerEntryActivity that the burgerEntries collection has been modified, but what happens when we receive this event? We will get the Order on which the list has been modified. We also get the Bill we have registered as parameter of the first command of the weaving process. Therefore, we just re-calculate the price.

[sourcecode language='JAVA']
public class ReCalculateBurgerPriceActivity {
// we plan to replace HashMap<String, Object> by 'Context' class
public void execute (HashMap<String, Object> context) {
// the context contains the initial EMF notification
Notification notification = (Notification) context.get(CONTEXT_NOTIFICATION);
// that's the burger entry modified, now get the Order
BurgerEntry newBurgerEntry = (BurgerEntry)notification.getNewValue();
Order fromOrder = newBurgerEntry.getOrder();
// Get the bill to weave with the Order ID
Bill currentBill = context.getModels(Bill.getClass());
//Calculate the price on the burgerEntries
float price = calculateTotal(fromOrder);
//Set the buger price
currentBill.setBurgerPriceTotal(price);
}
}
[/sourcecode].

Collecting event for the CEP

Each time an Order is Validated and ready to be sent to the KitchenPlanning, we need to send the OrderEvent on the Event Collector or directly to the CEP engine. Let’s see how we can listen the TimeStamp attribute for Order and fire an OrderBurgerEvent on the CEP:

[sourcecode language='JAVA']
// create one event for each case : add and remove in burgerEntries
ChangedEvent setEvent = ProcessFactory.eINSTANCE.createChangedEvent();
setEvent.setPath("timeStamp");
setEvent.setEventType(EventType.SET);
ChangedEvent removeEvent = ProcessFactory.eINSTANCE.createChangedEvent();

// create the activity processed by the event
setCommand.setaddCommand (“platform:/plugin/burgerFactory/org.euranova.burgerfactory.handlers.OrderValidatedActivity”);

// attach activity to event through sequence flows
SequenceFlow asetSeq = ProcessFactory.eINSTANCE.createSequenceFlow();
setSeq.setSource(setEvent);
setSeq.setTarget(setCommand);

// create the process
WeavingProcess wp = ProcessFactory.eINSTANCE.createBindingProcess();
wp.getElements().add(setEvent);
wp.getElements().add(setCommand);
wp.getElements().add(setSeq);

// create a weaver and activate it
Weaver w = new Weaver();
w.getProcesses().add(wp);
// set the model on which we attach the weaving process
w.getModels().add(Order);
w.activate();

[/sourcecode]

 

Quiet the same as the code we saw before. Now what’s the content of the Activity:

[sourcecode language='JAVA']
public class OrderValidatedActivity{
// we plan to replace HashMap<String, Object>by 'Context' class
public void execute (HashMap<String, Object>context) {
// the context contains the initial EMF notification
Notification notification = (Notification) context.get(CONTEXT_NOTIFICATION);
// that's the burger entry modified
BurgerEntry newBurgerEntry = (BurgerEntry)notification.getNewValue();
//Fire the event on the CEP engine
epService.getEPRuntime().sendEvent(new OrderburgerEvent(newBurgerEntry));
}
}
[/sourcecode].

That’s all ! Now let’s what happens in the CEP engine side.

Complex Event Processing

We need to code an ESPER pattern and the actions to take when this pattern is matched. First, let’s start the CEP engine

[sourcecode language="JAVA"]
EPServiceProviderepService = EPServiceProviderManager.getDefaultProvider();
epService.getEPAdministrator().getConfiguration().addEventType("OrderBurgerEvent",OrderBurgerEvent.class.getName());
//Create the pattern and load it as a statement
EPStatement statement = epService.getEPAdministrator().createEPL("select irstream sum(quantity) as totalQuantity from OrderBurgerEvent(type='superburger').win:time(2 min) having sum(quantity) > 3");
//Add a listener which will be called when the pattern is matched
statement.addListener(new BurgerRateListener());
[/sourcecode]

Finally, let’s define our BurgerRate Listener:
[sourcecode language="JAVA"]
public class BurgerRateListener implements UpdateListener {
public void update(EventBean[] newEvents, EventBean[] oldEvents) {
//here we get all the events which compose the pattern
//in our case we have to increase the supersize burger cooking rate
KitchenManager.getSingleton().updateSuperSizeRate(3);
}
}
[/sourcecode]

Notice, that the system is :

  • flexible: rules can be modified and redeployed live
  • extensible: we could connect any other event sources for refining our vision of the context
  • completely Asynchronous, that’s all about event programming

 

Architecture considerations

In this post, we only focused on simple application on the same JVM for the clarity of the post. However a real application must be  distributed among clients. We could delegate the model distribution to live-model repository such as Eclipse CDO [4] in the EMF project.  Therefore, the cash desk application and the kitchen application are considered as CDO clients able to request and modify our live-models. Concerning the CEP, the only difference is that the engine must be launched in a separate application and then, requires a message backbone such a JMS Queue or an ESB to interconnect the client applications and the CEP.

Conclusion

A lot of things in these two last posts. We saw how we can create an application with live-models, and why we needed to weave them. First, we designed our models and defined how we should weave them at runtime.  We showed the necessity to listen model modifications in order to synchronize each other.  Since we are able to get modification events, why wouldn’t we be able to listen all event notifications, therefore, we are potentially able to perceive the context in real-time, then being able to react and take action accordingly. In order to detect “notable” situations, we re-use the inference engines used in CEP and we bring the Event-Driven Architecture value in live-model applications.

References

[1] What is Complex Event Processing, http://www.thecepblog.com/2007/05/14/what-is-complex-event-processing-part-1/

[2] Oracle CEP Project, http://www.oracle.com/technology/products/event-driven-architecture/complex-event-processing.html

[3]  ESPER, the Open Source CEP, http://esper.codehaus.org/

[4] The Eclipse CDO project, http://www.eclipse.org/cdo/


 

Releated Posts

Insights From Flink Forward 2024

In October, our CTO Sabri Skhiri attended the Flink Forward conference, held in Berlin, which marked the 10-year anniversary of Apache Flink.  This event brought together experts and enthusiasts in the
Read More

Internships 2025

You are looking for an internship in an intellectually-stimulating company? are fond of feedback and continuous personal development? want to participate in the development of solutions to address tomorrow’s challenges?
Read More