Consuming Dumbo output with Pig

February 5, 2010

Although it abstracts and simplifies it all quite a bit, Dumbo still forces you to think in MapReduce, which might not be ideal if you want to implement complex data flows in a limited amount of time. Personally, I think that Dumbo still occupies a useful space within the Hadoop ecosystem, but in some cases it makes sense to work at an even higher level and use something like Pig or Hive. In fact, sometimes it makes sense to combine the two and do some parts of your data flow in Dumbo and others in Pig. To make this possible, I recently wrote a Pig loader function for sequence files that contain TypedBytesWritables, which is the file format Dumbo uses by default to store all its output on Hadoop. Here’s an example of a Pig script that reads Dumbo output:

register pigtail.jar;  -- http://github.com/klbostee/pigtail

a = load '/hdfs/path/to/dumbo/output'
    using fm.last.pigtail.storage.TypedBytesSequenceFileLoader()
    as (artist:int, val:(listeners:int, listens:int));
b = foreach a generate artist, val.listeners as listeners;
c = order b by listeners;
d = limit c 100;

dump d;

You basically just have to specify names and types for the components of the key/value pairs and you’re good to go.

A possibly useful side-effect of writing this loader is the ability it creates of reading all sorts of file formats with Pig. Everything that Dumbo can read can also be consumed by Pig scripts now, all you have to do is write a simple Dumbo script that converts it to typed bytes sequence files:

from dumbo import run
from dumbo.lib import identitymapper

if __name__ == "__main__":
    run(identitymapper)

The proper solution is of course to write custom Pig loaders, but this gets the job done too and doesn’t slow things down that much.


Reading Hadoop records in Python

December 23, 2009

At the 11/18 Bay Area HUG, Paul Tarjan apparently presented an approach for reading Hadoop records in Python. In summary, his approach seems to work as follows:

Hadoop records
     → CsvRecordInput
         → hadoop_record Python module

Although it’s a nice and very systematic solution, I couldn’t resist blogging about an already existing alternative solution for this problem:

Hadoop records
     → TypedBytesRecordInput
         → typedbytes Python module

Not only would this have saved Paul a lot of work, it probably also would’ve been more efficient, especially when using ctypedbytes, the speedy variant of the typedbytes module.


Dumbo on Amazon EMR

December 23, 2009

A while ago, I received an email from Andrew in which he wrote:

Now you should be able to run Dumbo jobs on Elastic MapReduce. To start a cluster, you can use the Ruby client as so:

$ elastic-mapreduce --create --alive

SSH into the cluster using your EC2 keypair as user hadoop and install Dumbo with the following two commands:

$ wget http://bit.ly/ezsetup
$ sudo python ez_setup.py dumbo

Then you can run your Dumbo scripts. I was able to run the ipcount.py demo with the following command.

$ dumbo start ipcount.py -hadoop /home/hadoop \
-input s3://anhi-test-data/wordcount/input/ \
-output s3://anhi-test-data/output/dumbo/wc/

The -hadoop option is important. At this point I haven’t created an automatic Dumbo install script, so you’ll have to install Dumbo by hand each time you launch the cluster. Fortunately installation is easy.

There was a minor hiccup that required the Amazon guys to pull the AMI with Dumbo support, but it’s back now and they seem to be confident that Dumbo support is going to remain available from now on. They are also still planning to make things even easier by providing an automatic Dumbo installation script.

As an aside, it’s worth mentioning that a bug in Hadoop Streaming got fixed in the process of adding Dumbo support to EMR. I can’t wait to see what else the Amazon guys have up their sleeves.


Moving to Hadoop 0.20

November 23, 2009

We’ve finally started looking into moving from Hadoop 0.18 to 0.20 at Last.fm, and I thought it might be useful to share a few Dumbo-related things I learned in the process:

  • We’re probably going to base our 0.20 build on Cloudera’s 0.20 distribution, and I found out the hard way that Dumbo doesn’t work on version 0.20.1+133 of this distribution because it includes a patch for MAPREDUCE-967 that breaks some of the Hadoop Streaming functionality on which Dumbo relies. Luckily, the Cloudera guys fixed it in 0.20.1+152 by reverting this patch, but if you’re still trying to get Dumbo to work on Cloudera’s 0.20.1+133 distribution for some reason then you can expect to get NullPointerExceptions and errors like, e.g., “module wordcount not found” in your tasks’ stderr logs.
  • Also, the Cloudera guys apparently haven’t added the patch for MAPREDUCE-764 to their distribution yet, so you’ll still have to apply this patch yourself if you want to avoid strange encoding problems in certain corner cases. This patch has now been reviewed and accepted for Hadoop 0.21 for quite a while already though, so maybe we can be hopeful about it getting included in Cloudera’s 0.20 distribution soon.
  • The Twitter guys put together a pretty awesome patched and backported version of hadoop-gpl-compression for Hadoop 0.20. It includes several bugfixes and it also provides an InputFormat for the old API, which is useful for Hadoop Streaming (and hence also Dumbo) users since Streaming has not been converted to the new API yet. If you’re interested in this stuff, you might want to have a look at this guest post from Kevin and Eric on the Cloudera blog.

Dumbo over HBase

July 31, 2009

This should be old news for dumbo-user subscribers, but Tim has, once again, put his Java coding skills to good use. This time around he created nifty input and output formats for consuming and/or producing HBase tables from Dumbo programs. Here’s a silly but illustrative example:

from dumbo import opt, run

@opt("inputformat", "fm.last.hbase.mapred.TypedBytesTableInputFormat")
@opt("hadoopconf", "hbase.mapred.tablecolumns=testfamily:testqualifier")
def mapper(key, columns):
    for family, column in columns.iteritems():
        for qualifier, value in column.iteritems():
            yield key, (family, qualifier, value)

@opt("outputformat", "fm.last.hbase.mapred.TypedBytesTableOutputFormat")
def reducer(key, values):
    columns = {}
    for family, qualifier, value in values:
        column = columns.get(family, {})
        column[qualifier] = value
    yield key, columns

if __name__ == "__main__":
    run(mapper, reducer)

Have a look at the readme for more information.


Integration with Java code

June 16, 2009

Although Python has many advantages, you might still want to write some of your mappers or reducers in Java once in a while. Flexibility and speed are probably the most likely potential reasons. Thanks to a recent enhancement, this is now easily achievable. Here’s a version of wordcount.py that uses the example mapper and reducer from the feathers project (and thus requires -libjar feathers.jar):

import dumbo
dumbo.run("fm.last.feathers.map.Words",
          "fm.last.feathers.reduce.Sum",
          combiner="fm.last.feathers.reduce.Sum")

You can basically mix up Python with Java in any way you like. There’s only one minor restriction: You cannot use a Python combiner when you specify a Java mapper. Things should still work in this case though, it’ll just be slow since the combiner won’t actually run. In theory, this limitation could be avoided by relying on HADOOP-4842, but personally I don’t think it’s worth the trouble.

The source code for fm.last.feathers.map.Words and fm.last.feathers.reduce.Sum is just as straightforward as the code for the OutputFormat classes discussed in my previous post. All you have to keep in mind is that only the mapper input keys and values can be arbitrary writables. Every other key or value has to be a TypedBytesWritable. Writing a custom Java partitioner for Dumbo programs is equally easy by the way. The fm.last.feather.partition.Prefix class is a simple example. It can be used by specifying -partitioner fm.last.feather.partition.Prefix.

As you probably expected already, none of this will work for local runs on UNIX, but you can still test things locally fairly easily by running on Hadoop in standalone mode.


Multiple outputs

June 8, 2009

Dumbo 0.21.20 adds support for multiple outputs by providing a -getpath option. Here’s an example:

from dumbo import run, sumreducer, opt

def mapper(key, value):
    for word in value.split():
        yield word, 1

@opt("getpath", "yes")
def reducer(key, values):
    yield (key[0].upper(), key), sum(values)

if __name__ == "__main__":
    run(mapper, reducer, combiner=sumreducer)

Running this splitwordcount.py program on my chrooted Cloudera-flavored Hadoop server (after updating Dumbo and building feathers.jar) gave me the following results:

$ dumbo splitwordcount.py -input brian.txt -output brianwc \
-hadoop /usr/lib/hadoop/ -python python2.5 -libjar feathers.jar
[...]
$ dumbo ls brianwc -hadoop /usr/lib/hadoop/
Found 17 items
drwxr-xr-x   - klaas [...] /user/klaas/brianwc/A
drwxr-xr-x   - klaas [...] /user/klaas/brianwc/B
drwxr-xr-x   - klaas [...] /user/klaas/brianwc/C
[...]
$ dumbo cat brianwc/B -hadoop /usr/lib/hadoop/
be      2
boy     1
Brian   6
became  2

So each ((<path>, <key>), <value>) pair got stored as (<key>, <value>) in <outputdir>/<path>. This only works when running on Hadoop, by the way. For a local run on UNIX everything would still end up in one file.

Under the hood, -getpath yes basically just makes sure that -outputformat sequencefile (which is the default when running on Hadoop) and -outputformat text get translated to -outputformat fm.last.feathers.output.MultipleSequenceFiles and -outputformat fm.last.feathers.output.MultipleTextFiles, respectively. These OutputFormat implementations are nice illustrations of how easy it can be to integrate Java code with Dumbo programs. The brand-new feathers project already provides a few other Java classes that can also easily be used by Dumbo programs, including a mapper and a reducer. I’ll try to find some time to ramble a bit about those as well, but that’s for another post.


Dumbo on Cloudera’s distribution

May 31, 2009

Over the last couple of days, I picked up some rumors concerning the inclusion of all patches on which Dumbo relies in the most recent version of Cloudera’s Hadoop distribution. Todd confirmed this to me yesterday, so the time was right to finally have a look at Cloudera’s nicely packaged and patched-up Hadoop.

I started from a chrooted Debian server, on which I installed the Cloudera distribution, Python 2.5, and Dumbo as follows:

# cat /etc/apt/sources.list
deb http://ftp.be.debian.org/debian etch main contrib non-free
deb http://www.backports.org/debian etch-backports main contrib non-free
deb http://archive.cloudera.com/debian etch contrib
deb-src http://archive.cloudera.com/debian etch contrib
# wget -O - http://backports.org/debian/archive.key | apt-key add -
# wget -O - http://archive.cloudera.com/debian/archive.key | apt-key add -
# apt-get update
# apt-get install hadoop python2.5 python2.5-dev
# wget http://peak.telecommunity.com/dist/ez_setup.py
# python2.5 ez_setup.py dumbo

Then, I created a user for myself and confirmed that the wordcount.py program runs properly on Cloudera’s distribution in standalone mode:

# adduser klaas
# su - klaas
$ wget http://bit.ly/wordcountpy http://bit.ly/briantxt
$ dumbo start wordcount.py -input brian.txt -output brianwc \
-python python2.5 -hadoop /usr/lib/hadoop/
$ dumbo cat brianwc -hadoop /usr/lib/hadoop/ | grep Brian
Brian   6

Unsurprisingly, it also worked perfectly in pseudo-distributed mode:

$ exit
# apt-get install hadoop-conf-pseudo
# /etc/init.d/hadoop-namenode start
# /etc/init.d/hadoop-secondarynamenode start
# /etc/init.d/hadoop-datanode start
# /etc/init.d/hadoop-jobtracker start
# /etc/init.d/hadoop-tasktracker start
# su - klaas
$ dumbo start wordcount.py -input brian.txt -output brianwc \
-python python2.5 -hadoop /usr/lib/hadoop/
$ dumbo rm brianwc/_logs -hadoop /usr/lib/hadoop/
Deleted hdfs://localhost/user/klaas/brianwc/_logs
$ dumbo cat brianwc -hadoop /usr/lib/hadoop/ | grep Brian
Brian   6

Note that I removed the _logs directory first because dumbo cat would’ve complained about it otherwise. You can avoid this minor annoyance by disabling the creation of _logs directories.

I also verified that HADOOP-5528 got included by running the join.py example successfully:

$ wget http://bit.ly/joinpy
$ wget http://bit.ly/hostnamestxt http://bit.ly/logstxt
$ dumbo put hostnames.txt hostnames.txt -hadoop /usr/lib/hadoop/
$ dumbo put logs.txt logs.txt -hadoop /usr/lib/hadoop/
$ dumbo start join.py -input hostnames.txt -input logs.txt \
-output joined -python python2.5 -hadoop /usr/lib/hadoop/
$ dumbo rm joined/_logs -hadoop /usr/lib/hadoop
$ dumbo cat joined -hadoop /usr/lib/hadoop | grep node1
node1   5

And while I was at it, I did a quick typedbytes versus ctypedbytes comparison as well:

$ zcat /usr/share/man/man1/python2.5.1.gz > python.man
$ for i in `seq 100000`; do cat python.man >> python.txt; done
$ du -h python.txt
1.2G    python.txt
$ dumbo put python.txt python.txt -hadoop /usr/lib/hadoop/
$ time dumbo start wordcount.py -input python.txt -output pywc \
-python python2.5 -hadoop /usr/lib/hadoop/
real    17m45.473s
user    0m1.380s
sys     0m0.224s
$ exit
# apt-get install gcc libc6-dev
# su - klaas
$ python2.5 ez_setup.py -zmaxd. ctypedbytes
$ time dumbo start wordcount.py -input python.txt -output pywc2 \
-python python2.5 -hadoop /usr/lib/hadoop/ \
-libegg ctypedbytes-0.1.5-py2.5-linux-i686.egg
real    13m22.420s
user    0m1.320s
sys     0m0.216s

In this particular case, ctypedbytes appears to be 25% faster. Your mileage may vary since the running times depend on many factors, but in any case I’d always expect ctypedbytes to lead to significant speed improvements.


Virtual Python environments

May 24, 2009

Judging from some of the questions about Dumbo development that keep popping up, virtual Python environments are apparently not that widely known and used yet. Therefore, I thought it made sense to write a quick post about them.

The virtualenv tool can be installed as follows:

$ wget http://peak.telecommunity.com/dist/ez_setup.py
$ python ez_setup.py virtualenv

While you’re at it, you might also want to install nose by doing

$ python ez_setup.py nose

since running unit tests sometimes doesn’t work if you don’t install this module manually. Once you got virtualenv installed, you can create and activate a virtual Python environment as follows:

$ mkdir ~/envs
$ virtualenv ~/envs/dumbo
$ source ~/envs/dumbo/bin/activate

You then get a slightly different prompt to remind you that you’re using the isolated virtual environment:

(dumbo)$ which python
/home/username/envs/dumbo/bin/python
(dumbo)$ deactivate
$ which python
/usr/bin/python

Such a virtual environment can be very convenient for developing and debugging Dumbo:

$ source ~/envs/dumbo/bin/activate
(dumbo)$ git clone git://github.com/klbostee/dumbo.git
(dumbo)$ cd dumbo
(dumbo)$ python setup.py test  # run unit tests
(dumbo)$ python setup.py develop  # install symlinks
(dumbo)$ which dumbo
/home/username/envs/dumbo/bin/dumbo
(dumbo)$ cd examples
(dumbo)$ dumbo start wordcount.py -input brian.txt -output out.code
(dumbo)$ dumbo cat out.code | head -n 2
A       2
And     4

Anything you change to the source code will then immediately affect the behavior of dumbo in the virtual environment, and none of this interferes with your global Python installation in any way. Soon enough, you’ll start wondering how you ever managed to live without virtual Python environments.

Note that running on Hadoop won’t work when you installed Dumbo via python setup.py develop. The develop command installs symlinks to the source files (such that you don’t have to run it after each change when you’re developing), but in order to be able to run on Hadoop an egg needs to be generated and installed, which is precisely what python setup.py install does.


TF-IDF revisited

May 17, 2009

Remember the buffering problems for the TF-IDF program discussed in a previous post as well as the lecture about MapReduce algorithms from Cloudera’s free Hadoop training? Thanks to the new joining abstraction (and the minor fixes and enhancements in Dumbo 0.21.13 and 0.21.14), these problems can now easily be avoided:

from dumbo import *
from math import log

@opt("addpath", "yes")
def mapper1(key, value):
    for word in value.split():
        yield (key[0], word), 1

@primary
def mapper2a(key, value):
    yield key[0], value

@secondary
def mapper2b(key, value):
    yield key[0], (key[1], value)

@primary
def mapper3a(key, value):
    yield value[0], 1

@secondary
def mapper3b(key, value):
    yield value[0], (key, value[1])

class Reducer(JoinReducer):
    def __init__(self):
        self.sum = 0
    def primary(self, key, values):
        self.sum = sum(values)

class Combiner(JoinReducer):
    def primary(self, key, values):
        yield key, sum(values)

class Reducer1(Reducer):
    def secondary(self, key, values):
        for (doc, n) in values:
            yield key, (doc, float(n) / self.sum)

class Reducer2(Reducer):
    def __init__(self):
        Reducer.__init__(self)
        self.doccount = float(self.params["doccount"])
    def secondary(self, key, values):
        idf = log(self.doccount / self.sum)
        for (doc, tf) in values:
            yield (key, doc), tf * idf

def runner(job):
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    multimapper = MultiMapper()
    multimapper.add("", mapper2a)
    multimapper.add("", mapper2b)
    job.additer(multimapper, Reducer1, Combiner)
    multimapper = MultiMapper()
    multimapper.add("", mapper3a)
    multimapper.add("", mapper3b)
    job.additer(multimapper, Reducer2, Combiner)

if __name__ == "__main__":
    main(runner)

Most of this Dumbo program shouldn’t be hard to understand if you had a peek at the posts about hiding join keys and the @opt decorator, except maybe for the following things:

  • The first argument supplied to MultiMapper’s add method is a string corresponding to the pattern that has to occur in the file path in order for the added mapper to run on the key/value pairs in a given file. Since the empty string “” is considered to occur in every possible path string, all added mappers run on each input file in this example program.
  • It is possible (but not necessary) to yield key/value pairs in a JoinReducer’s primary method, as illustrated by the Combiner class in this example.
  • The default implementations of JoinReducer’s primary and secondary methods are identity operations, so Combiner combines the primary pairs and just passes on the secondary ones.

Writing this program went surprisingly smooth and didn’t take much effort at all. Apparently, the “primary/secondary abstraction” works really well for me.