Dumbo on Amazon EMR

December 23, 2009

A while ago, I received an email from Andrew in which he wrote:

Now you should be able to run Dumbo jobs on Elastic MapReduce. To start a cluster, you can use the Ruby client like so:

$ elastic-mapreduce --create --alive

SSH into the cluster using your EC2 keypair as user hadoop and install Dumbo with the following two commands:

$ wget -O ez_setup.py http://bit.ly/ezsetup
$ sudo python ez_setup.py dumbo

Then you can run your Dumbo scripts. I was able to run the ipcount.py demo with the following command:

$ dumbo start ipcount.py -hadoop /home/hadoop \
-input s3://anhi-test-data/wordcount/input/ \
-output s3://anhi-test-data/output/dumbo/wc/

The -hadoop option is important. At this point I haven’t created an automatic Dumbo install script, so you’ll have to install Dumbo by hand each time you launch the cluster. Fortunately installation is easy.

There was a minor hiccup that required the Amazon guys to pull the AMI with Dumbo support, but it’s back now and they seem to be confident that Dumbo support is going to remain available from now on. They are also still planning to make things even easier by providing an automatic Dumbo installation script.

As an aside, it’s worth mentioning that a bug in Hadoop Streaming got fixed in the process of adding Dumbo support to EMR. I can’t wait to see what else the Amazon guys have up their sleeves.

Moving to Hadoop 0.20

November 23, 2009

We’ve finally started looking into moving from Hadoop 0.18 to 0.20 at Last.fm, and I thought it might be useful to share a few Dumbo-related things I learned in the process:

  • We’re probably going to base our 0.20 build on Cloudera’s 0.20 distribution, and I found out the hard way that Dumbo doesn’t work on version 0.20.1+133 of this distribution: it includes a patch for MAPREDUCE-967 that breaks some of the Hadoop Streaming functionality on which Dumbo relies. Luckily, the Cloudera guys fixed this in 0.20.1+152 by reverting that patch, but if you’re still trying to get Dumbo to work on 0.20.1+133 for some reason, you can expect NullPointerExceptions and errors such as “module wordcount not found” in your tasks’ stderr logs.
  • Also, the Cloudera guys apparently haven’t added the patch for MAPREDUCE-764 to their distribution yet, so you’ll still have to apply that patch yourself if you want to avoid strange encoding problems in certain corner cases. It has been reviewed and accepted for Hadoop 0.21 for quite a while now, though, so hopefully it will be included in Cloudera’s 0.20 distribution soon.
  • The Twitter guys put together a pretty awesome patched and backported version of hadoop-gpl-compression for Hadoop 0.20. It includes several bugfixes and also provides an InputFormat for the old API, which is useful for Hadoop Streaming (and hence Dumbo) users, since Streaming hasn’t been converted to the new API yet. If you’re interested in this stuff, you might want to have a look at this guest post from Kevin and Eric on the Cloudera blog.

Dumbo over HBase

July 31, 2009

This should be old news for dumbo-user subscribers, but Tim has, once again, put his Java coding skills to good use. This time around he created nifty input and output formats for consuming and/or producing HBase tables from Dumbo programs. Here’s a silly but illustrative example:

from dumbo import opt, run

@opt("inputformat", "fm.last.hbase.mapred.TypedBytesTableInputFormat")
@opt("hadoopconf", "hbase.mapred.tablecolumns=testfamily:testqualifier")
def mapper(key, columns):
    for family, column in columns.iteritems():
        for qualifier, value in column.iteritems():
            yield key, (family, qualifier, value)

@opt("outputformat", "fm.last.hbase.mapred.TypedBytesTableOutputFormat")
@opt("hadoopconf", "hbase.mapred.outputtable=output_table")
def reducer(key, values):
    columns = {}
    for family, qualifier, value in values:
        column = columns.setdefault(family, {})
        column[qualifier] = value
    yield key, columns

if __name__ == "__main__":
    run(mapper, reducer)
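As a quick local sanity check (plain Python, no HBase or Hadoop involved; the sample triples below are made up for illustration), the reducer’s regrouping logic can be exercised directly:

```python
# Local check of the reducer's regrouping: (family, qualifier, value)
# triples become a nested {family: {qualifier: value}} dict per row key.
def reducer(key, values):
    columns = {}
    for family, qualifier, value in values:
        column = columns.setdefault(family, {})
        column[qualifier] = value
    yield key, columns

triples = [("testfamily", "a", "1"), ("testfamily", "b", "2"),
           ("otherfamily", "a", "3")]
for key, columns in reducer("row1", triples):
    print(key, columns)
# row1 {'testfamily': {'a': '1', 'b': '2'}, 'otherfamily': {'a': '3'}}
```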

Have a look at the readme for more information.


July 15, 2009

Unfortunately, the list of Hadoop patches required to make Dumbo work properly just expanded a bit: I traced a strange encoding bug to an issue in Streaming’s typed bytes code. Hence, you might want to apply the MAPREDUCE-764 patch to your Hadoop build if you use Dumbo, even though the bug only leads to problems in very specific cases and usually isn’t hard to work around. Hopefully this patch will make it into Hadoop 0.21.

This isn’t all bad news, however. The encoding bug was initially reported on the dumbo-user mailing list, which apparently has 12 subscribers already and is starting to attract fairly regular traffic. I haven’t promoted this mailing list much so far and, to be honest, never really expected people to start using it, but obviously I was wrong. Everyone who reads this blog should consider subscribing; I’m sure you won’t regret it!

Analysing Apache logs

June 18, 2009

The Cloudera guys blogged about using Pig for examining Apache logs yesterday. Although it nicely illustrates several lesser-known Pig features, I’m not overly impressed with the described program, to be honest. Having to resort to three different scripting languages to do some GeoIP lookups just complicates things too much, if you ask me. Personally, I’d much prefer writing something like:

class Mapper:
    def __init__(self):
        from re import compile
        self.regex = compile(r'(?P<ip>[\d\.\-]+) (?P<id>[\w\-]+) ' \
                             r'(?P<user>[\w\-]+) \[(?P<time>[^\]]+)\] ' \
                             r'"(?P<request>[^"]+)" (?P<status>[\d\-]+) ' \
                             r'(?P<bytes>[\d\-]+) "(?P<referer>[^"]+)" ' \
                             r'"(?P<agent>[^"]+)"')
        from pygeoip import GeoIP, MEMORY_CACHE
        self.geoip = GeoIP(self.params["geodata"], flags=MEMORY_CACHE)
    def __call__(self, key, value):
        mo = self.regex.match(value)
        if mo:
            request, bytes = mo.group("request"), mo.group("bytes")
            if request.startswith("GET") and bytes != "-":
                rec = self.geoip.record_by_addr(mo.group("ip"))
                country = rec["country_code"] if rec else "-"
                yield country, (1, int(bytes))

if __name__ == "__main__":
    from dumbo import run, sumsreducer
    run(Mapper, sumsreducer, combiner=sumsreducer)
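Before running this on a cluster, the log regex can be sanity-checked locally; this sketch stubs out the GeoIP lookup entirely and uses a made-up log line in Apache combined format:

```python
import re

# Same pattern as in the Mapper above, ending with the user-agent field
# of the Apache combined log format; the log line is a made-up example.
regex = re.compile(r'(?P<ip>[\d\.\-]+) (?P<id>[\w\-]+) '
                   r'(?P<user>[\w\-]+) \[(?P<time>[^\]]+)\] '
                   r'"(?P<request>[^"]+)" (?P<status>[\d\-]+) '
                   r'(?P<bytes>[\d\-]+) "(?P<referer>[^"]+)" '
                   r'"(?P<agent>[^"]+)"')

line = ('127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] '
        '"GET /apache_pb.gif HTTP/1.0" 200 2326 '
        '"http://www.example.com/start.html" "Mozilla/4.08"')

mo = regex.match(line)
assert mo and mo.group("request").startswith("GET")
print(mo.group("ip"), mo.group("status"), mo.group("bytes"))
# 127.0.0.1 200 2326
```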

After installing Python 2.6, I tested this hits_by_country.py program on my chrooted Cloudera-flavored Hadoop server as follows:

$ wget http://pygeoip.googlecode.com/files/pygeoip-0.1.1-py2.6.egg
$ wget http://bit.ly/geolitecity
$ wget http://bit.ly/randomapachelog  # found via Google
$ dumbo put access.log access.log -hadoop /usr/lib/hadoop
$ dumbo start hits_by_country.py -hadoop /usr/lib/hadoop \
-input access.log -output hits_by_country \
-python python2.6 -libegg pygeoip-0.1.1-py2.6.egg \
-file GeoLiteCity.dat -param geodata=GeoLiteCity.dat
$ dumbo cat hits_by_country/part-00000 -hadoop /usr/lib/hadoop/ | \
sort -k2,2nr | head -n 5
US      9400    388083137
KR      6714    2655270
DE      1859    32131992
RU      1838    44073038
CA      1055    23035208

At Last.fm, we use the GeoIP Python bindings instead of the pure-Python pygeoip module; the two are nearly identical API-wise, but pygeoip might be a bit slower. Also, we abstract away the format of our Apache logs by using a parser class, and we have some library code for identifying hits from robots as well, much like the IsBotUA() method in the Pig example.
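Such robot filtering can be as simple as a substring check on the user-agent string. A minimal sketch (the token list here is purely illustrative, not our actual library code):

```python
# Minimal illustrative bot check on the user-agent string;
# the token list is a made-up example, not real library code.
BOT_TOKENS = ("bot", "crawler", "spider", "slurp")

def is_bot_ua(agent):
    agent = agent.lower()
    return any(token in agent for token in BOT_TOKENS)

print(is_bot_ua("Googlebot/2.1 (+http://www.google.com/bot.html)"))  # True
print(is_bot_ua("Mozilla/4.08"))  # False
```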

Integration with Java code

June 16, 2009

Although Python has many advantages, you might still want to write some of your mappers or reducers in Java once in a while, most likely for flexibility or speed. Thanks to a recent enhancement, this is now easily achievable. Here’s a version of wordcount.py that uses the example mapper and reducer from the feathers project (and thus requires -libjar feathers.jar):

import dumbo

def runner(job):
    # Java classes are referenced by name instead of by Python callables;
    # this listing is a sketch reconstructed from the surrounding text.
    job.additer("fm.last.feathers.map.Words",
                "fm.last.feathers.reduce.Sum",
                combiner="fm.last.feathers.reduce.Sum")

if __name__ == "__main__":
    dumbo.main(runner)

You can basically mix Python and Java in any way you like. There’s only one minor restriction: you cannot use a Python combiner when you specify a Java mapper. Things should still work in this case, though; they’ll just be slower, since the combiner won’t actually run. In theory, this limitation could be avoided by relying on HADOOP-4842, but personally I don’t think it’s worth the trouble.

The source code for fm.last.feathers.map.Words and fm.last.feathers.reduce.Sum is just as straightforward as the code for the OutputFormat classes discussed in my previous post. All you have to keep in mind is that only the mapper input keys and values can be arbitrary writables; every other key or value has to be a TypedBytesWritable. Writing a custom Java partitioner for Dumbo programs is equally easy, by the way. The fm.last.feathers.partition.Prefix class is a simple example; it can be used by specifying -partitioner fm.last.feathers.partition.Prefix.
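Conceptually, a prefix partitioner assigns a reduce partition based on only the first component of a composite key, so all keys that share a prefix reach the same reducer. In Python terms (a sketch of the idea, not the actual Java code):

```python
# Sketch of prefix partitioning: route composite keys by their first
# component so all keys with the same prefix reach the same reducer.
def prefix_partition(key, num_partitions):
    prefix = key[0]  # e.g. the first element of a tuple key
    return hash(prefix) % num_partitions

keys = [("A", "apple"), ("A", "ant"), ("B", "boy")]
parts = [prefix_partition(k, 4) for k in keys]
assert parts[0] == parts[1]  # same prefix, same partition
```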

As you probably expected already, none of this will work for local runs on UNIX, but you can still test things locally fairly easily by running on Hadoop in standalone mode.

Multiple outputs

June 8, 2009

Dumbo 0.21.20 adds support for multiple outputs by providing a -getpath option. Here’s an example:

from dumbo import run, sumreducer, opt

def mapper(key, value):
    for word in value.split():
        yield word, 1

@opt("getpath", "yes")
def reducer(key, values):
    yield (key[0].upper(), key), sum(values)

if __name__ == "__main__":
    run(mapper, reducer, combiner=sumreducer)

Running this splitwordcount.py program on my chrooted Cloudera-flavored Hadoop server (after updating Dumbo and building feathers.jar) gave me the following results:

$ dumbo start splitwordcount.py -input brian.txt -output brianwc \
-hadoop /usr/lib/hadoop/ -python python2.5 -libjar feathers.jar
$ dumbo ls brianwc -hadoop /usr/lib/hadoop/
Found 17 items
drwxr-xr-x   - klaas [...] /user/klaas/brianwc/A
drwxr-xr-x   - klaas [...] /user/klaas/brianwc/B
drwxr-xr-x   - klaas [...] /user/klaas/brianwc/C
$ dumbo cat brianwc/B -hadoop /usr/lib/hadoop/
be      2
boy     1
Brian   6
became  2

So each ((<path>, <key>), <value>) pair got stored as (<key>, <value>) in <outputdir>/<path>. This only works when running on Hadoop, by the way. For a local run on UNIX everything would still end up in one file.
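This routing can be mimicked in plain Python: strip the first key component off and use it as a subdirectory name. A local sketch of the behavior (not the actual Java output format code):

```python
# Local sketch of -getpath routing: ((path, key), value) pairs get
# written as (key, value) under <outputdir>/<path>.
pairs = [(("B", "be"), 2), (("B", "boy"), 1), (("A", "and"), 3)]

outputs = {}
for (path, key), value in pairs:
    outputs.setdefault(path, []).append((key, value))

print(outputs["B"])
# [('be', 2), ('boy', 1)]
```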

Under the hood, -getpath yes basically just makes sure that -outputformat sequencefile (which is the default when running on Hadoop) and -outputformat text get translated to -outputformat fm.last.feathers.output.MultipleSequenceFiles and -outputformat fm.last.feathers.output.MultipleTextFiles, respectively. These OutputFormat implementations are nice illustrations of how easy it can be to integrate Java code with Dumbo programs. The brand-new feathers project already provides a few other Java classes that can also easily be used by Dumbo programs, including a mapper and a reducer. I’ll try to find some time to ramble a bit about those as well, but that’s for another post.