16. August 2016
With the increasing distribution of smart devices and sensor systems it is now possible to get data and context information of any element of the real-world. The Internet of Things (IoT) is synonym of this trend. Furthermore, it also becomes a social meaning because it affects all areas of everyday life. But of course this trend provides massive possibilities to improve our life, as well as our companies. So for example real-time insights into business processes are getting more important in recent times. With the right information and only short delays between certain incidents decision makers on operational level are able to quickly react and adapt changes to the processes. Thus, companies with in-depth knowledge of their processes have options to optimize their business. Also, they are able to offer new service levels for customers and increase earnings.
One domain where this necessity was already seen a few years ago, is the logistics sector. Since some years, most logistics providers monitor the location status of the goods. Logistics customers want to know where the goods are, but also what happened with them. For example, fragile or sensitive goods can be very expansive for manufacturers. If they are damaged and production stands still because they aren’t in stock anymore. For this reasons logistics providers have to collect more information about the goods on a more detailed level. To collect the GPS coordinates of trucks, airplanes or ships isn’t sufficient for this. Its needed to gather data on a packet level, so information about environmental conditions or movements for each shipment good independently.
This mass of information first must be handled. One option is to use data warehouse and business intelligence systems for saving and processing data. But this approach lacks in speed of action. Depending on the used technologies it can take hours until incidents and variances in processes are detected. Another technology for analysing streams of real-time data is Complex Event Processing (CEP). Having its roots on ECA (event-condition-action) rules of database systems, firstly they were developed to detect incidents in stock exchange trading. Due to emerging new data sources they were also used to handle also RFID or other sensor data. But traditional CEP engines like Esper, Drools or Tibco are not efficiently scalable to handle millions of events per second and simultaneously detect complex event patterns in this data.
Hence, with the rise of scalable stream-processing frameworks like Apache Storm, Apache Spark Streaming and Apache Flink the foundations for scalable complex event processing was laid. While Spark Streaming-based CEP engines retain the same performance limits due to the micro-batch processing of Apache Spark (e.g. Stratio Decision [2]), Storm-based frameworks need to involve a whole system stack (e.g. WSO2 CEP [3]). Now the Apache Flink CEP framework provides a library because “[…] its true streaming nature and its capabilities for low latency as well as high throughput stream processing is a natural fit for CEP workloads.”[4]
Leipzig University and some business partners explored this research topic in the LogiLEIT [5] project. LogiLEIT is funded by the german federal ministry of education and research. In cooperation with ScaDS, a scalable approach based on big data technologies was developed to enhance the current capabilities. In this blog post, we will explain to use Apache Flink CEP for real-time logistics monitoring.
The underlying data is measured via smartphone sensors which are affixed to packages. Every 15 seconds, the mobile devices send their current status information “back home”. The sensor events contain environmental information, such as:
as well as data about the sensors itself, e.g.:
The data is send via a socket connection and looks like this in JSON format:
{"_id":"00c4b0-4671359aab5c","latitude":51.39712,"longitude":12.3595,"orientation_l_r":116.648438,"accuracy":9,"bearing":0,"provider":"gps","speed":0,"shipment_number":"BEAVER"}
{"_id":"00c891-1d6dccb2f195","latitude":51.38093,"longitude":12.3762,"orientation_l_r":17.11562,"accuracy":5,"bearing":0,"provider":"gps","speed":0.5,"shipment_number":"BEAVER"}
{"_id":"00cadd-161ae86fbfd1","latitude":51.33808,"longitude":12.3765,"orientation_l_r":167,"accuracy":13,"bearing":16.2,"provider":"gps","speed":0.5,"shipment_number":"BEAVER"}
{"_id":"010359-6521124406ca","latitude":53.54148,"longitude":9.9921,"orientation_l_r":52,"accuracy":16,"bearing":0,"provider":"gps","speed":0,"shipment_number":"EAGLE-1"}
As Flink provides connectors to different messaging systems (e.g. Kafka, RabbitMQ), data streams send on these systems can processed as well.
Before starting with data processing and monitoring a connection to the data stream has to set up. In this example it is a TCP socket connection on port 9999. Like in every Flink application an ExecutionEnvironment has to be initialized.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
SingleOutputStreamOperator<SensorEvent> eventStream = env.socketTextStream("localhost", 9999, '\n').flatMap(new splitMap());
In this example the degree of parallelism is set to 4, which means that all operators, data sources and data sinks are distributed on four nodes in the cluster. In line 3 checkpoints are enabled and created every second. The mode guarantees that every data element is processed exactly one time. In line 4 the characteristic for the internal event time is defined. This configuration parameter is needed to give every incoming data stream element a timestamp e.g. for window computations. Since no timestamps are provided in the data stream the TimeCharacteristic is set to ProcessingTime, so each data element gets the system time of the node which first processes the element.
Furthermore, this mode is most efficient concerning performance and latency opposed to the IngestionTime or EventTime modes. In line 5 lastly the connection to the streaming server on localhost, port 9999 is established and a flatMap function is called which transform the incoming JSON events into SensorEvent POJOs.
For logistics providers as well as for recipients of important and fragile goods it’s very important to detect if a package was thrown or dropped. In case of a quickly detected damage of an indispensable good, this part can be reordered and the downtime can be reduced.
To observe our data stream on a certain pattern it can be necessary to define an appropriate partition strategy. This can be done via physically partition the data on a low-level with a user-defined strategy, or to use the higher level keyBy operator for a logical separation into disjoint groups of data elements. Result is a KeyedDataStream which is also necessary to build data windows (see later) or to use the aggregation functions. To detect an incident within one package the shipment_number attribute is used for partitioning, as this attribute serves as a unique identifier for each parcel.
The Pattern API is used to describe the event patterns the data stream has to be observed. Each event pattern consists of different stages; each has its own conditions. The order of the stages defines the processing order. Each stage must have a unique name for identification. The pattern for the detection of dropped packages looks like this. First, a stage named orientationEvent is defined, which also describes the begin of the event pattern (line 2). The where-clause in the next line determines the event conditions needed to accomplish the first state of the pattern. For simplification it is assumed that a dropped package is characterized by a value > 100 of the orientation_l_r attribute4.
Very important for CEP application is the definition of event sequences, e.g. to describe that event A has to be followed by event B. the followedBy function states that the first stage of our pattern has to be followed by a second stage and appends a stage called accelerationEvent. The followedBy operator has the characteristic that other events can occur between events. There is also a next operator which implies that event B has to directly succeed the previous event A and no other events are between them. The second part of the pattern states that the second characteristic of a dropped package is acceleration. So the event condition for this part is that acceleration is higher than 10. The within-part of the pattern definition determines the time window in which the events have to occur.
KeyedStream< SensorEvent, Tuple> keyedData = eventData.keyBy("shipment_number");
Pattern< SensorEvent, ?> dropPattern = Pattern.< SensorEvent >begin("orientationEvent").where(new FilterFunction< SensorEvent >() {
public boolean filter(SensorEvent orientationEvent) throws Exception {
return orientationEvent.getOrientation_l_r() > 100.0;
}
}).followedBy("accelerationEvent").where(new FilterFunction< SensorEvent >() {
public boolean filter(SensorEvent accelerationEvent) throws Exception {
return accelerationEvent.getAcceleration() > 10.0;
}
}).within(Time.seconds(3));
In the last part of this simple CEP application the observing data stream and the described event pattern are put together. Then, if the event pattern occurred in the data stream a PatternSelectFunction is used to extract the values which triggered the event and to throw a warning or initiate another action, e.g. launch another application or send a message to a system.
PatternStream<SensorEvent> FlowFirstPatternStream = CEP.pattern(keyedData, dropPattern);
DataStream<String> warning = FlowFirstPatternStream.select(new PatternSelectFunction<SensorEvent, String>() {
public String select(Map<String, SensorEvent > pattern) throws Exception {
ShipmentEvent orientEvent = pattern.get("orientationEvent");
ShipmentEvent accelEvent = pattern.get("accelerationEvent");
if (orientEvent.getShipment_number().equals(accelEvent.getShipment_number())) {
return "Shipment : " + orientEvent.getShipment_number() + " was dropped!";
}
}
});
warning.print();
Another important part of event patterns is the observation of sensor values on deviations compared with certain thresholds. For that aggregations on time windows have to be applied. Because not all aggregation functions of the Batch API are usable in the Streaming API a fold-operator is applied to compute the average value of a sensor value within a time window. The fold function comes from higher-order functions of functional programming. It is started with an initial value and applies for every new data element in the stream the corresponding processing steps. In this case it is used for sum up the values and counting the number of orientation events in a time window.
Both values are then used for computing an average value of the orientation attribute which then can be used for identifying events where e.g. the orientation value changed within the last 10 seconds by 20 percent. The fold stream contains elements for the shipment_number (for identification), then for the current orientation value and the computed values. The function returns a new SensorEvent which contain the attributes mentioned previously.
private static class AvgFold implements FoldFunction<SensorEvent, Tuple5<String, Double, Double, Double, Double>> {
public Tuple5<String, Double, Double, Double, Double> fold(Tuple5<String, Double, Double, Double, Double> total, SensorEvent current) throws Exception {
double count = total.f4 + 1.0d;
double sum = total.f3 + Double.parseDouble(current. orientation);
double avg = sum / count;
return new Tuple5<String, Double, Double, Double, Double> (current.shipment_number, Double.parseDouble(current.orientation), sum, count, avg);
}
}
This fold is applied on a keyed window to process stream elements of the same package together. And a sliding window of 10 seconds is used, which uses the processing time of the elements for timestamps. A characteristic of the fold is that it can be initialized with voluntary values. In the code example the event pattern from above is adapted with the fold function and the event pattern is changed in order that it triggers events, where the current orientation value is more than 20 percent higher than the aggregated average value of the time window.
SingleOutputStreamOperator< Tuple5<String, Double, Double, Double, Double >> aggregatedEventStream = eventData
.keyBy("shipment_number")
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.fold(new Tuple5<String, Double, Double, Double, Double >("", 0.0d, 0.0d, 0.0d, 0.0d), new AvgFold());
Pattern< Tuple5<String, Double, Double, Double, Double >, ?> examplePattern = Pattern.< Tuple5<String, Double, Double, Double, Double >>begin("orientationEvent")
.where(new FilterFunction< Tuple5<String, Double, Double, Double, Double >>(){
public boolean filter(Tuple5<String, Double, Double, Double, Double > event) throws Exception {
return event.f1 > event.f4*1.20;
}
}). followedBy("accelerationEvent").where(new FilterFunction< SensorEvent >() {
public boolean filter(SensorEvent accelerationEvent) throws Exception {
return accelerationEvent.getAcceleration() > 10.0;
}
})
.within(Time.seconds(20));
With Apache Flink CEP the first native library for complex event processing in scalable data processing frameworks was introduced. Due to its young history (the first version was released in April 2016) some functional aspects are still open. So a comparison with established CEP software like Esper or Drools is kind of unfair. In future, the Apache Flink CEP library will be extended and first results are already available. Since version 1.1.0 a Scala API for CEP is released which allows developer friendly decsription of event patterns.
[1] http://blogs-images.forbes.com/jacobmorgan/files/2014/05/libelium_smart_world_infographic_big.png
[2] https://stratio.atlassian.net/wiki/display/DECISION0x9/Home
[3] http://wso2.com/products/complex-event-processor
[4] http://flink.apache.org/news/2016/04/06/cep-monitoring.html
[5] http://www.logileit.de/