Today, Tim put a smile on the faces of many Dumbo users at Last.fm by backporting HADOOP-5528 to Hadoop 0.18. Now that his backported patch has been deployed, we finally get to use join keys on our production clusters. This makes joins easier to write and lets us avoid the memory-hungry alternative ways of joining datasets.
HADOOP-5528 was committed yesterday, so from Hadoop 0.21 onwards join keys will work “out of the box”, without requiring any patching. Because the patch evolved somewhat before it was committed, though, the final version no longer works with Dumbo 0.20.3. I therefore released Dumbo 0.21.4 this morning; among other changes, it fixes the incompatibility with the final HADOOP-5528 patch.
So far, my luck with getting Hadoop patches reviewed and committed has varied quite a bit. From my limited personal experience, it seems harder to get a committer to look at a bugfix or an important enhancement than at a new feature, even though such contributions can arguably be considered more important than new features. Of course, these particular issues may simply have been overlooked, or there may be a procedure for attracting the committers’ attention that I’m not aware of. Nevertheless, I’m still under the impression that Hadoop’s patch handling is currently not as smooth and efficient as it could be. The fact that, as of this writing, no fewer than 47 issues are in the “Patch available” state seems to confirm this impression.
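```python
def mapper(key, value):
    # With addpath enabled, key.body initially holds the input path,
    # so lines from the hostnames file become primary keys.
    key.isprimary = "hostnames" in key.body
    # The first tab-separated field is the join field and becomes the key body.
    key.body, value = value.split("\t", 1)
    yield key, value

class Reducer:
    def __init__(self):
        self.key = None
        self.hostname = None

    def __call__(self, key, values):
        if key.isprimary:
            # Primary keys are reduced first: remember the hostname for this key.
            self.key = key.body
            self.hostname = values.next()
        elif self.key == key.body:
            # Non-primary key with a matching body: replace it by the hostname.
            key.body = self.hostname
            for value in values:
                yield key, value

def runner(job):
    job.additer(mapper, Reducer)

def starter(prog):
    prog.addopt("addpath", "yes")
    prog.addopt("joinkeys", "yes")

if __name__ == "__main__":
    from dumbo import main
    main(runner, starter)
```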
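Assuming the program above is saved as join.py, it can be run locally on two sample files as follows:

```sh
$ wget http://users.ugent.be/~klbostee/files/hostnames.txt
$ wget http://users.ugent.be/~klbostee/files/logs.txt
$ dumbo join.py -input hostnames.txt -input logs.txt \
    -output joined.code
$ dumbo cat joined.code > joined.txt
```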
To make join keys work for non-local runs, however, you need to apply the patch from HADOOP-5528, which requires Hadoop 0.20 or higher. More precisely, Dumbo relies on the BinaryPartitioner from HADOOP-5528 to make sure that:
- All keys that differ only in the .isprimary attribute are passed to the same reducer.
- The primary keys are always reduced before the non-primary ones.
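The two guarantees above can be illustrated with a small, self-contained Python simulation that runs without Hadoop. It is only a sketch under stated assumptions: the `JoinKey` class, the `partition`, `sort_key` and `simulate_join` functions, and the sample IP/hostname records are all hypothetical. The CRC32-based partitioning merely stands in for what BinaryPartitioner does on the serialized key, and the in-memory sort stands in for Hadoop's shuffle and sort; neither is Dumbo's or Hadoop's actual implementation.

```python
import zlib
from itertools import groupby


class JoinKey:
    """Hypothetical stand-in for a Dumbo join key: a body plus an isprimary flag."""

    def __init__(self, body, isprimary):
        self.body = body
        self.isprimary = isprimary


def partition(key, num_reducers):
    # Hash only the body, so keys that differ solely in isprimary
    # are sent to the same reducer (first guarantee).
    return zlib.crc32(key.body.encode("utf-8")) % num_reducers


def sort_key(pair):
    key, _ = pair
    # Order by body, then put the primary key first (second guarantee):
    # False sorts before True, hence "not key.isprimary".
    return (key.body, not key.isprimary)


def simulate_join(records, num_reducers=2):
    """Join (JoinKey, value) pairs the way the Reducer in join.py does."""
    partitions = [[] for _ in range(num_reducers)]
    for key, value in records:
        partitions[partition(key, num_reducers)].append((key, value))

    output = []
    for part in partitions:
        part.sort(key=sort_key)
        # Reducer state lives per partition, like one Reducer per reduce task.
        current_body, hostname = None, None
        # Each group corresponds to one reducer call for a distinct key.
        for (body, notprimary), group in groupby(part, key=sort_key):
            values = [v for _, v in group]
            if not notprimary:
                current_body, hostname = body, values[0]
            elif body == current_body:
                output.extend((hostname, v) for v in values)
    return sorted(output)


# Hypothetical sample data: hostnames are primary, log lines are not.
records = [
    (JoinKey("1.2.3.4", False), "GET /index.html"),
    (JoinKey("1.2.3.4", True), "host-a.example.com"),
    (JoinKey("5.6.7.8", False), "GET /about.html"),
    (JoinKey("5.6.7.8", True), "host-b.example.com"),
    (JoinKey("9.9.9.9", False), "GET /orphan.html"),  # no matching hostname
]
print(simulate_join(records))
# → [('host-a.example.com', 'GET /index.html'), ('host-b.example.com', 'GET /about.html')]
```

Because the primary key for a given body is always reduced first, the reducer only has to remember a single hostname at a time instead of buffering a whole dataset in memory, which is what makes this approach cheaper than the memory-hungry alternatives.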
If you want to find out exactly how this works, Cloudera’s “MapReduce Algorithms” lecture is worth watching, since joining by means of a custom partitioner is one of the common idioms it discusses.