Hiding join keys

May 15, 2009

Dumbo 0.21.12 further simplifies joining by hiding the join keys from the “end-programmer”. For instance, the example discussed here can now be written as follows:

import dumbo
from dumbo.lib import MultiMapper, JoinReducer
from dumbo.decor import primary, secondary

def mapper(key, value):
    yield value.split("\t", 1)

class Reducer(JoinReducer):
    def primary(self, key, values):
        self.hostname = values.next()
    def secondary(self, key, values):
        key = self.hostname
        for value in values:
            yield key, value

if __name__ == "__main__":
    multimapper = MultiMapper()
    multimapper.add("hostnames", primary(mapper))
    multimapper.add("logs", secondary(mapper))
    dumbo.run(multimapper, Reducer)

These are the things to note in this fancier version:

  • The mapping is implemented by combining decorated mappers into one MultiMapper.
  • The reducing is implemented by extending JoinReducer.
  • There is no direct interaction with the join keys. In fact, the -joinkeys yes option doesn’t even get specified explicitly (the decorated mappers and JoinReducer automatically make sure this option gets added via their opts attributes).
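To make the last bullet more concrete, here is a minimal self-contained sketch of the mechanism it describes: a decorator that wraps a mapper, tags its output keys, and carries the framework options in an opts attribute. This is an illustration of the idea only, not Dumbo's actual implementation; the key representation and option-passing details are assumptions.

```python
# Sketch (not Dumbo's actual code): decorators that tag a mapper's
# output keys as primary/secondary and carry options via an attribute
# that the framework can inspect to add "-joinkeys yes" automatically.
def primary(mapper):
    def wrapper(key, value):
        for k, v in mapper(key, value):
            yield (k, True), v   # mark emitted keys as primary
    wrapper.opts = [("joinkeys", "yes")]  # picked up by the framework
    return wrapper

def secondary(mapper):
    def wrapper(key, value):
        for k, v in mapper(key, value):
            yield (k, False), v  # mark emitted keys as secondary
    wrapper.opts = [("joinkeys", "yes")]
    return wrapper

def mapper(key, value):
    yield value.split("\t", 1)

tagged = primary(mapper)
print(tagged.opts)                        # [('joinkeys', 'yes')]
print(list(tagged(None, "1.2.3.4\thost1")))  # [(('1.2.3.4', True), 'host1')]
```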

The primary and secondary decorators can, of course, also be applied using the @decorator syntax, i.e., I could also have written

@primary
def mapper1(key, value):
    yield value.split("\t", 1)

@secondary
def mapper2(key, value):
    yield value.split("\t", 1)

if __name__ == "__main__":
    multimapper = MultiMapper()
    multimapper.add("hostnames", mapper1)
    multimapper.add("logs", mapper2)
    dumbo.run(multimapper, Reducer)

This is less convenient for this particular example, but it might be preferable when your primary and secondary mappers have different implementations.
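For instance, suppose the two inputs need different parsing, say tab-separated hostname lines but space-separated log lines (both formats are hypothetical, assumed here purely for illustration). The two mappers would then no longer share an implementation:

```python
# Hypothetical input formats (assumed for illustration only):
#   hostnames:  "<ip>\t<hostname>"
#   logs:       "<ip> <timestamp> <request>"   (space-separated)

def hostname_mapper(key, value):
    # primary mapper: key on the ip, value is the hostname
    yield tuple(value.split("\t", 1))

def log_mapper(key, value):
    # secondary mapper: re-key the log line on its leading ip field
    ip, rest = value.split(" ", 1)
    yield ip, rest

# In the Dumbo program these would then be registered as
#   multimapper.add("hostnames", primary(hostname_mapper))
#   multimapper.add("logs", secondary(log_mapper))
print(list(hostname_mapper(None, "1.2.3.4\thost1")))
print(list(log_mapper(None, "1.2.3.4 2009-05-15 GET /index.html")))
```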

BinaryPartitioner backported to 0.18

May 6, 2009

Today, Tim put a smile on the faces of many Dumbo users at Last.fm by backporting HADOOP-5528 to Hadoop 0.18. Now that his backported patch has been deployed, we finally get to use join keys on our production clusters, allowing us to join more easily and avoid the memory-hungry alternative ways of joining datasets.

Join keys

March 20, 2009

Earlier today, I released Dumbo 0.21.3, which adds support for so-called “join keys” (amongst other things). Here’s an example of a Dumbo program that uses such keys:

def mapper(key, value):
    key.isprimary = "hostnames" in key.body[0]
    key.body, value = value.split("\t", 1)
    yield key, value

class Reducer:
    def __init__(self):
        self.key = None
    def __call__(self, key, values):
        if key.isprimary:
            self.key = key.body
            self.hostname = values.next()
        elif self.key == key.body:
            key.body = self.hostname
            for value in values:
                yield key, value

def runner(job):
    job.additer(mapper, Reducer)

def starter(prog):
    prog.addopt("addpath", "yes")
    prog.addopt("joinkeys", "yes")

if __name__ == "__main__":
    from dumbo import main
    main(runner, starter)

When you put this code in join.py, you can join the files hostnames.txt and logs.txt as follows:

$ wget http://users.ugent.be/~klbostee/files/hostnames.txt
$ wget http://users.ugent.be/~klbostee/files/logs.txt
$ dumbo join.py -input hostnames.txt -input logs.txt \
      -output joined.code
$ dumbo cat joined.code > joined.txt

In order to make join keys work for non-local runs, however, you need to apply the patch from HADOOP-5528, which requires Hadoop 0.20 or higher. More precisely, Dumbo relies on the BinaryPartitioner from HADOOP-5528 to make sure that:

  1. All keys that differ only in the .isprimary attribute are passed to the same reducer.
  2. The primary keys are always reduced before the non-primary ones.
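What these two guarantees buy you can be shown with a small self-contained simulation (plain Python, no Hadoop): represent each key as a (body, isprimary) pair, partition on the body only, sort so primary keys come first within each body, then stream through a reducer that remembers the primary value. The sample records below are made up for illustration.

```python
# Simulated intermediate records: keys are (body, isprimary) pairs.
records = [
    (("1.1.1.1", True), "host1"),    # primary: hostname
    (("1.1.1.1", False), "GET /a"),  # secondary: log lines
    (("2.2.2.2", True), "host2"),
    (("2.2.2.2", False), "GET /b"),
    (("1.1.1.1", False), "GET /c"),
]

def partition(body, num_reducers):
    # Guarantee 1: hash only the key body, so the primary and
    # secondary copies of a key land on the same reducer.
    return hash(body) % num_reducers

# Both copies of "1.1.1.1" go to the same reducer:
assert partition("1.1.1.1", 4) == partition("1.1.1.1", 4)

# Guarantee 2: within a reducer, primary keys sort before
# non-primary ones (negate the flag so True comes first).
records.sort(key=lambda r: (r[0][0], not r[0][1]))

# Streaming reduce: remember the primary value, emit it for
# every matching secondary record.
joined = []
hostname = None
for (body, isprimary), value in records:
    if isprimary:
        hostname = value
    else:
        joined.append((hostname, value))

print(joined)
# [('host1', 'GET /a'), ('host1', 'GET /c'), ('host2', 'GET /b')]
```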

If you want to find out exactly how this works, you might want to watch Cloudera’s “MapReduce Algorithms” lecture, which covers joining by means of a custom partitioner as one of the common MapReduce idioms.

UPDATE: Dumbo 0.21.3 is not compatible with the evolved version of the patch for HADOOP-5528. To make things work with the final patch, you need to upgrade to Dumbo 0.21.4.