Using MultiGroupBy with Scalding
TL;DR
Scalding .groupBy and .join operations can be combined into a single operation using MultiGroupBy from the Cascading extensions project, which improves job performance. A Scalding job example using MultiGroupBy can be found here.
Introduction
Let’s imagine we have two data sources. The first contains the purchase records of users per time and per geographical state, formatted as <user_id, timestamp, state, purchases>. The second contains user demographic information; for this particular example, it only contains the user’s age, <user_id, age>.
The main goal of this MapReduce job is to count the number of purchases per state and per age group.
In Scalding, we can implement this job as follows.
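A minimal sketch of such a job, assuming both sources are read as Tsv pipes; the class name, input paths, and field names here are illustrative:

import com.twitter.scalding._

class PurchaseCountsJob(args: Args) extends Job(args) {
  val UserAges  = Tsv(args("ages"), ('AGE_USERID, 'AGE)).read
  val Purchases = Tsv(args("purchases"), ('USERID, 'TIMESTAMP, 'STATE, 'PURCHASES)).read

  Purchases
    // MapReduce phase 1: pre-aggregate purchase records per user and state
    .groupBy(('USERID, 'STATE)) { _.size('PURCHASE_COUNT) }
    // MapReduce phase 2: join with the demographic data to attach the age
    .joinWithSmaller('USERID -> 'AGE_USERID, UserAges)
    // MapReduce phase 3: count purchases per state and age group
    .groupBy(('STATE, 'AGE)) { _.sum[Long]('PURCHASE_COUNT -> 'COUNT) }
    .write(Tsv(args("output")))
}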
This is an elegant and concise solution; however, it is not very efficient.
The Problem
In Scalding, each .groupBy and .join operation introduces another MapReduce phase. That is, with the code above, the data will be shuffled, sorted, and reduced three times before the computation finishes. Therefore, when there is a lot of data to process, the overall job will be very inefficient.
Luckily we can do better!
MultiGroupBy Operation
The desired solution is to perform the aggregation while joining the two data sources. Fortunately, this can be achieved using the MultiGroupBy operation. In the rest of this post I will show how to use MultiGroupBy in Scalding by reducing the three steps of the job above into a single MapReduce phase.
Recently I was reading tips for optimizing Cascading flows (at https://nathanmarz.com/blog/tips-for-optimizing-cascading-flows.html) and recalled the Cascading extensions project, which I had seen several months earlier. It offers additional operations on top of Cascading. Here I will only show MultiGroupBy (maybe BloomJoin in some other blog post). It is great!
The API of MultiGroupBy is defined in MultiGroupBy.java#L35-L55. It accepts two pipes, two field definitions as the joining fields, the renamed join field(s), and an aggregation operation. We will have to write a Cascading multi buffer operation in Java, but it is worth the effort.
The updated Scalding job will be as below,
import cascading.tuple.Fields
import com.liveramp.cascading_ext.assembly.MultiGroupBy
import com.twitter.scalding._

class MultiGroupByExample2(args: Args) extends Job(args) {
  // ...
  val MyJob =
    new MultiGroupBy(
      Array(UserAges, Purchases),                               // the two input pipes
      Array(new Fields("USERID"), new Fields("USERID")),        // join fields of each pipe
      new Fields("USERID"),                                     // renamed join field in the output
      new MyMultiBufferOp(new Fields("STATE", "AGE", "COUNT"))  // aggregation operation
    )
    .discard('USERID)
    .write(Tsv(outputPath))
}
Because MultiGroupBy performs a join operation, it keeps the join fields. Therefore, we discard the 'USERID column with .discard('USERID).
Please notice the smooth Scala/Scalding and Java/Cascading interop: new Fields("USERID") and 'USERID refer to the same field.
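For instance, here is a tiny sketch, assuming Scalding's field conversions are in scope (they are inside a Job, or can be imported via Dsl._):

import cascading.tuple.Fields
import com.twitter.scalding.Dsl._   // brings Scalding's Symbol-to-Fields conversions into scope

val fromScalding: Fields = 'USERID        // the Scala Symbol is converted implicitly
val fromCascading = new Fields("USERID")  // the plain Cascading form
// Both describe the same single field named "USERID".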
Next we write our multi buffer operation, MyMultiBufferOp.
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import cascading.tuple.Tuple;
import com.liveramp.cascading_ext.multi_group_by.MultiBuffer;
import org.apache.commons.collections.keyvalue.MultiKey;

public class MyMultiBufferOp extends MultiBuffer {
  // ...

  @Override
  public void operate() {
    // First pipe: UserAges <USERID, AGE>
    Iterator<Tuple> userAges = getArgumentsIterator(0);
    if (!userAges.hasNext()) {
      return;
    }
    Tuple userAgesTuple = userAges.next();
    int userAge = userAgesTuple.getInteger(1); // second field is age

    // Data structure to store the counts
    MultiKey key = null;
    Map<MultiKey, Integer> countMap = new HashMap<MultiKey, Integer>();

    // Second pipe: Purchases <USERID, TIMESTAMP, STATE, PURCHASES>
    Iterator<Tuple> purchases = getArgumentsIterator(1);
    while (purchases.hasNext()) {
      Tuple purchasesTuple = purchases.next();
      int state = purchasesTuple.getInteger(2); // third field is state
      key = new MultiKey(state, userAge);
      if (countMap.containsKey(key)) {
        countMap.put(key, countMap.get(key) + 1);
      } else {
        countMap.put(key, 1);
      }
    }

    // We just calculated the <STATE, AGE, COUNT> results stored in 'countMap'.
    // Now we emit each <STATE, AGE, COUNT> tuple, matching the result fields
    // we passed to the constructor when wiring up this buffer operation.
    for (Map.Entry<MultiKey, Integer> entry : countMap.entrySet()) {
      key = entry.getKey();
      int state = (Integer) key.getKey(0);
      int age = (Integer) key.getKey(1);
      int count = entry.getValue();
      emit(new Tuple(state, age, count));
    }
  }
}
First, we obtain tuple iterators for the two data sources. Then we keep updating the hashmap, keyed by <state, age> and holding the count, until the iterators are exhausted. Finally, we emit the hashmap contents as the results of this buffer operation.
You can find the full job code here and the multi buffer operation here. In order to test the MultiGroupBy example you will have to assemble a fat jar and run it on a Hadoop environment.
Conclusion
I find this kind of pattern, a join before or after a groupBy, a lot in our MapReduce job chains. Using MultiGroupBy we achieved a considerable performance increase. Additionally, it resulted in more efficient cluster utilization.
I strongly believe this operation should be available by default in both Cascading and Scalding.
If you liked this post, you can click to Tweet it or follow me on Twitter!