Dumbo on Cloudera’s distribution

May 31, 2009

Over the last couple of days, I picked up rumors that the most recent version of Cloudera’s Hadoop distribution includes all of the patches on which Dumbo relies. Todd confirmed this to me yesterday, so the time was right to finally have a look at Cloudera’s nicely packaged and patched-up Hadoop.

I started from a chrooted Debian server, on which I installed the Cloudera distribution, Python 2.5, and Dumbo as follows:

# cat /etc/apt/sources.list
deb http://ftp.be.debian.org/debian etch main contrib non-free
deb http://www.backports.org/debian etch-backports main contrib non-free
deb http://archive.cloudera.com/debian etch contrib
deb-src http://archive.cloudera.com/debian etch contrib
# wget -O - http://backports.org/debian/archive.key | apt-key add -
# wget -O - http://archive.cloudera.com/debian/archive.key | apt-key add -
# apt-get update
# apt-get install hadoop python2.5 python2.5-dev
# wget http://peak.telecommunity.com/dist/ez_setup.py
# python2.5 ez_setup.py dumbo

Then, I created a user for myself and confirmed that the wordcount.py program runs properly on Cloudera’s distribution in standalone mode:

# adduser klaas
# su - klaas
$ wget http://bit.ly/wordcountpy http://bit.ly/briantxt
$ dumbo start wordcount.py -input brian.txt -output brianwc \
-python python2.5 -hadoop /usr/lib/hadoop/
$ dumbo cat brianwc -hadoop /usr/lib/hadoop/ | grep Brian
Brian   6

Unsurprisingly, it also worked perfectly in pseudo-distributed mode:

$ exit
# apt-get install hadoop-conf-pseudo
# /etc/init.d/hadoop-namenode start
# /etc/init.d/hadoop-secondarynamenode start
# /etc/init.d/hadoop-datanode start
# /etc/init.d/hadoop-jobtracker start
# /etc/init.d/hadoop-tasktracker start
# su - klaas
$ dumbo start wordcount.py -input brian.txt -output brianwc \
-python python2.5 -hadoop /usr/lib/hadoop/
$ dumbo rm brianwc/_logs -hadoop /usr/lib/hadoop/
Deleted hdfs://localhost/user/klaas/brianwc/_logs
$ dumbo cat brianwc -hadoop /usr/lib/hadoop/ | grep Brian
Brian   6

Note that I removed the _logs directory first because dumbo cat would’ve complained about it otherwise. You can avoid this minor annoyance by disabling the creation of _logs directories.
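
If memory serves, this can be done by setting the hadoop.job.history.user.location property to none, e.g. in /etc/hadoop/conf/hadoop-site.xml (double-check the property name against your Hadoop version):

<property>
  <name>hadoop.job.history.user.location</name>
  <value>none</value>
</property>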

I also verified that HADOOP-5528 got included by running the join.py example successfully:

$ wget http://bit.ly/joinpy
$ wget http://bit.ly/hostnamestxt http://bit.ly/logstxt
$ dumbo put hostnames.txt hostnames.txt -hadoop /usr/lib/hadoop/
$ dumbo put logs.txt logs.txt -hadoop /usr/lib/hadoop/
$ dumbo start join.py -input hostnames.txt -input logs.txt \
-output joined -python python2.5 -hadoop /usr/lib/hadoop/
$ dumbo rm joined/_logs -hadoop /usr/lib/hadoop
$ dumbo cat joined -hadoop /usr/lib/hadoop | grep node1
node1   5

And while I was at it, I did a quick typedbytes versus ctypedbytes comparison as well:

$ zcat /usr/share/man/man1/python2.5.1.gz > python.man
$ for i in `seq 100000`; do cat python.man >> python.txt; done
$ du -h python.txt
1.2G    python.txt
$ dumbo put python.txt python.txt -hadoop /usr/lib/hadoop/
$ time dumbo start wordcount.py -input python.txt -output pywc \
-python python2.5 -hadoop /usr/lib/hadoop/
real    17m45.473s
user    0m1.380s
sys     0m0.224s
$ exit
# apt-get install gcc libc6-dev
# su - klaas
$ python2.5 ez_setup.py -zmaxd. ctypedbytes
$ time dumbo start wordcount.py -input python.txt -output pywc2 \
-python python2.5 -hadoop /usr/lib/hadoop/ \
-libegg ctypedbytes-0.1.5-py2.5-linux-i686.egg
real    13m22.420s
user    0m1.320s
sys     0m0.216s

In this particular case, ctypedbytes reduced the total running time by about 25%. Your mileage may vary since the running times depend on many factors, but in any case I’d expect ctypedbytes to lead to significant speed improvements.

Virtual Python environments

May 24, 2009

Judging from some of the questions about Dumbo development that keep popping up, virtual Python environments are apparently not that widely known and used yet. Therefore, I thought it made sense to write a quick post about them.

The virtualenv tool can be installed as follows:

$ wget http://peak.telecommunity.com/dist/ez_setup.py
$ python ez_setup.py virtualenv

While you’re at it, you might also want to install nose by doing

$ python ez_setup.py nose

since running unit tests sometimes doesn’t work if you don’t install this module manually. Once you’ve got virtualenv installed, you can create and activate a virtual Python environment as follows:

$ mkdir ~/envs
$ virtualenv ~/envs/dumbo
$ source ~/envs/dumbo/bin/activate

You then get a slightly different prompt to remind you that you’re using the isolated virtual environment:

(dumbo)$ which python
/home/username/envs/dumbo/bin/python
(dumbo)$ deactivate
$ which python
/usr/bin/python

Such a virtual environment can be very convenient for developing and debugging Dumbo:

$ source ~/envs/dumbo/bin/activate
(dumbo)$ git clone git://github.com/klbostee/dumbo.git
(dumbo)$ cd dumbo
(dumbo)$ python setup.py test  # run unit tests
(dumbo)$ python setup.py develop  # install symlinks
(dumbo)$ which dumbo
/home/username/envs/dumbo/bin/dumbo
(dumbo)$ cd examples
(dumbo)$ dumbo start wordcount.py -input brian.txt -output out.code
(dumbo)$ dumbo cat out.code | head -n 2
A       2
And     4

Any change you make to the source code will then immediately affect the behavior of dumbo in the virtual environment, and none of this interferes with your global Python installation in any way. Soon enough, you’ll start wondering how you ever managed to live without virtual Python environments.

Note that running on Hadoop won’t work when you’ve installed Dumbo via python setup.py develop. The develop command installs symlinks to the source files (so that you don’t have to reinstall after each change while you’re developing), but in order to run on Hadoop an egg needs to be generated and installed, which is precisely what python setup.py install does.
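
Hence, trying out a change on Hadoop might look something like this (just a sketch, reusing the Cloudera setup from the post above):

(dumbo)$ python setup.py install  # builds and installs an egg instead of symlinks
(dumbo)$ dumbo start wordcount.py -input brian.txt -output brianwc \
-hadoop /usr/lib/hadoop/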


TF-IDF revisited

May 17, 2009

Remember the buffering problems for the TF-IDF program discussed in a previous post as well as the lecture about MapReduce algorithms from Cloudera’s free Hadoop training? Thanks to the new joining abstraction (and the minor fixes and enhancements in Dumbo 0.21.13 and 0.21.14), these problems can now easily be avoided:

from dumbo import *
from math import log

@opt("addpath", "yes")
def mapper1(key, value):
    for word in value.split():
        yield (key[0], word), 1

@primary
def mapper2a(key, value):
    yield key[0], value

@secondary
def mapper2b(key, value):
    yield key[0], (key[1], value)

@primary
def mapper3a(key, value):
    yield value[0], 1

@secondary
def mapper3b(key, value):
    yield value[0], (key, value[1])

class Reducer(JoinReducer):
    def __init__(self):
        self.sum = 0
    def primary(self, key, values):
        self.sum = sum(values)

class Combiner(JoinCombiner):
    def primary(self, key, values):
        yield key, sum(values)

class Reducer1(Reducer):
    def secondary(self, key, values):
        for (doc, n) in values:
            yield key, (doc, float(n) / self.sum)

class Reducer2(Reducer):
    def __init__(self):
        Reducer.__init__(self)
        self.doccount = float(self.params["doccount"])
    def secondary(self, key, values):
        idf = log(self.doccount / self.sum)
        for (doc, tf) in values:
            yield (key, doc), tf * idf

def runner(job):
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    multimapper = MultiMapper()
    multimapper.add("", mapper2a)
    multimapper.add("", mapper2b)
    job.additer(multimapper, Reducer1, Combiner)
    multimapper = MultiMapper()
    multimapper.add("", mapper3a)
    multimapper.add("", mapper3b)
    job.additer(multimapper, Reducer2, Combiner)

if __name__ == "__main__":
    main(runner)

Most of this Dumbo program shouldn’t be hard to understand if you’ve had a peek at the posts about hiding join keys and the @opt decorator, except maybe for the following things:

  • The first argument supplied to MultiMapper’s add method is a string corresponding to the pattern that has to occur in the file path in order for the added mapper to run on the key/value pairs in a given file. Since the empty string “” is considered to occur in every possible path string, all added mappers run on each input file in this example program (see the sketch after this list).
  • It is possible (but not necessary) to yield key/value pairs in a JoinReducer’s primary method, as illustrated by the Combiner class in this example.
  • The default implementations of JoinReducer’s primary and secondary methods are identity operations, so Combiner combines the primary pairs and just passes on the secondary ones.
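
To make the first point a bit more concrete: in the joining example from the hiding join keys post, the patterns are non-empty, so each mapper only runs on the files whose paths contain the corresponding string:

multimapper = MultiMapper()
multimapper.add("hostnames", primary(mapper))   # only files with "hostnames" in their path
multimapper.add("logs", secondary(mapper))      # only files with "logs" in their path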

Writing this program went surprisingly smoothly and didn’t take much effort at all. Apparently, the “primary/secondary abstraction” works really well for me.


Hiding join keys

May 15, 2009

Dumbo 0.21.12 further simplifies joining by hiding the join keys from the “end-programmer”. For instance, the example discussed here can now be written as follows:

import dumbo
from dumbo.lib import MultiMapper, JoinReducer
from dumbo.decor import primary, secondary

def mapper(key, value):
    yield value.split("\t", 1)

class Reducer(JoinReducer):
    def primary(self, key, values):
        self.hostname = values.next()
    def secondary(self, key, values):
        key = self.hostname
        for value in values:
            yield key, value

if __name__ == "__main__":
    multimapper = MultiMapper()
    multimapper.add("hostnames", primary(mapper))
    multimapper.add("logs", secondary(mapper))
    dumbo.run(multimapper, Reducer)

These are the things to note in this fancier version:

  • The mapping is implemented by combining decorated mappers into one MultiMapper.
  • The reducing is implemented by extending JoinReducer.
  • There is no direct interaction with the join keys. In fact, the -joinkeys yes option doesn’t even get specified explicitly (the decorated mappers and JoinReducer automatically make sure this option gets added via their opts attributes).

The primary and secondary decorators can, of course, also be applied using the @decorator syntax, i.e., I could also have written

@primary
def mapper1(key, value):
    yield value.split("\t", 1)

@secondary
def mapper2(key, value):
    yield value.split("\t", 1)

and

    multimapper.add("hostnames", mapper1)
    multimapper.add("logs", mapper2)

This is less convenient for this particular example, but it might be preferable when your primary and secondary mappers have different implementations.
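
For instance, if the log messages needed a bit of extra processing, you could write something like this (a made-up variation, just to illustrate):

@primary
def mapper1(key, value):
    yield value.split("\t", 1)

@secondary
def mapper2(key, value):
    node, message = value.split("\t", 1)
    yield node, message.lower()  # normalize the messages, say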


The @opt decorator

May 13, 2009

I added the @opt decorator to Dumbo yesterday, and decided it was useful enough to justify a new minor release. Using this decorator, mappers and reducers can specify (additional) options they rely on. For instance, you can now write

from dumbo import opt, run, sumreducer

@opt("addpath", "yes")
def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

if __name__ == "__main__":
    run(mapper, sumreducer, combiner=sumreducer)

instead of

from dumbo import main, sumreducer

def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

def runner(job):
    job.additer(mapper, sumreducer, combiner=sumreducer)

def starter(prog):
    prog.addopt("addpath", "yes")

if __name__ == "__main__":
    main(runner, starter)

to count words on a per-file basis (recall that the -addpath yes option makes sure the file path is prepended to the key passed to the mapper). The former version is not only shorter, but also clearer, since the option specification is closer to the code that relies on it.
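
To make this more tangible, here’s what the mapper gets to see for a hypothetical input file docs/brian.txt:

# without -addpath:  key = 0                      (just the byte offset)
# with -addpath yes: key = ("docs/brian.txt", 0)  (file path prepended)
# which is why the mapper above can rely on key[0] being the file path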

Under the hood, the @opt decorator appends the given option to an opts list attribute. For mapper and reducer classes, you can just set this attribute directly:

class Mapper:
    opts = [("addpath", "yes")]
    def __call__(self, key, value):
        for word in value.split():
            yield (word, key[0]), 1

if __name__ == "__main__":
    from dumbo import run, sumreducer
    run(Mapper, sumreducer, combiner=sumreducer)

Furthermore, it’s now also possible to pass an options list to run() or additer() via the opts argument:

def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

if __name__ == "__main__":
    from dumbo import run, sumreducer
    opts = [("addpath", "yes")]
    run(mapper, sumreducer, combiner=sumreducer, opts=opts)

which could be handy when you want to use join keys for only one iteration, for example.
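
Sketching that scenario (with made-up names for the mappers and reducers), only the first iteration of a two-iteration job would then deal with join keys:

def runner(job):
    job.additer(joinmapper, joinreducer, opts=[("joinkeys", "yes")])
    job.additer(mapper2, reducer2)  # ordinary second iteration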


Fast Python module for typed bytes

April 13, 2009

Over the past few days, I spent some time implementing a typed bytes Python module in C. It’s probably not quite ready for production use yet, and it still falls back to the pure Python module for floats, but it seems to work fine and already leads to substantial speedups.

For example, the Python program

from typedbytes import Output
Output(open("test.tb", "wb")).writes(xrange(10**7))

needs 18.8 secs to finish on this laptop, whereas it requires only 0.9 secs after replacing typedbytes with ctypedbytes. Similarly, the running time for

from typedbytes import Input
for item in Input(open("test.tb", "rb")).reads(): pass

can be reduced from 22.9 to merely 1.7 secs by using ctypedbytes instead of typedbytes.

Obviously, Dumbo programs can benefit from this faster typed bytes module as well, but the gains probably won’t be as spectacular as for the simple test programs above. To give it a go, make sure you’re using the latest version of Dumbo, build an egg for the ctypedbytes module, and add the following option to your start command:

-libegg <path to ctypedbytes egg>
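
Building such an egg boils down to something like this (the exact egg filename depends on your platform and Python version):

$ wget http://peak.telecommunity.com/dist/ez_setup.py
$ python ez_setup.py -zmaxd. ctypedbytes
$ dumbo start wordcount.py -input brian.txt -output brianwc \
-libegg ctypedbytes-0.1.5-py2.5-linux-i686.egg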

From what I’ve seen so far, this can speed up Dumbo programs by 30%, which definitely makes it worth the effort if you ask me. In fact, the Dumbo program would now probably beat the Java program in the benchmark discussed here, but, unfortunately, this wouldn’t be a very fair comparison. Johan recently pointed out to me that it’s better to avoid Java’s split() method for strings when you don’t need regular expression support, and using a combination of substring() and indexOf() instead seems to make the Java program about 40% faster. So we’re not quite as fast as Java yet, but at least the gap has narrowed some more.


Computing TF-IDF weights

March 15, 2009

The guys from Cloudera are now providing their basic Hadoop training for free online. The lecture about MapReduce algorithms was the first one I watched, and I couldn’t resist writing a Dumbo program that implements the last algorithm discussed in this lecture:

from dumbo import main, sumreducer
from math import log

def mapper1(key, value):
    for word in value.split():
        yield (word, key[0]), 1

def mapper2(key, value):
    yield key[1], (key[0], value)

def reducer2(key, values):
    values = list(values)
    N = sum(value[1] for value in values)
    for (word, n) in values:
        yield (word, key), (n, N)

def mapper3(key, value):
    yield key[0], (key[1], value[0], value[1], 1)

class Reducer3:
    def __init__(self):
        self.doccount = float(self.params["doccount"])
    def __call__(self, key, values):
        values = list(values)
        m = sum(value[3] for value in values)
        for (doc, n, N) in (value[:3] for value in values):
            yield (key, doc), (float(n) / N) * log(self.doccount / m)

def runner(job):
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    job.additer(mapper2, reducer2)
    job.additer(mapper3, Reducer3)

def starter(prog):
    prog.addopt("addpath", "yes")

if __name__ == "__main__":
    main(runner, starter)

As suggested on the next-to-last slide, I avoided the need for a 4th iteration by already computing the TF-IDF weights in the 3rd reducer, but apart from that, this program works exactly as explained in the lecture. When running it, you have to use the parameter option -param doccount=<number of documents> to specify the total number of documents (which could be calculated by another Dumbo program, if necessary). Since the first mapper expects the keys to be of the form (file path, offset in file), the option -addpath yes is also required, but this one is automatically added by the starter function.
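
For example, a run of this program could look as follows (the tfidf.py filename and the doccount value are made up):

$ dumbo start tfidf.py -input docs -output weights -param doccount=86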

The line

values = list(values)

in Reducer3.__call__ is where the buffering issue discussed in the lecture occurs. For words like “the”, it might not be possible to fit all of the values returned by the iterator values into memory as a list, and while trying to do so the program might even cause some cluster nodes to start swapping like crazy, up to the point where they become completely unresponsive and need to be power cycled to bring them back. This happened a few times at Last.fm, which is why we added the -memlimit <number of bytes> option to Dumbo. By putting the line