Dumbo on Cloudera’s distribution

May 31, 2009

Over the last couple of days, I picked up some rumors that the most recent version of Cloudera’s Hadoop distribution includes all of the patches on which Dumbo relies. Todd confirmed this to me yesterday, so the time was right to finally have a look at Cloudera’s nicely packaged and patched-up Hadoop.

I started from a chrooted Debian server, on which I installed the Cloudera distribution, Python 2.5, and Dumbo as follows:

# cat /etc/apt/sources.list
deb http://ftp.be.debian.org/debian etch main contrib non-free
deb http://www.backports.org/debian etch-backports main contrib non-free
deb http://archive.cloudera.com/debian etch contrib
deb-src http://archive.cloudera.com/debian etch contrib
# wget -O - http://backports.org/debian/archive.key | apt-key add -
# wget -O - http://archive.cloudera.com/debian/archive.key | apt-key add -
# apt-get update
# apt-get install hadoop python2.5 python2.5-dev
# wget http://peak.telecommunity.com/dist/ez_setup.py
# python2.5 ez_setup.py dumbo

Then, I created a user for myself and confirmed that the wordcount.py program runs properly on Cloudera’s distribution in standalone mode:

# adduser klaas
# su - klaas
$ wget http://bit.ly/wordcountpy http://bit.ly/briantxt
$ dumbo start wordcount.py -input brian.txt -output brianwc \
-python python2.5 -hadoop /usr/lib/hadoop/
$ dumbo cat brianwc -hadoop /usr/lib/hadoop/ | grep Brian
Brian   6

Unsurprisingly, it also worked perfectly in pseudo-distributed mode:

$ exit
# apt-get install hadoop-conf-pseudo
# /etc/init.d/hadoop-namenode start
# /etc/init.d/hadoop-secondarynamenode start
# /etc/init.d/hadoop-datanode start
# /etc/init.d/hadoop-jobtracker start
# /etc/init.d/hadoop-tasktracker start
# su - klaas
$ dumbo start wordcount.py -input brian.txt -output brianwc \
-python python2.5 -hadoop /usr/lib/hadoop/
$ dumbo rm brianwc/_logs -hadoop /usr/lib/hadoop/
Deleted hdfs://localhost/user/klaas/brianwc/_logs
$ dumbo cat brianwc -hadoop /usr/lib/hadoop/ | grep Brian
Brian   6

Note that I removed the _logs directory first because dumbo cat would’ve complained about it otherwise. You can avoid this minor annoyance by disabling the creation of _logs directories.

I also verified that HADOOP-5528 got included by running the join.py example successfully:

$ wget http://bit.ly/joinpy
$ wget http://bit.ly/hostnamestxt http://bit.ly/logstxt
$ dumbo put hostnames.txt hostnames.txt -hadoop /usr/lib/hadoop/
$ dumbo put logs.txt logs.txt -hadoop /usr/lib/hadoop/
$ dumbo start join.py -input hostnames.txt -input logs.txt \
-output joined -python python2.5 -hadoop /usr/lib/hadoop/
$ dumbo rm joined/_logs -hadoop /usr/lib/hadoop
$ dumbo cat joined -hadoop /usr/lib/hadoop | grep node1
node1   5

And while I was at it, I did a quick typedbytes versus ctypedbytes comparison as well:

$ zcat /usr/share/man/man1/python2.5.1.gz > python.man
$ for i in `seq 100000`; do cat python.man >> python.txt; done
$ du -h python.txt
1.2G    python.txt
$ dumbo put python.txt python.txt -hadoop /usr/lib/hadoop/
$ time dumbo start wordcount.py -input python.txt -output pywc \
-python python2.5 -hadoop /usr/lib/hadoop/
real    17m45.473s
user    0m1.380s
sys     0m0.224s
$ exit
# apt-get install gcc libc6-dev
# su - klaas
$ python2.5 ez_setup.py -zmaxd. ctypedbytes
$ time dumbo start wordcount.py -input python.txt -output pywc2 \
-python python2.5 -hadoop /usr/lib/hadoop/ \
-libegg ctypedbytes-0.1.5-py2.5-linux-i686.egg
real    13m22.420s
user    0m1.320s
sys     0m0.216s

In this particular case, ctypedbytes reduced the total running time by about 25%. Your mileage may vary since the running times depend on many factors, but in any case I’d always expect ctypedbytes to lead to significant speed improvements.
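
A quick back-of-the-envelope check of that figure, for the record:

plain = 17 * 60 + 45.473   # typedbytes run: 17m45.473s real time
ctb = 13 * 60 + 22.420     # ctypedbytes run: 13m22.420s real time
print "%.0f%% less time" % (100 * (plain - ctb) / plain)   # prints: 25% less time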


TF-IDF revisited

May 17, 2009

Remember the buffering problems for the TF-IDF program discussed in a previous post as well as the lecture about MapReduce algorithms from Cloudera’s free Hadoop training? Thanks to the new joining abstraction (and the minor fixes and enhancements in Dumbo 0.21.13 and 0.21.14), these problems can now easily be avoided:

from dumbo import *
from math import log

@opt("addpath", "yes")
def mapper1(key, value):
    for word in value.split():
        yield (key[0], word), 1

@primary
def mapper2a(key, value):
    yield key[0], value

@secondary
def mapper2b(key, value):
    yield key[0], (key[1], value)

@primary
def mapper3a(key, value):
    yield value[0], 1

@secondary
def mapper3b(key, value):
    yield value[0], (key, value[1])

class Reducer(JoinReducer):
    def __init__(self):
        self.sum = 0
    def primary(self, key, values):
        self.sum = sum(values)

class Combiner(JoinCombiner):
    def primary(self, key, values):
        yield key, sum(values)

class Reducer1(Reducer):
    def secondary(self, key, values):
        # key is the document; the values are (word, count) pairs
        for (word, n) in values:
            yield key, (word, float(n) / self.sum)

class Reducer2(Reducer):
    def __init__(self):
        Reducer.__init__(self)
        self.doccount = float(self.params["doccount"])
    def secondary(self, key, values):
        idf = log(self.doccount / self.sum)
        for (doc, tf) in values:
            yield (key, doc), tf * idf

def runner(job):
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    multimapper = MultiMapper()
    multimapper.add("", mapper2a)
    multimapper.add("", mapper2b)
    job.additer(multimapper, Reducer1, Combiner)
    multimapper = MultiMapper()
    multimapper.add("", mapper3a)
    multimapper.add("", mapper3b)
    job.additer(multimapper, Reducer2, Combiner)

if __name__ == "__main__":
    main(runner)

Most of this Dumbo program shouldn’t be hard to understand if you had a peek at the posts about hiding join keys and the @opt decorator, except maybe for the following things:

  • The first argument supplied to MultiMapper’s add method is a pattern string: the added mapper runs on the key/value pairs from a given file only if this pattern occurs in the file’s path. Since the empty string “” is considered to occur in every possible path string, all added mappers run on each input file in this example program.
  • It is possible (but not necessary) to yield key/value pairs in a JoinReducer’s primary method, as illustrated by the Combiner class in this example.
  • The default implementations of JoinReducer’s primary and secondary methods are identity operations, so Combiner combines the primary pairs and just passes on the secondary ones (a sketch of these defaults follows this list).
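
To make that last point concrete, here is roughly what identity implementations of those two methods amount to (an illustrative sketch, not Dumbo’s actual source):

class IdentityJoinCombiner:
    # illustration only: both methods just pass the pairs through
    def primary(self, key, values):
        for value in values:
            yield key, value
    def secondary(self, key, values):
        for value in values:
            yield key, value

Overriding only primary, like the Combiner class above does, therefore leaves the secondary pairs untouched.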

Writing this program went surprisingly smoothly and didn’t take much effort at all. Apparently, the “primary/secondary abstraction” works really well for me.


Hiding join keys

May 15, 2009

Dumbo 0.21.12 further simplifies joining by hiding the join keys from the “end-programmer”. For instance, the example discussed here can now be written as follows:

import dumbo
from dumbo.lib import MultiMapper, JoinReducer
from dumbo.decor import primary, secondary

def mapper(key, value):
    yield value.split("\t", 1)

class Reducer(JoinReducer):
    def primary(self, key, values):
        self.hostname = values.next()
    def secondary(self, key, values):
        key = self.hostname
        for value in values:
            yield key, value

if __name__ == "__main__":
    multimapper = MultiMapper()
    multimapper.add("hostnames", primary(mapper))
    multimapper.add("logs", secondary(mapper))
    dumbo.run(multimapper, Reducer)

These are the things to note in this fancier version:

  • The mapping is implemented by combining decorated mappers into one MultiMapper.
  • The reducing is implemented by extending JoinReducer.
  • There is no direct interaction with the join keys. In fact, the -joinkeys yes option doesn’t even get specified explicitly: the decorated mappers and JoinReducer automatically make sure this option gets added via their opts attributes, as the snippet after this list illustrates.
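
You can see this mechanism at work by inspecting the opts attribute yourself (a quick sketch; the exact contents of the list may vary between Dumbo versions):

from dumbo.decor import primary

@primary
def mapper(key, value):
    yield value.split("\t", 1)

# the decorator records the option on the mapper itself
print mapper.opts   # expected to contain ("joinkeys", "yes")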

The primary and secondary decorators can, of course, also be applied using the @decorator syntax, i.e., I could also have written

@primary
def mapper1(key, value):
    yield value.split("\t", 1)

@secondary
def mapper2(key, value):
    yield value.split("\t", 1)

and

    multimapper.add("hostnames", mapper1)
    multimapper.add("logs", mapper2)

This is less convenient for this particular example, but it might be preferable when your primary and secondary mapper have different implementations.


The @opt decorator

May 13, 2009

I added the @opt decorator to Dumbo yesterday, and decided it was useful enough to justify a new minor release. Using this decorator, mappers and reducers can specify (additional) options they rely on. For instance, you can now write

from dumbo import opt, run, sumreducer

@opt("addpath", "yes")
def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

if __name__ == "__main__":
    run(mapper, sumreducer, combiner=sumreducer)

instead of

from dumbo import main, sumreducer

def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

def runner(job):
    job.additer(mapper, sumreducer, combiner=sumreducer)

def starter(prog):
    prog.addopt("addpath", "yes")

if __name__ == "__main__":
    main(runner, starter)

to count words on a file-per-file basis (recall that the -addpath yes option makes sure the file path is prepended to the key passed to the mapper). The former version is not only shorter, but also clearer, since the option specification is closer to the code that relies on it.
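
To make this concrete, here’s what such a mapper sees and yields for a single made-up record (with -addpath yes the key becomes a (path, original key) tuple, the original key typically being a byte offset for plain text input):

def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

# simulating one input record locally (values are made up):
for pair in mapper(("brian.txt", 0), "Brian was a naughty boy"):
    print pair
# (('Brian', 'brian.txt'), 1)
# (('was', 'brian.txt'), 1)
# ... and so on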

Under the hood, the @opt decorator appends the given option to an opts list attribute. For mapper and reducer classes, you can just set this attribute directly:

class Mapper:
    opts = [("addpath", "yes")]
    def __call__(self, key, value):
        for word in value.split():
            yield (word, key[0]), 1

if __name__ == "__main__":
    from dumbo import run, sumreducer
    run(Mapper, sumreducer, combiner=sumreducer)

Furthermore, it’s now also possible to pass an options list to run() or additer() via the opts argument:

def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

if __name__ == "__main__":
    from dumbo import run, sumreducer
    opts = [("addpath", "yes")]
    run(mapper, sumreducer, combiner=sumreducer, opts=opts)

which could be handy when you want to use join keys for only one iteration, for example.
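
Something along these lines, for instance, applies -addpath yes to the first iteration only (a sketch with names of my own; the join keys case works analogously):

from dumbo import main, sumreducer

def wordcount_mapper(key, value):
    # relies on -addpath yes: key is a (path, original key) tuple
    for word in value.split():
        yield (word, key[0]), 1

def flip_mapper(key, value):
    # second iteration: drop the path and aggregate per word
    yield key[0], value

def runner(job):
    # only the first iteration needs the file paths, so the
    # option is passed to that iteration alone
    job.additer(wordcount_mapper, sumreducer, combiner=sumreducer,
                opts=[("addpath", "yes")])
    job.additer(flip_mapper, sumreducer, combiner=sumreducer)

if __name__ == "__main__":
    main(runner)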


Dumbo IP count in C

May 3, 2009

It doesn’t comply very well with the goal of making it as easy as possible to write MapReduce programs, but Dumbo mappers and reducers can also be written in C instead of Python. I just put an example on GitHub to illustrate this. Although it’s nowhere near as convenient as using Python, writing a mapper or reducer in C is not that hard, since you get to use the nifty Python C API, and in some specific cases the speed gains might be worth the extra effort. Moreover, setuptools nicely takes care of all the building and compiling, and you can limit the C code to the computationally expensive parts while still using Python for everything else.
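
For the build part, a minimal setup.py is all it takes; a sketch along these lines (the module and file names here are hypothetical, not the ones from the GitHub example):

from setuptools import setup, Extension

setup(
    name="cmapper",
    version="0.1",
    # setuptools compiles cmapper.c against the Python C API and
    # produces an egg that can be handed to Dumbo via -libegg
    ext_modules=[Extension("cmapper", sources=["cmapper.c"])],
)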


Measuring our elephants’ appetite

April 17, 2009

We recently wanted to have a rough idea of how much data our Hadoop clusters process on a daily basis. Here’s the Dumbo program I used to obtain this information:

from datetime import date

class Mapper:
    def __init__(self):
        from re import compile
        self.numbers = []
        self.numbers.append(compile("HDFS bytes read:([0-9]+)"))
        self.numbers.append(compile("Local bytes read:([0-9]+)"))
        self.finish = compile('FINISH_TIME="([^"]+)"')
    def __call__(self, key, value):
        if value.startswith("Job") and "COUNTERS" in value:
            gb = 0  # gigabytes processed
            for number in self.numbers:
                mo = number.search(value)
                if mo: gb += int(round(float(mo.group(1)) / 2**30))
            ts = float(self.finish.search(value).group(1)) / 1000
            datetuple = date.fromtimestamp(ts).timetuple()[:3]
            yield datetuple, gb

if __name__ == "__main__":
    from dumbo import run, sumreducer
    run(Mapper, sumreducer, combiner=sumreducer)

Running this on the job logs for one of our clusters (which are gathered by the shell script discussed in this previous post) led to the following graph:

Bytes processed daily by one of our Hadoop clusters

This graph clearly shows why some of us sometimes get annoyed when we want to explore data on this cluster on certain days of the week or month…


Mapper and reducer interfaces

March 31, 2009

In Dumbo 0.21.3, an alternative interface for mappers and reducers got added. Using this interface, the “wordcount” example

def mapper(key, value):
    for word in value.split(): 
        yield word, 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    from dumbo import run
    run(mapper, reducer, combiner=reducer)

can be written as follows:

def mapper(data):
    for key, value in data:
        for word in value.split(): 
            yield word, 1

def reducer(data):
    for key, values in data:
        yield key, sum(values)

if __name__ == "__main__":
    from dumbo import run
    run(mapper, reducer, combiner=reducer)

Dumbo automatically detects which interface is being used by the function, and calls it appropriately. In theory, the alternative version is faster since it involves fewer function calls, but the real reason the new interface got added is that it is more low-level and can make integration with existing Python code easier in some cases.
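
Presumably this detection boils down to looking at the function’s signature; the idea is something like the following sketch (my guess at the mechanism, not necessarily Dumbo’s actual implementation):

from inspect import getargspec

def expects_iterator(func):
    # the alternative interface takes a single "data" argument,
    # whereas the original one takes separate key and value arguments
    return len(getargspec(func)[0]) == 1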

Just like the original interface, the alternative one also works for mapper and reducer classes. Adapting the first example above such that a class is used for both the mapper and reducer results in:

class Mapper:
    def __call__(self, key, value):
        for word in value.split(): 
            yield word, 1

class Reducer:
    def __call__(self, key, values):
        yield key, sum(values)

if __name__ == "__main__":
    from dumbo import run
    run(Mapper, Reducer, combiner=Reducer)

Applying the same transformation to the version using the alternative interface leads to:

class Mapper:
    def __call__(self, data):
        for key, value in data:
            for word in value.split(): 
                yield word, 1

class Reducer:
    def __call__(self, data):
        for key, values in data:
            yield key, sum(values)

if __name__ == "__main__":
    from dumbo import run
    run(Mapper, Reducer, combiner=Reducer)

Since mapper and reducer functions that use the alternative interface are called only once, you don’t need classes to add initialization or cleanup logic when using this interface, but they can still be useful if you want to access the fields that Dumbo automatically adds to them.
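
For example, a class-based mapper that uses the alternative interface can still read the params field that Dumbo fills in from -param options (a sketch, assuming the program gets run with something like -param minlength=4):

class Mapper:
    def __call__(self, data):
        # "params" is one of the fields Dumbo adds automatically
        minlength = int(self.params.get("minlength", "1"))
        for key, value in data:
            for word in value.split():
                if len(word) >= minlength:
                    yield word, 1

if __name__ == "__main__":
    from dumbo import run, sumreducer
    run(Mapper, sumreducer, combiner=sumreducer)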