Dumbo on Cloudera’s distribution

May 31, 2009

Over the last couple of days, I picked up some rumors that the most recent version of Cloudera’s Hadoop distribution includes all the patches on which Dumbo relies. Todd confirmed this to me yesterday, so the time was right to finally have a look at Cloudera’s nicely packaged and patched-up Hadoop.

I started from a chrooted Debian server, on which I installed the Cloudera distribution, Python 2.5, and Dumbo as follows:

# cat /etc/apt/sources.list
deb http://ftp.be.debian.org/debian etch main contrib non-free
deb http://www.backports.org/debian etch-backports main contrib non-free
deb http://archive.cloudera.com/debian etch contrib
deb-src http://archive.cloudera.com/debian etch contrib
# wget -O - http://backports.org/debian/archive.key | apt-key add -
# wget -O - http://archive.cloudera.com/debian/archive.key | apt-key add -
# apt-get update
# apt-get install hadoop python2.5 python2.5-dev
# wget http://peak.telecommunity.com/dist/ez_setup.py
# python2.5 ez_setup.py dumbo

Then, I created a user for myself and confirmed that the wordcount.py program runs properly on Cloudera’s distribution in standalone mode:

# adduser klaas
# su - klaas
$ wget http://bit.ly/wordcountpy http://bit.ly/briantxt
$ dumbo start wordcount.py -input brian.txt -output brianwc \
-python python2.5 -hadoop /usr/lib/hadoop/
$ dumbo cat brianwc -hadoop /usr/lib/hadoop/ | grep Brian
Brian   6
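
For reference, wordcount.py is Dumbo’s canonical word count example; it looks roughly like this (the bit.ly link above points to the actual file):

def mapper(key, value):
    for word in value.split():
        yield word, 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    import dumbo
    dumbo.run(mapper, reducer, combiner=reducer)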

Unsurprisingly, it also worked perfectly in pseudo-distributed mode:

$ exit
# apt-get install hadoop-conf-pseudo
# /etc/init.d/hadoop-namenode start
# /etc/init.d/hadoop-secondarynamenode start
# /etc/init.d/hadoop-datanode start
# /etc/init.d/hadoop-jobtracker start
# /etc/init.d/hadoop-tasktracker start
# su - klaas
$ dumbo start wordcount.py -input brian.txt -output brianwc \
-python python2.5 -hadoop /usr/lib/hadoop/
$ dumbo rm brianwc/_logs -hadoop /usr/lib/hadoop/
Deleted hdfs://localhost/user/klaas/brianwc/_logs
$ dumbo cat brianwc -hadoop /usr/lib/hadoop/ | grep Brian
Brian   6

Note that I removed the _logs directory first because dumbo cat would’ve complained about it otherwise. You can avoid this minor annoyance by disabling the creation of _logs directories.

I also verified that HADOOP-5528 got included by running the join.py example successfully:

$ wget http://bit.ly/joinpy
$ wget http://bit.ly/hostnamestxt http://bit.ly/logstxt
$ dumbo put hostnames.txt hostnames.txt -hadoop /usr/lib/hadoop/
$ dumbo put logs.txt logs.txt -hadoop /usr/lib/hadoop/
$ dumbo start join.py -input hostnames.txt -input logs.txt \
-output joined -python python2.5 -hadoop /usr/lib/hadoop/
$ dumbo rm joined/_logs -hadoop /usr/lib/hadoop
$ dumbo cat joined -hadoop /usr/lib/hadoop | grep node1
node1   5

And while I was at it, I did a quick typedbytes versus ctypedbytes comparison as well:

$ zcat /usr/share/man/man1/python2.5.1.gz > python.man
$ for i in `seq 100000`; do cat python.man >> python.txt; done
$ du -h python.txt
1.2G    python.txt
$ dumbo put python.txt python.txt -hadoop /usr/lib/hadoop/
$ time dumbo start wordcount.py -input python.txt -output pywc \
-python python2.5 -hadoop /usr/lib/hadoop/
real    17m45.473s
user    0m1.380s
sys     0m0.224s
$ exit
# apt-get install gcc libc6-dev
# su - klaas
$ python2.5 ez_setup.py -zmaxd. ctypedbytes
$ time dumbo start wordcount.py -input python.txt -output pywc2 \
-python python2.5 -hadoop /usr/lib/hadoop/ \
-libegg ctypedbytes-0.1.5-py2.5-linux-i686.egg
real    13m22.420s
user    0m1.320s
sys     0m0.216s

In this particular case, ctypedbytes reduced the total running time by about 25%. Your mileage may vary since the running times depend on many factors, but in general I’d expect ctypedbytes to lead to significant speed improvements.
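
If you want a rough feel for the serialization overhead without involving Hadoop at all, you can also time both modules directly. Here’s a minimal sketch, assuming both typedbytes and ctypedbytes are installed and that ctypedbytes is a drop-in replacement exposing the same Output interface:

import time
from cStringIO import StringIO

import typedbytes
import ctypedbytes

def bench(module, pairs):
    # serialize all pairs with the given typed bytes implementation
    start = time.time()
    module.Output(StringIO()).writes(pairs)
    return time.time() - start

pairs = [("word%i" % i, 1) for i in xrange(100000)]
print "typedbytes:  %.2fs" % bench(typedbytes, pairs)
print "ctypedbytes: %.2fs" % bench(ctypedbytes, pairs)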


Virtual Python environments

May 24, 2009

Judging from some of the questions about Dumbo development that keep popping up, virtual Python environments are apparently not that widely known and used yet. Therefore, I thought it made sense to write a quick post about them.

The virtualenv tool can be installed as follows:

$ wget http://peak.telecommunity.com/dist/ez_setup.py
$ python ez_setup.py virtualenv

While you’re at it, you might also want to install nose by doing

$ python ez_setup.py nose

since running unit tests sometimes doesn’t work if you don’t install this module manually. Once you’ve got virtualenv installed, you can create and activate a virtual Python environment as follows:

$ mkdir ~/envs
$ virtualenv ~/envs/dumbo
$ source ~/envs/dumbo/bin/activate

You then get a slightly different prompt to remind you that you’re using the isolated virtual environment:

(dumbo)$ which python
/home/username/envs/dumbo/bin/python
(dumbo)$ deactivate
$ which python
/usr/bin/python

Such a virtual environment can be very convenient for developing and debugging Dumbo:

$ source ~/envs/dumbo/bin/activate
(dumbo)$ git clone git://github.com/klbostee/dumbo.git
(dumbo)$ cd dumbo
(dumbo)$ python setup.py test  # run unit tests
(dumbo)$ python setup.py develop  # install symlinks
(dumbo)$ which dumbo
/home/username/envs/dumbo/bin/dumbo
(dumbo)$ cd examples
(dumbo)$ dumbo start wordcount.py -input brian.txt -output out.code
(dumbo)$ dumbo cat out.code | head -n 2
A       2
And     4

Any change you make to the source code will then immediately affect the behavior of dumbo in the virtual environment, and none of this interferes with your global Python installation in any way. Soon enough, you’ll start wondering how you ever managed to live without virtual Python environments.

Note that running on Hadoop won’t work if you installed Dumbo via python setup.py develop. The develop command installs symlinks to the source files (so that you don’t have to reinstall after each change while developing), but in order to run on Hadoop an egg needs to be generated and installed, which is precisely what python setup.py install does.


TF-IDF revisited

May 17, 2009

Remember the buffering problems for the TF-IDF program, discussed in a previous post as well as in the lecture about MapReduce algorithms from Cloudera‘s free Hadoop training? Thanks to the new joining abstraction (and the minor fixes and enhancements in Dumbo 0.21.13 and 0.21.14), these problems can now easily be avoided:

from dumbo import *
from math import log

@opt("addpath", "yes")
def mapper1(key, value):
    # thanks to -addpath yes, key[0] is the path of the input
    # file, which serves as the document identifier
    for word in value.split():
        yield (key[0], word), 1

@primary
def mapper2a(key, value):
    # (doc, n) pairs, used to compute the total number of words per doc
    yield key[0], value

@secondary
def mapper2b(key, value):
    # (doc, (word, n)) pairs
    yield key[0], (key[1], value)

@primary
def mapper3a(key, value):
    # (word, 1) pairs, used to compute the document frequency per word
    yield value[0], 1

@secondary
def mapper3b(key, value):
    # (word, (doc, tf)) pairs
    yield value[0], (key, value[1])

class Reducer(JoinReducer):
    def __init__(self):
        self.sum = 0
    def primary(self, key, values):
        self.sum = sum(values)

class Combiner(JoinCombiner):
    def primary(self, key, values):
        yield key, sum(values)

class Reducer1(Reducer):
    def secondary(self, key, values):
        # key is the doc and self.sum its total number of words
        for (word, n) in values:
            yield key, (word, float(n) / self.sum)

class Reducer2(Reducer):
    def __init__(self):
        Reducer.__init__(self)
        self.doccount = float(self.params["doccount"])
    def secondary(self, key, values):
        # key is the word and self.sum its document frequency
        idf = log(self.doccount / self.sum)
        for (doc, tf) in values:
            yield (key, doc), tf * idf

def runner(job):
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    multimapper = MultiMapper()
    multimapper.add("", mapper2a)
    multimapper.add("", mapper2b)
    job.additer(multimapper, Reducer1, Combiner)
    multimapper = MultiMapper()
    multimapper.add("", mapper3a)
    multimapper.add("", mapper3b)
    job.additer(multimapper, Reducer2, Combiner)

if __name__ == "__main__":
    main(runner)

Most of this Dumbo program shouldn’t be hard to understand if you had a peek at the posts about hiding join keys and the @opt decorator, except maybe for the following things:

  • The first argument supplied to MultiMapper‘s add method is a pattern string: the added mapper runs on the key/value pairs from a given file only if this pattern occurs in the file’s path. Since the empty string “” occurs in every possible path string, all added mappers run on every input file in this example program.
  • It is possible (but not necessary) to yield key/value pairs in a JoinReducer‘s primary method, as illustrated by the Combiner class in this example.
  • The default implementations of JoinReducer‘s primary and secondary methods are identity operations, so Combiner combines the primary pairs and just passes on the secondary ones, as the sketch below illustrates.
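
Conceptually, those identity defaults behave as if JoinCombiner were defined like this (an illustrative sketch, not Dumbo’s actual source):

class JoinCombiner:
    def primary(self, key, values):
        # default: pass primary pairs through unchanged
        for value in values:
            yield key, value
    def secondary(self, key, values):
        # default: pass secondary pairs through unchanged
        for value in values:
            yield key, value

Since the example’s Combiner only overrides primary, the secondary pairs are passed on untouched. Note also that Reducer2 reads the total number of documents from the doccount parameter, which has to be supplied on the command line via Dumbo’s -param option.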

Writing this program went surprisingly smoothly and didn’t take much effort at all. Apparently, the “primary/secondary abstraction” works really well for me.


Hiding join keys

May 15, 2009

Dumbo 0.21.12 further simplifies joining by hiding the join keys from the “end-programmer”. For instance, the example discussed here can now be written as follows:

import dumbo
from dumbo.lib import MultiMapper, JoinReducer
from dumbo.decor import primary, secondary

def mapper(key, value):
    yield value.split("\t", 1)

class Reducer(JoinReducer):
    def primary(self, key, values):
        self.hostname = values.next()
    def secondary(self, key, values):
        key = self.hostname
        for value in values:
            yield key, value

if __name__ == "__main__":
    multimapper = MultiMapper()
    multimapper.add("hostnames", primary(mapper))
    multimapper.add("logs", secondary(mapper))
    dumbo.run(multimapper, Reducer)

These are the things to note in this fancier version:

  • The mapping is implemented by combining decorated mappers into one MultiMapper.
  • The reducing is implemented by extending JoinReducer.
  • There is no direct interaction with the join keys. In fact, the -joinkeys yes option doesn’t even get specified explicitly: the decorated mappers and JoinReducer automatically make sure this option gets added via their opts attributes (a conceptual sketch of this mechanism follows below).
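
To make that last point a bit more concrete, here’s a conceptual sketch of what a decorator like primary could look like. The way the pairs get tagged is an assumption on my part (the actual representation is internal to Dumbo), but it conveys the idea:

def primary(mapper):
    def wrapped(key, value):
        for k, v in mapper(key, value):
            # tag each pair as coming from the primary dataset, so that
            # it sorts before the secondary pairs for the same key (the
            # actual tag representation is internal to Dumbo)
            yield (k, 0), v
    # make sure -joinkeys yes gets added automatically
    wrapped.opts = [("joinkeys", "yes")]
    return wrapped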

The primary and secondary decorators can, of course, also be applied using the @decorator syntax, i.e., I could also have written

@primary
def mapper1(key, value):
    yield value.split("\t", 1)

@secondary
def mapper2(key, value):
    yield value.split("\t", 1)

and

    multimapper.add("hostnames", mapper1)
    multimapper.add("logs", mapper2)

This is less convenient for this particular example, but it might be preferable when your primary and secondary mappers have different implementations, as in the hypothetical variation below.
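
For instance, if the secondary mapper had to do some extra per-record processing, the two decorated mappers might look as follows (a hypothetical variation on the example above):

@primary
def mapper1(key, value):
    # primary input: "ip<TAB>hostname" lines
    yield value.split("\t", 1)

@secondary
def mapper2(key, value):
    # secondary input: "ip<TAB>logline" lines, with a bit of
    # extra per-record processing thrown in
    ip, logline = value.split("\t", 1)
    yield ip, logline.strip().lower()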


The @opt decorator

May 13, 2009

I added the @opt decorator to Dumbo yesterday, and decided it was useful enough to justify a new minor release. Using this decorator, mappers and reducers can specify (additional) options they rely on. For instance, you can now write

from dumbo import opt, run, sumreducer

@opt("addpath", "yes")
def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

if __name__ == "__main__":
    run(mapper, sumreducer, combiner=sumreducer)

instead of

from dumbo import main, sumreducer

def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

def runner(job):
    job.additer(mapper, sumreducer, combiner=sumreducer)

def starter(prog):
    prog.addopt("addpath", "yes")

if __name__ == "__main__":
    main(runner, starter)

to count words on a file-by-file basis (recall that the -addpath yes option makes sure the file path is prepended to the key passed to the mapper). The former version is not only shorter, but also clearer, since the option specification is closer to the code that relies on it.

Under the hood, the @opt decorator appends the given option to an opts list attribute. For mapper and reducer classes, you can just set this attribute directly:

class Mapper:
    opts = [("addpath", "yes")]
    def __call__(self, key, value):
        for word in value.split():
            yield (word, key[0]), 1

if __name__ == "__main__":
    from dumbo import run, sumreducer
    run(Mapper, sumreducer, combiner=sumreducer)
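
For the curious, a decorator that appends to such an opts attribute takes only a few lines. This is just a sketch of the idea, not Dumbo’s actual source:

def opt(name, value):
    def decorator(func):
        # append the option to the function's opts list attribute,
        # creating the attribute first when necessary
        func.opts = getattr(func, "opts", []) + [(name, value)]
        return func
    return decorator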

Furthermore, it’s now also possible to pass an options list to run() or additer() via the opts argument:

def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

if __name__ == "__main__":
    from dumbo import run, sumreducer
    opts = [("addpath", "yes")]
    run(mapper, sumreducer, combiner=sumreducer, opts=opts)

which could be handy when you want to use join keys for only one iteration, for example.
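
In that scenario, your runner might look something like this (a hypothetical sketch in which the mappers and reducers are assumed to be defined elsewhere):

def runner(job):
    # only the first iteration uses join keys
    job.additer(join_mapper, join_reducer, opts=[("joinkeys", "yes")])
    job.additer(plain_mapper, plain_reducer)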


Powered by Dumbo?

May 9, 2009

I’ve slowly started taking on the slightly daunting task of writing my Ph.D. dissertation, and I’m considering including a chapter about Dumbo and Hadoop. However, thinking about this made me realize that I’m pretty clueless as to how many people are using Dumbo, and for what purposes it’s being used outside of Last.fm. I know for a fact that CBSi started using it recently, and there are a few other companies like Lookery that appear to be making use of it as well, but I don’t really know what they’re using it for exactly, and judging from the number of questions I keep getting there must be more people out there who are using Dumbo for non-toy projects. So, if you aren’t just reading this blog out of personal interest, please drop me a line at klaas at last dot fm or add a comment to this post. It’ll make my day, and you might get an honorable mention in my dissertation. When the list is long enough, I might even devote an entire wiki page to it as well.


BinaryPartitioner backported to 0.18

May 6, 2009

Today, Tim put a smile on the faces of many Dumbo users at Last.fm by backporting HADOOP-5528 to Hadoop 0.18. Now that his backported patch has been deployed, we finally get to use join keys on our production clusters, allowing us to join more easily and avoid the memory-hungry alternative ways of joining datasets.