BinaryPartitioner backported to 0.18

May 6, 2009

Today, Tim put a smile on the faces of many Dumbo users at Last.fm by backporting HADOOP-5528 to Hadoop 0.18. Now that his backported patch has been deployed, we finally get to use join keys on our production clusters, allowing us to join more easily and avoid the memory-hungry alternative ways of joining datasets.

HADOOP-5528

April 1, 2009

HADOOP-5528 got committed yesterday. From Hadoop 0.21 onwards, join keys will work “out of the box”, without requiring any patching. The patch evolved somewhat before it got committed, though, so the final version no longer works with Dumbo 0.21.3. Therefore, I released Dumbo 0.21.4 this morning, for which the list of changes includes fixing the incompatibility with the final HADOOP-5528 patch.

So far, my luck with getting Hadoop patches reviewed and committed has varied quite a bit. From my limited personal experience, it seems to be more difficult to get a committer to look at a bugfix or an important enhancement than at a new feature, even though such contributions can actually be considered more important. It is of course possible that these particular issues just happened to get overlooked somehow, or maybe there’s a procedure for attracting the committers’ attention that I’m not aware of, but I’m still under the impression that Hadoop’s patch handling is currently not as smooth and efficient as it could be. The fact that, as of this writing, no fewer than 47 issues are in the “Patch available” state seems to confirm this impression.


Join keys

March 20, 2009

Earlier today, I released Dumbo 0.21.3, which adds support for so-called “join keys” (amongst other things). Here’s an example of a Dumbo program that uses such keys:

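def mapper(key, value):
    # With "addpath" enabled, key.body[0] is the path of the input file,
    # so records from hostnames.txt form the primary side of the join.
    key.isprimary = "hostnames" in key.body[0]
    # The first tab-separated field of each line is the field to join on.
    key.body, value = value.split("\t", 1)
    yield key, value

class Reducer:
    def __init__(self):
        self.key = None
    def __call__(self, key, values):
        if key.isprimary:
            # Primary key: remember the join field and its hostname.
            self.key = key.body
            self.hostname = values.next()
        elif self.key == key.body:
            # Matching non-primary key: emit its values under the hostname.
            key.body = self.hostname
            for value in values:
                yield key, value

def runner(job):
    job.additer(mapper, Reducer)

def starter(prog):
    prog.addopt("addpath", "yes")
    prog.addopt("joinkeys", "yes")

if __name__ == "__main__":
    from dumbo import main
    main(runner, starter)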

When you put this code in join.py, you can join the files hostnames.txt and logs.txt as follows:

$ wget http://users.ugent.be/~klbostee/files/hostnames.txt
$ wget http://users.ugent.be/~klbostee/files/logs.txt
$ dumbo join.py -input hostnames.txt -input logs.txt \
-output joined.code
$ dumbo cat joined.code > joined.txt
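To get a feel for what the reducer does without having Dumbo or Hadoop around, here is a rough standalone simulation. Note that JoinKey and simulate_reduce are hypothetical stand-ins made up for this sketch, not part of Dumbo, and that the records are invented sample data rather than the contents of the files above. The sketch simply assumes the reducer sees keys with equal bodies next to each other, with the primary one first:

```python
from itertools import groupby

class JoinKey(object):
    """Hypothetical stand-in for a join key: a body plus an isprimary flag."""
    def __init__(self, body, isprimary):
        self.body = body
        self.isprimary = isprimary

class Reducer(object):
    """Same logic as the reducer in join.py, using next() instead of .next()."""
    def __init__(self):
        self.key = None
        self.hostname = None
    def __call__(self, key, values):
        if key.isprimary:
            self.key = key.body
            self.hostname = next(values)
        elif self.key == key.body:
            key.body = self.hostname
            for value in values:
                yield key, value

def simulate_reduce(records):
    """Feed (body, isprimary, value) records to the reducer in join order."""
    # Equal bodies end up adjacent, and primary records sort first.
    ordered = sorted(records, key=lambda r: (r[0], not r[1]))
    reducer = Reducer()
    output = []
    for (body, isprimary), group in groupby(ordered, key=lambda r: (r[0], r[1])):
        values = (r[2] for r in group)
        for out_key, out_value in reducer(JoinKey(body, isprimary), values):
            output.append((out_key.body, out_value))
    return output

# Invented sample data: True marks a record from the "hostnames" side.
records = [
    ("10.0.0.1", False, "GET /index.html"),
    ("10.0.0.1", True, "alpha.example.org"),
    ("10.0.0.2", False, "GET /about.html"),
    ("10.0.0.1", False, "GET /contact.html"),
    ("10.0.0.3", False, "GET /orphan.html"),
    ("10.0.0.2", True, "beta.example.org"),
]
joined = simulate_reduce(records)
for hostname, line in joined:
    print("%s\t%s" % (hostname, line))
```

The record for 10.0.0.3 has no primary counterpart, so it gets dropped, which means the reducer effectively computes an inner join.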

In order to make join keys work for non-local runs, however, you need to apply the patch from HADOOP-5528, which requires Hadoop 0.20 or higher. More precisely, Dumbo relies on the BinaryPartitioner from HADOOP-5528 to make sure that:

  1. All keys that differ only in the .isprimary attribute are passed to the same reducer.
  2. The primary keys are always reduced before the non-primary ones.
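The general idea behind these two guarantees can be sketched in a few lines of plain Python. This is only an illustration of the idiom, not how Dumbo or the BinaryPartitioner actually encode and compare keys: the (body, rank) tuples, the partition and shuffle functions, and the use of CRC32 as hash are all assumptions made for this sketch. The reducer gets chosen from the body alone, while the sort order takes the whole key into account:

```python
import zlib
from collections import defaultdict

def partition(key, num_reducers):
    """Pick a reducer from the body alone, ignoring the primary flag."""
    body, _rank = key
    return zlib.crc32(body.encode("utf-8")) % num_reducers

def shuffle(keys, num_reducers):
    """Group the keys per reducer, then sort each group on the full key."""
    buckets = defaultdict(list)
    for key in keys:
        buckets[partition(key, num_reducers)].append(key)
    return dict((r, sorted(ks)) for r, ks in buckets.items())

# Rank 0 marks a primary key and rank 1 a non-primary one,
# so that the primary key sorts first among keys with the same body.
keys = [
    ("10.0.0.1", 1),
    ("10.0.0.2", 1),
    ("10.0.0.1", 0),
    ("10.0.0.2", 0),
    ("10.0.0.1", 1),
]
buckets = shuffle(keys, num_reducers=4)
for reducer_id, reducer_keys in sorted(buckets.items()):
    print("%d: %r" % (reducer_id, reducer_keys))
```

Because the flag is left out of the hash, keys that differ only in that flag can never end up on different reducers, and because it is part of the sort key, the primary key always comes first within each body.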

If you want to find out how this works exactly, you might want to watch Cloudera’s “MapReduce Algorithms” lecture, since joining by means of a custom partitioner is one of the common idioms discussed in this lecture.

UPDATE: Dumbo 0.21.3 is not compatible with the evolved version of the patch for HADOOP-5528. To make things work with the final patch, you need to upgrade to Dumbo 0.21.4.