Mapper and reducer interfaces

March 31, 2009

Dumbo 0.21.3 added an alternative interface for mappers and reducers. Using this interface, the “wordcount” example

def mapper(key, value):
    for word in value.split(): 
        yield word, 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    from dumbo import run
    run(mapper, reducer, combiner=reducer)

can be written as follows:

def mapper(data):
    for key, value in data:
        for word in value.split(): 
            yield word, 1

def reducer(data):
    for key, values in data:
        yield key, sum(values)

if __name__ == "__main__":
    from dumbo import run
    run(mapper, reducer, combiner=reducer)

Dumbo automatically detects which interface a function uses and calls it accordingly. In theory, the alternative version is faster since it involves fewer function calls, but the real reason the new interface was added is that it is more low-level and can make integration with existing Python code easier in some cases.
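
One simple way to tell the two interfaces apart is to count positional arguments. The sketch below illustrates the idea; it is not necessarily how Dumbo actually implements the detection:

import inspect

def uses_alternative_interface(func):
    # mapper(key, value) and reducer(key, values) take two arguments,
    # whereas the alternative mapper(data) and reducer(data) take one
    args = inspect.getargspec(func)[0]
    if inspect.ismethod(func):
        args = args[1:]  # drop 'self' for methods such as Mapper.__call__
    return len(args) == 1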

Just like the original interface, the alternative one also works for mapper and reducer classes. Adapting the first example above such that a class is used for both the mapper and reducer results in:

class Mapper:
    def __call__(self, key, value):
        for word in value.split(): 
            yield word, 1

class Reducer:
    def __call__(self, key, values):
        yield key, sum(values)

if __name__ == "__main__":
    from dumbo import run
    run(Mapper, Reducer, combiner=Reducer)

Applying the same transformation to the version using the alternative interface leads to:

class Mapper:
    def __call__(self, data):
        for key, value in data:
            for word in value.split(): 
                yield word, 1

class Reducer:
    def __call__(self, data):
        for key, values in data:
            yield key, sum(values)

if __name__ == "__main__":
    from dumbo import run
    run(Mapper, Reducer, combiner=Reducer)

Since mapper and reducer functions that use the alternative interface are called only once, you don’t need classes to add initialization or cleanup logic when using this interface, but they can still be useful if you want to access the fields that Dumbo automatically adds to them.
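
For example, here is a sketch of a class-based mapper that uses the alternative interface but still reads a parameter supplied via -param. It assumes that Dumbo fills a params dictionary on the instance, so double-check which fields get set on the version you are running:

class Mapper:
    def __call__(self, data):
        # 'params' is assumed to be populated by Dumbo from -param options
        minlength = int(self.params.get("minlength", "1"))
        for key, value in data:
            for word in value.split():
                if len(word) >= minlength:
                    yield word, 1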


Join keys

March 20, 2009

Earlier today, I released Dumbo 0.21.3, which adds support for so-called “join keys” (amongst other things). Here’s an example of a Dumbo program that uses such keys:

def mapper(key, value):
    key.isprimary = "hostnames" in key.body[0]
    key.body, value = value.split("\t", 1)
    yield key, value
    
class Reducer:
    def __init__(self):
        self.key = None
    def __call__(self, key, values):
        if key.isprimary:
            self.key = key.body
            self.hostname = values.next()
        elif self.key == key.body:
            key.body = self.hostname
            for value in values:
                yield key, value
    
def runner(job):
    job.additer(mapper, Reducer)
    
def starter(prog):
    prog.addopt("addpath", "yes")
    prog.addopt("joinkeys", "yes")

if __name__ == "__main__":
    from dumbo import main
    main(runner, starter)

When you put this code in join.py, you can join the files hostnames.txt and logs.txt as follows:

$ wget http://users.ugent.be/~klbostee/files/hostnames.txt
$ wget http://users.ugent.be/~klbostee/files/logs.txt
$ dumbo join.py -input hostnames.txt -input logs.txt \
-output joined.code
$ dumbo cat joined.code > joined.txt

In order to make join keys work for non-local runs, however, you need to apply the patch from HADOOP-5528, which requires Hadoop 0.20 or higher. More precisely, Dumbo relies on the BinaryPartitioner from HADOOP-5528 to make sure that:

  1. All keys that differ only in the .isprimary attribute are passed to the same reducer.
  2. The primary keys are always reduced before the non-primary ones.
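
To make these two properties more concrete, here is a small local sketch that feeds the Reducer class from join.py with keys in the guaranteed order. The JoinKey class and the sample values are merely illustrative stand-ins for what Dumbo would actually pass in:

class JoinKey:
    def __init__(self, body, isprimary):
        self.body, self.isprimary = body, isprimary

reducer = Reducer()
inputs = [
    (JoinKey("10.0.0.1", True), iter(["host-a"])),            # primary, from hostnames.txt
    (JoinKey("10.0.0.1", False), iter(["GET /index.html"])),  # from logs.txt
]
for key, values in inputs:
    for joinedkey, value in reducer(key, values):
        print joinedkey.body, value   # prints: host-a GET /index.html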

If you want to find out how this works exactly, you might want to watch Cloudera’s “MapReduce Algorithms” lecture, since joining by means of a custom partitioner is one of the common idioms discussed in this lecture.

UPDATE: Dumbo 0.21.3 is not compatible with the evolved version of the patch for HADOOP-5528. To make things work with the final patch, you need to upgrade to Dumbo 0.21.4.


How to contribute

March 12, 2009

As part of the attempt to get more organized, this post outlines how to contribute to Dumbo. I might turn this into a wiki page eventually, but a blog post will probably get more attention, and people who are not planning to contribute to Dumbo any time soon might still be interested in the process described in the remainder of this post.

If you haven’t done so already, create a GitHub account and fork the master tree. Then go through the following steps:

  1. Either create a new ticket for the changes you have in mind, or add a comment to the corresponding existing ticket to inform everyone that you started working on it.
  2. Make the necessary changes (and add unit tests for them).
  3. Run all unit tests:

    $ python setup.py test
  4. If none of the tests fail, commit your changes using a commit message that GitHub’s issue tracker understands:

    $ git commit -a -m "Closes GH-<ticket number>"
  5. Push your commit to GitHub:

    $ git push
  6. Send a pull request to me.

If you want to be able to get back to your changes easily later on, you can also create a separate branch for the ticket and push that branch to GitHub instead, as shown below. This might save you some time if, for instance, I spot a bug in your changes and ask you to send another pull request after fixing it.
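
In terms of commands, that branch-based workflow could look roughly like this (the branch name is only an example):

$ git checkout -b gh-<ticket number>
$ git commit -a -m "Closes GH-<ticket number>"
$ git push origin gh-<ticket number>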


Getting organized

March 11, 2009

I created Assembla issue trackers for both Dumbo and Typedbytes a few moments ago. They aren’t quite as powerful as JIRA, but they integrate nicely with GitHub and should be more than sufficient for the time being.

UPDATE: We now use GitHub’s native issue trackers.


Dumbo 0.21 and typedbytes 0.3

March 11, 2009

I finally took the time to release Dumbo 0.21. All the Java code is gone now, and the Python code has been split up into submodules to make things clearer and more maintainable. Installing Dumbo itself is now very easy, but until Hadoop 0.21 gets released it requires a patched Hadoop, since it relies on HADOOP-1722 to replace the functionality provided by the removed Java code.

Thanks to Daniel Lescohier, the typedbytes Python module — which is now a required dependency for Dumbo — also got a version bump. Version 0.3 of this module incorporates various improvements and optimizations, but unfortunately one of these improvements requires another patch to be applied to Hadoop.

So, for now, you have to apply the patches from both HADOOP-1722 and HADOOP-5450 in order to use the latest Dumbo offerings, but this minor annoyance will hopefully be remedied by Hadoop 0.21, since the plan is to get HADOOP-5450 into that release as well, just like HADOOP-1722.


Parsers and records

March 6, 2009

Earlier today I released Dumbo 0.20.29, which adds built-in support for parser and record classes. The easiest way to explain the use of such classes is by means of an example, so let’s dive straight in and look at some code:

class JobLog:
    def __init__(self, logline=None):
        from re import compile
        self.fieldselector = compile('([A-Z_]+)="([^"]*)"')
        if logline: self.parse(logline)
    def parse(self, logline):
        self.type = logline.split(" ", 1)[0]
        fields = dict(self.fieldselector.findall(logline))
        self.fields = fields
        if self.type.endswith("Attempt"):
            try:
                self.jobname = fields["JOBNAME"]
                self.taskid = fields["TASK_ATTEMPT_ID"]
                if "START_TIME" in fields:
                    self.start = int(fields["START_TIME"])
                    self.stop = None
                    self.status = None
                elif "FINISH_TIME" in fields:
                    self.start = None
                    self.stop = int(fields["FINISH_TIME"])
                    self.status = fields["TASK_STATUS"]
            except KeyError, err:
                raise ValueError(str(err))
        return self

This class is a parser because:

  1. It has a parse method that takes a line of text as input and returns an object that represents this line.
  2. Its parse method raises a ValueError when the given line of text is in an unexpected format.

Since the objects returned by the parse method only have to stay valid until the next time parse is called, the same object can be reused. It’s even possible to return the parser object itself, as illustrated by the example above.
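
For example, feeding JobLog a made-up but representative job history line shows this contract in action:

log = JobLog()
attempt = log.parse('MapAttempt JOBNAME="wordcount" '
                    'TASK_ATTEMPT_ID="attempt_200903061012_0001_m_000000_0" '
                    'START_TIME="1236340000000"')
print attempt.type, attempt.jobname, attempt.start   # 'attempt' is 'log' itself
# an *Attempt line that lacks JOBNAME or TASK_ATTEMPT_ID raises ValueError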

Now, when you put JobLog in parsers.py and add the option -file parsers.py to the dumbo start command, you can rewrite the job statistics program from my previous post as follows:

from dumbo import sumsreducer, statsreducer, statscombiner, main

def mapper1(key, log):
    if log.type.endswith("Attempt"):
        if log.start:
            yield (log.type, log.jobname, log.taskid), (log.start, 0)
        elif log.stop and log.status == "SUCCESS":
            yield (log.type, log.jobname, log.taskid), (0, log.stop)

def mapper2(key, value):
    if value[0] > 0 and value[1] > 0:
        yield key[:2], (value[1] - value[0]) / 1000.0

def runner(job):
    job.additer(mapper1, sumsreducer, combiner=sumsreducer)
    job.additer(mapper2, statsreducer, statscombiner)

def starter(prog):
    prog.addopt("parser", "parsers.JobLog")
    date = prog.delopt("date")
    if date:
        prog.addopt("input", "/user/hadoop/joblogs/" + date)
        prog.addopt("name", "jobstats-" + date)

if __name__ == "__main__":
    main(runner, starter)

The main thing to note in this code is the line

prog.addopt("parser", "parsers.JobLog")

By means of the new -parser <full name of Python class> option, you can tell Dumbo to convert all values to objects using a given parser, before passing them on to your mapper. When a ValueError occurs during this process, Dumbo will ignore the corresponding value and increment the counter “Bad inputs”, unless you indicated that errors shouldn’t be caught by using the option -debug yes.
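
The behaviour described above boils down to something like the following wrapper. This is just a sketch of the idea, not Dumbo’s actual implementation, and the counter increment is left as a comment because that mechanism is internal to Dumbo:

def parsedvalues(values, parser, debug=False):
    # convert raw values to parsed objects before they reach the mapper
    for value in values:
        try:
            yield parser.parse(value)
        except ValueError:
            if debug:
                raise  # with -debug yes, errors are not caught
            # otherwise: increment the "Bad inputs" counter and skip the value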

Record classes are similar to parser classes, but instead of a parse method they have a set method that can take multiple values as input. The corresponding option is -record <full name of Python class>, which makes Dumbo create an instance of the given record class and call instance.set(*value) for each value. Since Dumbo represents Hadoop records as sequences of Python values corresponding to the different fields, this is especially useful when consuming files that contain such Hadoop records (which explains why I refer to these value-transforming Python classes as “records”).
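
As an illustration, a record class for a hypothetical file of (artist, title, playcount) records could look like this; the field names are made up, but the set method is the part that matters:

class Track:
    def set(self, artist, title, playcount):
        # called by Dumbo as instance.set(*value) for each record
        self.artist = artist
        self.title = title
        self.playcount = int(playcount)
        return self

Running with -record records.Track (assuming the class lives in records.py) would then hand your mapper Track objects instead of plain sequences of field values.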

At Last.fm, we’ve been using parser and record classes for quite a while now, because they allow us to both abstract away file format details and handle many bug fixes and improvements centrally. We bundle them in Python eggs and make sure they are submitted along with every Dumbo program by putting the necessary -libegg options in /etc/dumbo.conf. Always having to manually create and invoke the parser or record class quickly becomes a bit tiring though, so the new -parser and -record options are definitely a welcome improvement.
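
Concretely, assuming the section-per-command layout that dumbo.conf uses for such options, a setup along these lines would do the trick (the egg path is just a placeholder):

[streaming]
libegg: /path/to/parsers-1.0.egg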


HADOOP-1722 and typed bytes

February 24, 2009

The Hadoop guys recently accepted a patch that adds support for binary communication formats to Streaming, the Hadoop component on which Dumbo is based. As described in the comments for the corresponding JIRA issue (HADOOP-1722), this patch basically abstracts away the communication format used by introducing the classes InputWriter and OutputReader. It provides extensions of these classes that implement three different communication formats, including the so-called typed bytes format, which happens to be a great alternative to the functionality provided by the Java code that is currently part of Dumbo.
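
To give a feel for what typed bytes look like on the wire, here is a minimal sketch of two of the type codes as I understand the format from HADOOP-1722: every value starts with a one-byte type code followed by big-endian data, with code 3 being a 32-bit integer and code 7 a length-prefixed UTF-8 string. The real typedbytes module of course covers many more types:

import struct

def write_int(n):
    # type code 3, followed by a 4-byte big-endian integer
    return struct.pack('>Bi', 3, n)

def write_string(s):
    # type code 7, followed by a 4-byte length and the UTF-8 bytes
    data = s.encode('utf-8')
    return struct.pack('>Bi', 7, len(data)) + data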

A while ago, Johan and I ran four versions of the IP count program on about 300 gigs of weblogs, which resulted in the following timings:

  1. Java: 8 minutes
  2. Dumbo with typed bytes: 10 minutes
  3. Hive: 13 minutes
  4. Dumbo without typed bytes: 16 minutes

Hence, thanks to typed bytes, Java is now only 20% faster than Dumbo, and there might still be some room for narrowing this gap even further. For instance, I wouldn’t be surprised if another minute could be chopped off by implementing the typed bytes serialization on the Python side in C instead of pure Python, just like the Thrift guys did with their fastbinary Python module. Since typed bytes are also a cleaner and more maintainable solution, the Java code will probably be removed in Dumbo 0.21, making typed bytes the only supported approach.

Unfortunately, Hadoop releases before 0.21.0 do not include typed bytes support yet, but I do plan to attach backported patches to HADOOP-1722. The patch for 0.18 is already there, and backports for more recent versions of Hadoop will follow as soon as we start using those versions at Last.fm. To use typed bytes with Dumbo 0.20, you just have to make sure the typedbytes module is installed on the machine from which you run your Dumbo programs, add typedbytes: all and type: typedbytes to the sections [streaming] and [cat], respectively, of your /etc/dumbo.conf or ~/.dumborc file, and apply the HADOOP-1722 patch to your Hadoop build:

$ cd /path/to/hadoop-0.18
$ wget 'https://issues.apache.org/jira/secure/attachment/'\
'12400166/HADOOP-1722-branch-0.18.patch'
$ patch -p0 < HADOOP-1722-branch-0.18.patch
$ ant package
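
For reference, the configuration additions mentioned above make the relevant sections of your /etc/dumbo.conf or ~/.dumborc look like this:

[streaming]
typedbytes: all

[cat]
type: typedbytes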

For now, Dumbo will still look for its jar though, so you cannot get rid of src/contrib/dumbo in your Hadoop directory just yet.