27. October 2016

Transfer and Service

At the 2nd International ScaDS Summer School on Big Data we offered a couple of workshops with the aim to provide an introduction into the three Big Data technologies MongoDB, Flink and Gelly. This post is an extension of the Gelly tutorial to demonstrate the new feature of Gelly: the Vertex-Centric Iteration or Pregel Iteration. Find out which child is getting the largest amount of candies in our **Halloween-Special** of Trick-or-treat…

Three months after the 2nd International ScaDS Summer school on Big Data, a new stable version of Flink is available. The version 1.1 involves many bug fixes and new features. Along with the new Flink version, also a new Gelly version was released. In this blog post, we will highlight and demonstrate a new feature of the new Gelly version: the Pregel iteration or how Gelly it calls the Vertex-Centric Iteration. It is the third iteration that can be used in Gelly and more general model then the two older types of iterations. The idea was already published in the year 2010 as Pregel: a system for large-scale graph processing, many algorithm are published using this iteration model.

We will explain the Pregel iteration using a halloween example. We first introduce the story of our example and continue afterwards with the implementation in Gelly. Note that only the code relevant for the Pregel iteration is shown but not how to use Flink and Gelly in general. However, the runnable code of the tutorial can be found at the end of the blog post as download.

In the small village ScaDS in the eastern Germany live only 23 families. Most of them (20) live on the main road that is a circle. Only three families live in the middle of the village. Each of them has a own road that leads to the house. This three families are also the only one that have children.

Each year on halloween, the children of this families go on a Trick-or-treat tour around the main road (anti-clockwise) and try to get the most candies among all children.

This year every children have a different strategy to get the most candy. The strategies are:

- The first child live in the house with the number 101. The child’s strategy is to go to each House and get one candy if there is any candy left.
- The second child live in the House with the number 102. His strategy is to go to every second house. This takes double the time then the first child needs to go to one house. However, due of the longer way it looks more hungry and gets two candies at the same time.
- The third child live in the House wit the number 103. Her strategy is to go to a house and beg long enough to get one third of all candies that are left in this house. This begging takes double the time that the first child needs to go to a house and get the candy.

Beside this different strategies the neighbors have also different amount of candy in their houses.

House | Amount of Candies | House | Amount of Candies | |

1 | 10 | 11 | 3 | |

2 | 6 | 12 | 7 | |

3 | 8 | 13 | 3 | |

4 | 5 | 14 | 1 | |

5 | 9 | 15 | 7 | |

6 | 1 | 16 | 14 | |

7 | 6 | 17 | 0 | |

8 | 3 | 18 | 2 | |

9 | 0 | 19 | 3 | |

10 | 9 | 20 | 10 |

Who is getting the largest amount of candies if they all Trick-or-treat as long as the first child needs for one/two/three tours around the main road?

The trick-and-treat problem can be easily translated into a graph problem. Every house is a vertex and edges indicate neighborhood of the houses.

Each vertex has a value which is a positive number. This value is called candy and shows how many candies are in stock at this house.

The starting values are:

Vertex ID | Value (Candy) | Vertex ID | Value (Candy) | |

1 | 10 | 11 | 3 | |

2 | 6 | 12 | 7 | |

3 | 8 | 13 | 3 | |

4 | 5 | 14 | 1 | |

5 | 9 | 15 | 7 | |

6 | 1 | 16 | 14 | |

7 | 6 | 17 | 0 | |

8 | 3 | 18 | 2 | |

9 | 0 | 19 | 3 | |

10 | 9 | 20 | 10 |

Starting with the vertices 101,102, and 103, the graph is traversed with three different strategies corresponding to the three different trick-and-treat strategies of the children:

- Start at vertex 101. In one time step, go to the next vertex and collect one candy.
- Start at vertex 102. In one time step, go to the next vertex and collect two candy. In the next time step, go to the next vertex and collect no candy.
- Start at vertex 103. In one time step, go to the next vertex start bagging. In the next time step, stay at the same vertex and collect one third of the candy of the vertex.

When candies are collected, the value is saved in the own house.

Which of the vertices 101,102, and 103 has the most candies after 20, 40, and 60 time steps?

The Vertex-Centric iteration is the most general iteration of the three graph iteration in Gelly. It has an arbitrary update logic, communication scope and communication logic. Only one class must be implemented. Thus, the complete logic of the algorithm is in this one class and not in two or three classes like in the other graph iterations. Note that there is a possibility to implement a second class that combine messages between the vertices and so reduce the communication overhead. However, for this tutorial, we will use Vertex-Centric iteration implemented in only one class.

Like all iterations in Gelly, the iteration is performed in super steps. This steps make sure that every vertex has received all messages of the last super step, has computed all information and has send out all messages.

The abstract class that has to be extended is the ComputeFunction having four generics. This are the Vertex ID class, the Vertex value class, the Edge value class, and the message class. Our iteration class then must implement the method compute(vertex, messages) that is called for every vertex in every iteration step. In the variable messages, all messages are contained that the vertex receive in this step. The variable messages is iterable.

An example is given in documentation of Flink. It computes the shortest part from all vertices to one source vertex.

`1 `**public static final class** SSSPComputeFunction **extends** ComputeFunction<Long, Double, Double, Double> {

2 **public void** compute(Vertex<Long, Double> vertex, MessageIterator messages) {

3 **double** minDistance = (vertex.getId().equals(srcId)) **?** 0d **:** Double.POSITIVE_INFINITY;

4 **for** (Double msg : messages) {

5 minDistance = Math.min(minDistance, msg);

6 }

7 **if** (minDistance < vertex.getValue()) {

8 setNewVertexValue(minDistance);

9 **for** (Edge<Long, Double> e: getEdges()) {

10 sendMessageTo(e.getTarget(), minDistance + e.getValue());

11 }

12 }

13}

In this example, it first checks if the vertex is the source vertex (line 3). Then it iterates over the incoming messages (line 4-6) and checks if the message contains a new shortest distance to the source (line 5). If the new distance is shorter then before (line 7-11), the new distance is save in the vertex (line 8). Then new messages are send out to the neighbors containing the new distances (line 9-11).

This example also shows three important functions that can be used in the compute function:

- The first is the
**setNewVertexValue**function. This function changes the vertex value and can only be called once. - The second function is the
**getEdges**function that return all outgoing edges of the vertex (can be configured to incoming or all edges). Also this function can be only called once. - The third function is the
**sendMessageTo**function. This function sends the messages around the graph. It has two parameters. The first one is the id of the vertex that the message is sent to and the second is the message.

The Halloween Algorithm is a Vertex-Centric Iteration that works on a graph introduced above. Each super step of the iteration is one time step of the trick-and-treat tour.

Since iterations work by passing messages from vertex to vertex, children are implemented as messages having three values:

- The strategy that the child has.
- The start vertex of this child such that the collected candies can be send to the vertex.
- An inner state of the child only necessary for strategy two and three.

These messages are not the only messages that are sent. Also the collected candies are sent to the home vertex. Since only one type of messages can be used in the iterations, the candy massages also have three values. However, only two are used. The first value marks it as a candy message and the last value to save the value of candy that is sent.

Our iteration need an initialization phase such that the children can start to traverse the graph. To avoid that in every step a new children start to travers the graph, an unique vertex value is used at the beginning of the iteration. It has furthermore benefit that for every house where a children starts the strategy can be also saved in this unique value. In the iteration, all vertex values are larger or equal zero. Therefore,a start value that is smaller then zero is a unique condition. A value of -1 shows that a child with strategy one starts here. The same hold for -2 and -3 for strategy two and three, respectively. The initialization overwrites the negative values after there recognition with zero to make the vertices ready for the collected candies.

For the algorithm the definition of candy is also an interesting part. In the form of the story one candy sounds like an undividable element. Which would mean that by the calculation of one third some rounding is necessary. However, it is also possible to assume that one candy is dividable and you then get only one third of some candy. This second variant is easier to write down in the program because that reduce many calculations down to simple minimum decisions. So this variant is used here and a double is used for the candy.

The program needs only a very simple main method. It has basically three steps:

- Create the graph.
- Start the iteration.
- Print the result.

For the first step, the graph could be directly created in java. However, it is more easier to read the vertex and edge definitions from a prepared file. Gelly provides the possibility to read comma-separated values (CSV) files. . For the tutorial, there are two files defining the village as a graph:

- Vertex definition in the format: <vertex id>,<candy amount>. Download it here!
- Edge definitions in the format: <source vertex id>,<sink vertex id>. Download it here!

The parser need to know the type (i.e. class) of the vertex ID and the vertex value (candy amount). In our case, we use an Integer (house number) for the ID and a Double for the amount of candy. Note that due to the use of generic types, the types have to be defined as classes. Thus, we use the wrapper classes for int and double instead of the primitive types.

In the second step, the iteration is startet. The iteration function is initialized with three parameters. First, the class that implements Halloween algorithm. The implementation will be demonstrated at the next sites. The second parameter would be a message combiner, i.e. a class that reduce the communication overhead by reducing the number of messages. However, for simplicity, we do not use a message combiner in this tutorial. To indicate this, we specifiy **null** as second parameter. The last parameter then is the number of super steps that are done for the iteration. In our example, the number of super steps corresponds roughly to the number of time steps.

For example, in the first run, the number of super steps will be 20+1+1=22: 20 because that is the number of time steps for the first run. One extra step for the initialization and one extra step because the messages from the house that is visited the last must be received by the homes. Note that in most algorithm the termination is that no messages are send in one super step and the number of super steps is only the worst case termination.

The third and last step prints the vertices of the graph with their value. Thus, the values of vertices 101,102, and 103 will hold the amount of candies collected with the three trick-and-treat strategy. The house with the highest values therefore corresponds to the best trick-and-treat strategy. The complete main method is show here:

**public static void** main(String[] args ) **throws** Exception {
//number of time steps
**int **numberOfSteps = 20;
//set up the execution environment
**final** ExecutionEnvironment env = ExecutionEnvironment.*getExecutionEnvironment*();
//Read the graph from the given CSV files
Graph<Integer,Double, NullValue> graph = Graph.*fromCsvReader*("csv/vertices.csv", "csv/edges.csv", env).vertexTypes(Integer.**class**, Double.**class**);
//Run the Vertex-Centric Iteration to get the collected candies out of the graph.
Graph<Integer, Double, NullValue>result = graph.runVertexCentricIteration(**new** HalloweenComputeFunction(), **null**, 2+numberOfSteps);
//Print the result of the iteration
result.getVertices().print();
}

Both csv files are in an folder “csv”. If you path is different in your project, this must be adopted to the actual path to the csv files. Next, we will implement the class HalloweenComputeFunction.

The HalloweenComputeFunction is the class which implements the trick-and-treat tour..Therefore, it has to extend the ComputeFunction and has to define the types of the following four generic variables:

- The vertex id: Integer (wil be our house numbers)
- The vertex value: Double (will be the amount of candies in the house)
- The edge value: NullValue (we do not use edge values in this tutorial)
- The class of the messages: Tuple3<Integer, Integer, Double>(see below for a description)

The messages type need some further explanations. Tuples are specialized types defined by Flink. For each cardinality, one type is defined. For example, a Tuple3 is simple a tuple of three elements. The generic types of the Tuple indicate the types of the fields. In the case of HalloweenComputeFunction, a Tuple3 is used and we define the Halloween message as follows:

- The number of the child/strategy: Integer
- The home vertex of the child (message): Integer
- The inner state of the child/ The amount of candies it sends back home: Double

The compute function that contains the logic of the class has analogously typed variables as input. A Gelly class MessageIterator is used to do the iteration over incoming messages.

First, we need to decide if this function is called in the initialization step or if it is part of the ongoing iteration. This can simple done by a check of the vertex value. Rememeber that the initialization is marked with negative vertex values. In the case that the vertex value is negative the initialization is started. Otherwise, the for a iteration step can run.

In the following, the skeleton of the class HalloweenComputeFuntion is shown:

**private static final class** HalloweenComputeFunction **extends** ComputeFunction<Integer, Double, NullValue, Tuple3<Integer, Integer, Double>> {

*@Override*

**public void** compute(Vertex<Integer, Double> vertex, MessageIterator<Tuple3<Integer, Integer,Double>> messages) **throws** Exception {

//children start to walk

**if**(vertex.getValue()<0) {

...

}

//children are walking around

**else** {

...

}

}

}

Note that the “**…**” stands for code that we will implement in the next steps.

The first gap of the code skeleton is the initialization of the iteration. Two task have to be performed:

- Set the vertex value to zero
- Send the children to the neighbors to collect the candies.

The super step is finished after the initialization so that the function **setNewVertexValue** and **getEdges** can be direct called. Therefore, the first step simply calls the **setNewVertexValue** with argument ‘0’. Note that the function needs a double as input. Thus, the argument is specified as ‘**0D**‘ where the D indicates a zero of type double.

The second task is also straight forward. It is iterates over the outgoing edges with the **getEdges** function and the messages are send out with the **sendMessageTo** function. Messages are created using **Tuple3.of** which is a static method to create a new Tuple3 with the given values and resolve the right Gernics by the values. The values are:

- The strategy which is the absolute value of the vertex value as an integer. It is calculated with the Math.abs function and a cast to int
- The vertex id of this vertex that can retrieved from the vertex.
- The inner state as double. At the beginning, it is zero for all three strategies.

The source code for the initialization looks as follows:

**if**(vertex.getValue()<0) {

//Set the collected candy to zero

setNewVertexValue(0D);

//Send out to the neighbors.

//Field one the the strategies of the children

//Field two the id where the children started

//Field three the zero to show that they started.

**for** (Edge<Integer,NullValue> e: getEdges()) {

sendMessageTo(e.getTarget(),Tuple3.of((**int**)Math.abs(vertex.getValue()), vertex.getId(), 0D));

}

}

Note that a message is send to all neighbors that are connected with an outgoing edge. Since each vertex has at most on successor, at most one message is send in each step. However, on an arbitrary village graph structure with more than one successor, this would result in sending out more than one message in each step. In other words, in each step, more than one child for each home node would be send out for a trick-and-treat tour.

Also note that the **setNewVertexValue** function does not changes the value of the current copy but of the global instance of the vertex. So that the call of **vertex.getValue** still retrieves the old value. Also a change on the value of the current copy have no influence on the global instance.

The last gap in the skeleton of the compute function we have to implement is the trick-and-treating. It is the most complex part of the tutorial because four different cases have to be handled.

- Candies are send to a home vertex. First field is zero.
- A child with strategy one is arrives. First field is one.
- A child with strategy two is arrives. First field is two.
- A child with strategy three is arrives. First field is three.

First, the first field of the incoming messages are checked to distinguish the four cases. Depending on the case, different parts of code are executed. However, this is not the only problem making this step more complex. Moreover, there can be more then one incoming message. For example, a child with strategy one and a child with strategy three arrive at a house at the same time.Thus, we would have to call the functions **setNewVertexValue** and **getEdges** twice, which is not allowed. We will solve this problem by collecting the changes of the vertex value and the outgoing messages. At the end of the calculation, we make one update and one message send step. For this prupose, the helper variables **newVal** and **outMessages** will be used.

The helper variable **newVal** address an other problem as well. Even if we could call the **setNewVertexValue** function for each child, the changes would only be persistent after the current super step. Therefore, the calculation of the remaining candies would be with respect to the old value of the vertex.

To respect the fact that the children have influence on each other, the order of the children have to be fixed. We will sort the children by strategy number. First comes strategy one, then strategy two, and last but not least strategy three.

Let us recapitulate how Flink allows us how to access the fields of a Tuple. The variables of the Tuple classes are public and can be direct access without any getter or setter method. The variables are **f0**, **f1**, and **f2** for the first field, the second field, and the last field of a Tuple3, respectively.

In the following, the resulting code is shown:

**else** {

//Helper variable because setNewVertexValue can be only called once

**double** newVal = vertex.getValue();

//Helper list because getEdges can only be called once

ArrayList<Tuple3<Integer, Integer,Double>> outMessages = **new** ArrayList<>();

//Handle the incoming messages

**for** ( Tuple3<Integer, Integer,Double> msg : messages) {

//children have found candies

**if** (msg.f0 == 0) {

...

}

//child has strategy one

**else if** (msg.f0 == 1) {

...

}

//child has strategy two

**else if** (msg.f0 == 2) {

...

}

//child has strategy three

**else if** (msg.f0 == 3) {

...

}

}

//change value

**if**(newVal != vertex.getValue()) {

setNewVertexValue(newVal);

}

//Sending messages to neighbors

**for** (Edge<Integer, NullValue> e: getEdges()) {

//for every neighbor, send out all saved messages

**for** (Tuple3<Integer, Integer, Double> msg: outMessages) {

sendMessageTo(e.getTarget(), msg);

}

}

}

Again, “**…**” stands for code we will implement on the next sites.

The first case is the one where candies are send back home. Is a very simple case. All that has to be done is to add the amount of candies to the vertex value.

This simple case looks like this:

**if** (msg.f0 == 0) {

newVal += msg.f2;

}

The first of the three strategies is the simplest one. It does four steps:

- Calculated how many candies it gets.
- Remove the candies from the vertex.
- Send the candies to the home vertex
- Goes to the next vertex.

In the first step, the minimum of 1 and the remaining candies at the house is calculated since no or just a fraction of one candy might be left. Note that the remaining amount of candies is in the already saved in the variable **newVal**. The minimum is saved in a variable **candy**.

In the second step, the remaining amount of candies is update by subtracting **candy** from **newVal.**

In the third step, the candy is sent back home using the function **sendMessageTo**. The target of this message is the value of the second flied of the input message and the first value is zero to show candy is sent home. The last field is then again the value of **candy**. To create the message, the **Tuple3.of** function is used. The second field does not hold information in the case of this type of message. Therefore, it may contain any integer value.

To go to the next vertex, the incoming message is saved in **outMessages**, i.e. it is passed to the next neighbor.

Here is the complete source code for the first strategy:

**else if** (msg.f0 == 1) {

//How many candies they get

**double** candy = Math.min(1D, newVal);

//Remove candies from the house

newVal -= candy;

//Sent candies home

sendMessageTo(msg.f1,Tuple3.of(0, 0, candy));

//Children goes to next house

outMessages.add(msg);

}

Note here that only the message to the neighbors are saved for sending it later. The message to the home vertex is sent directly.

The second strategy is similar to the first one. However, it has an inner state to collect candies only at every second house. The performed steps are:

- Check the state if applicable, reset state and got to next vertex.
- Calculated if there the child gets a candy or not.
- Remove the candy from the vertex.
- Send the candy to the home vertex
- Reset the inner state
- Goes to the next vertex.

The inner state is either zero or one. If it is zero, a candy is collected. If it is one, the inner state is set to zero and send to the next vertex. The change of the inner state can be done directly in the received message by using the public variable f2. The resulting message is then added to the **outMessages** list. Steps 2-6 are skipt in this case.

If the inner state is zero, steps 2-6 are performed. They are the same steps as for strategy one except the step 5. The inner state is set to one indicating that the next house is skipped.

The complete code then looks like this:

**else if** (msg.f0 == 2) {

//Check if they skip this house

**if**(msg.f2 != 0D) {

//Do not skip next house

msg.f2 = 0D;

//go to the next house

outMessages.add(msg);

}

//When the do not skip this house

**else** {

//How many candies they get

**double** candy = Math.min(2D, newVal);

//Remove candies from the house

newVal -= candy;

//Sent candies home

sendMessageTo(msg.f1,Tuple3.of(0, 0, candy));

//Skip next house;

msg.f2 = 1D;

//Children goes to next house

outMessages.add(msg);

}

}

Note that because of the double roll of the third field in the messages, it is a Double. Therefore, all checks and changes of the status have to be implemented in the means of double values (adding **D**).

We are almost done! This is the last strategy and not very different to the second strategy. The steps are:

- Check the state and wait at this house where applicable.
- Calculated how many candies the child gets.
- Remove the candies from the vertex.
- Send the candies to the home vertex
- Reset the inner state
- Goes to the next vertex.

In the first step, the child waits at the vertex instead of going to the next vertex. Note that the child now waits when the inner state is zero and collects candies when it is one. In strategy three, the child has to beg before it gets candy while in strategy two it first goes to a house to collect candies and then skips the next one. Waiting at one house is implemented by sending a messages to it self. This ensure that in the next step this messages arrives again at the current node and the candies are collected. The message to itself can send as every other message with the **sendMessageTo** function and **vertex.getId** as target id.

The next difference is in step two. Instead of collecting the minimum of one and the remaining amount of candies, one third of the remaining amount of candies is collected. As already mentioned, the inner state indicates the opposite behavior as in strategy two. Consequently, the inner state is set two zero in step 5 (compare to step 5 of strategy two where it is set to one). The remaining steps do not differ between the last two strategies.

This is the source code for strategy three:

**else if** (msg.f0 == 3) {

//Check if they begged enough

**if**(msg.f2 == 0) {

//Next step get the candies

msg.f2 = 1D;

//Wait here for the next round

sendMessageTo(vertex.getId(),msg);

}

//When enough begged

**else** {

//How many candies they get

**double** candy = newVal/3;

//Remove candies from the house

newVal -= candy;

//Sent candies home

sendMessageTo(msg.f1,Tuple3.of(0, 0, candy));

//Skip next house;

msg.f2 = 0D;

//Children goes to next house

outMessages.add(msg);

}

}

Congratulations, you just implemented your first vertex-centric iteration in Flink. So what is the best strategy for your next trick-and-treat tour?

This tutorial gives a introduction onto how vertex-centric iteration work. Using an very simple example, the key features of vertex-centric iterations are explained and demonstrated. In detail, we introduced to you:

- Sending messages.
- Receiving messages.
- Changing vertex value.
- Getting access to the neighbors.
- Super steps.

We hope that the tutorial enables you to implement any algorithm given as pseudocode for a pregel iteration in Flink and to even write your own algorithm as a vertex-centric iteration.

The complete project with configured for the maven build system can be found here: Download zip.

It is a zip archive that contains the project sources and maven configuration. import it into Eclipse: File-> import -> General/Existing Projects into Workspace

Note that this project uses Maven. Thus, you need a runnable Maven installation and Eclipse configured to use it.

funded by: