18 November 2014 - Nuremberg

TL;DR

Scalding .groupBy and .join operations can be combined into single operation using MultiGroupBy from Cascading extension, which improves the job performance. Scalding job example using MultiGroupBy can be found here.

Introduction

Let’s imagine we have two data sources. The first data contains the purchase record of the users per time and per geographical State. This data is formatted as following, <user_id, timestamp, state, purchases>. The second data contains the user demographic information. For this particular example, it only contains user age, <user_id, age>

The main goal of this map reduce job is to count the number of purchases per state and per age group.

In Scalding, we can implement this job as,

class MultiGroupByExample1(args: Args) extends Job(args) {
  // ...

  val Purchases =
    Tsv(purchasesPath, ('USERID, 'TIMESTAMP, 'STATE, 'PURCHASE))
    .read

  val UserAges =
    Tsv(userAgesPath, ('USERID, 'AGE))
    .read

  val MyJob = Purchases
    .groupBy('USERID, 'STATE) { _.size('COUNT) }
    .joinWithSmaller('USERID -> 'USERID, UserAges)
    .groupBy('STATE, 'AGE) { _.sum[Int]('COUNT) }
    .write(Tsv(outputPath))
}

This is elegant and concise solution however it is not very efficient.

The Problem

In Scalding each .groupBy and .join operation introduces another map reduce phase. That is with the code above, data will be shuffled, sorted and reduced three times before finishing the computation. Therefore, when there are very big data to be processed, the overall job performance will be very inefficient.

Luckily we can do better!

MultiGroupBy Operation

The desired solution is to perform aggregation operations while joining two data sources. Fortunately, it can be achieved using MultiGroupBy operation. In the rest of this blog I will show how to use MultiGroupBy in Scalding by reducing the three steps from above job into single map reduce phase.

Recently I was reading tips for optimizing Cascading flows (at https://nathanmarz.com/blog/tips-for-optimizing-cascading-flows.html) and recalled Cascading extensions project, which I saw several months ago. It offers additional operations on top of Cascading. Here I will only show MultiGroupBy (maybe BloomJoin in some other blog post). It is great!

The API of MultiGroupBy is defined MultiGroupBy.java#L35-L55. It accepts two pipes, two fields definitions as joining fields, renamed join field(s) and aggregation operation. We will have to write Cascading multi buffer operation in Java, but it is worth the effort.

The updated Scalding job will be as below,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import com.liveramp.cascading_ext.assembly.MultiGroupBy

class MultiGroupByExample2(args: Args) extends Job(args) {
  // ...

  val MyJob =
    new MultiGroupBy(
      Array(UserAges, Purchases),
      Array(new Fields("USERID"), new Fields("USERID")),
      new Fields("USERID"),
      new MyMultiBufferOp(new Fields("STATE", "AGE", "COUNT"))
    )
    .discard('USERID)
    .write(Tsv(outputPath))
}

Because MultiGroupBy performs join operation, it keeps the join fields. Therefore, on line 13 we just discard ‘USERID column.

Please notice the smooth Scala/Scalding and Java/Cascading interop. new Fields(“USERID”) and ‘USERID are the same.

Next we write our multi buffer operation, MyMultiBufferOp.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.liveramp.cascading_ext.multi_group_by.MultiBuffer;
import org.apache.commons.collections.keyvalue.MultiKey;

public class MyMultiBufferOp extends MultiBuffer {
    // ...
    @Override
    public void operate() {
        // First pipe: UserAges <USERID, AGE>
        Iterator<Tuple> userAges = getArgumentsIterator(0);
        if (!userAges.hasNext()) {
            return ;
        }
        Tuple userAgesTuple = userAges.next();
        int user_age = userAgesTuple.getInteger(1); // second field is age
        // Data structure to store the count
        MultiKey key = null;
        Map<MultiKey, Integer> countMap = new HashMap<MultiKey, Integer>();
        // Second pipe: Purchases <USERID, TIMESTAMP, STATE, PURCHASES>
        Iterator<Tuple> purchases = getArgumentsIterator(1);
        while (purchases.hasNext()) {
            Tuple purchasesTuple = purchases.next();
            int state = purchasesTuple.getInteger(2); // third column is state
            key = new MultiKey(state, user_age);
            if (countMap.containsKey(key)) {
                countMap.put(key, countMap.get(key) + 1);
            } else {
                countMap.put(key, 1);
            }
        }
        // We just calculated <STATE, AGE, COUNT> results stored in 'countMap'
        // Now we just have to emit COUNT, because we gave <STATE, AGE>
        // as grouping names when calling this buffer operation
        for (Map.Entry<MultiKey, Integer> entry : countMap.entrySet()) {
            key = entry.getKey();
            int state = (Integer) key.getKey(0);
            int age = (Integer) key.getKey(1);
            int count = entry.getValue();
            emit(new Tuple(state, age, count));
        }
    }
}

First, we obtain tuple iterators for the two data sources. Then we keep updating the hashmap HashMap(<state, age>, count) until exhausting iterators values. Finally, we emit the hashmap contents as results for this buffer operation.

You can find the full code here and here multi buffer operation. In order to test the MultiGroupBy example you will have to assembly fat jar and run it on Hadoop environment.

Conclusion

In find this kind of patterns, join after or before groupBy, a lot in our map reduce job chains. Using MultiGroupBy we achieved considerable performance increase. Additionally, it resulted in efficient cluster utilization.

I strongly believe this operation should be default in both Cascading and Scalding.

If you liked this post, you can click to Tweet it or follow me on Twitter!