```python
def mapper(key, value):
    key.isprimary = "hostnames" in key.body
    key.body, value = value.split("\t", 1)
    yield key, value

class Reducer:
    def __init__(self):
        self.key = None
    def __call__(self, key, values):
        if key.isprimary:
            self.key = key.body
            self.hostname = values.next()
        elif self.key == key.body:
            key.body = self.hostname
            for value in values:
                yield key, value

def runner(job):
    job.additer(mapper, Reducer)

def starter(prog):
    prog.addopt("addpath", "yes")
    prog.addopt("joinkeys", "yes")

if __name__ == "__main__":
    from dumbo import main
    main(runner, starter)
```
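To see what the mapper does to a single record, here is a minimal sketch that drives it outside of Dumbo. The `Key` class is a hypothetical stand-in (an assumption, not Dumbo's actual key wrapper) for the key object that Dumbo provides when "addpath" is enabled, where `key.body` initially holds the path of the input file:

```python
# Hypothetical stand-in for the key object Dumbo passes to the mapper
# when "addpath" is enabled: .body starts out as the input file path,
# and extra attributes such as .isprimary can be attached freely.
class Key:
    def __init__(self, body):
        self.body = body

def mapper(key, value):
    key.isprimary = "hostnames" in key.body  # record came from hostnames.txt?
    key.body, value = value.split("\t", 1)   # re-key on the join field
    yield key, value

# Feed one tab-separated line from the hostnames file through the mapper
# (the IP/hostname pair below is made-up illustration data):
k = Key("hostnames.txt")
out_key, out_value = next(mapper(k, "1.2.3.4\thost-a"))
print(out_key.body, out_key.isprimary, out_value)  # 1.2.3.4 True host-a
```

The mapper thus turns each record's key into the join key (the first tab-separated field) and tags it with a flag saying which input it came from.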
```
$ wget http://users.ugent.be/~klbostee/files/hostnames.txt
$ wget http://users.ugent.be/~klbostee/files/logs.txt
$ dumbo join.py -input hostnames.txt -input logs.txt \
    -output joined.code
$ dumbo cat joined.code > joined.txt
```
To make join keys work for non-local runs, however, you need to apply the patch from HADOOP-5528, which requires Hadoop 0.20 or later. More precisely, Dumbo relies on the BinaryPartitioner introduced by HADOOP-5528 to ensure that:
- All keys that differ only in the .isprimary attribute are passed to the same reducer.
- The primary keys are always reduced before the non-primary ones.
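These two guarantees are what make the single-pass join in the reducer possible. The following sketch simulates them in plain Python (this is an illustration under assumptions, not Dumbo's actual machinery, and the IPs, hostnames, and log lines are made-up data): grouping records by the key body while sorting primaries first reproduces, in memory, the ordering the BinaryPartitioner provides on a real cluster.

```python
from itertools import groupby

# Made-up sample data: the primary input maps IPs to hostnames,
# the non-primary input holds log lines keyed by IP.
hostnames = [("1.2.3.4", "host-a"), ("5.6.7.8", "host-b")]
logs = [("1.2.3.4", "GET /index"), ("5.6.7.8", "GET /about"),
        ("1.2.3.4", "GET /contact")]

# Represent each key as a (body, isprimary) pair, as in the mapper above.
records = [((ip, True), name) for ip, name in hostnames] + \
          [((ip, False), line) for ip, line in logs]

# Sort so that records sharing a body end up in one group, with the
# primary record first -- exactly the two guarantees listed above.
records.sort(key=lambda kv: (kv[0][0], not kv[0][1]))

joined = []
for body, group in groupby(records, key=lambda kv: kv[0][0]):
    hostname = None
    for (_, isprimary), value in group:
        if isprimary:
            hostname = value              # remember the hostname for this IP
        elif hostname is not None:
            joined.append((hostname, value))  # replace IP with hostname

print(joined)
```

Because the primary record is seen first within each group, the reducer only needs to remember one hostname at a time, which is why the `Reducer` class above gets by with a single `self.hostname` attribute.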
If you want to find out exactly how this works, you might want to watch Cloudera's "MapReduce Algorithms" lecture, since joining by means of a custom partitioner is one of the common idioms it discusses.