Moving to Hadoop 0.20

November 23, 2009

We’ve finally started looking into moving from Hadoop 0.18 to 0.20 at Last.fm, and I thought it might be useful to share a few Dumbo-related things I learned in the process:

  • We’re probably going to base our 0.20 build on Cloudera’s 0.20 distribution, and I found out the hard way that Dumbo doesn’t work on version 0.20.1+133 of this distribution: it includes a patch for MAPREDUCE-967 that breaks some of the Hadoop Streaming functionality on which Dumbo relies. Luckily, the Cloudera guys fixed this in 0.20.1+152 by reverting the patch, but if you’re still trying to get Dumbo to work on 0.20.1+133 for some reason, you can expect NullPointerExceptions and errors like “module wordcount not found” in your tasks’ stderr logs.
  • Also, the Cloudera guys apparently haven’t added the patch for MAPREDUCE-764 to their distribution yet, so you’ll still have to apply this patch yourself if you want to avoid strange encoding problems in certain corner cases. The patch was reviewed and accepted for Hadoop 0.21 quite a while ago though, so hopefully it will find its way into Cloudera’s 0.20 distribution soon.
  • The Twitter guys put together a pretty awesome patched and backported version of hadoop-gpl-compression for Hadoop 0.20. It includes several bugfixes and also provides an InputFormat for the old API, which is useful for Hadoop Streaming (and hence also Dumbo) users since Streaming has not been converted to the new API yet. If you’re interested in this stuff, you might want to have a look at this guest post from Kevin and Eric on the Cloudera blog. There’s also a rough sketch below of what using the backported InputFormat from Dumbo might look like.
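
To give an idea, here’s that sketch. Note that this is guesswork on my part rather than a tested recipe: I’m assuming the Twitter build ships an old-API com.hadoop.mapred.DeprecatedLzoTextInputFormat class, that Dumbo’s -inputformat option accepts a fully qualified class name, and that you ship the jar via -libjar; check the guest post and the hadoop-gpl-compression docs for the exact names.

$ dumbo start wordcount.py -input mylogs.lzo -output wc \
-inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat \
-libjar /path/to/hadoop-lzo.jar -hadoop /usr/lib/hadoop/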

Dumbo on Cloudera’s distribution

May 31, 2009

Over the last couple of days, I picked up some rumors that the most recent version of Cloudera’s Hadoop distribution includes all of the patches on which Dumbo relies. Todd confirmed this to me yesterday, so the time was right to finally have a look at Cloudera’s nicely packaged and patched-up Hadoop.

I started from a chrooted Debian server, on which I installed the Cloudera distribution, Python 2.5, and Dumbo as follows:

# cat /etc/apt/sources.list
deb http://ftp.be.debian.org/debian etch main contrib non-free
deb http://www.backports.org/debian etch-backports main contrib non-free
deb http://archive.cloudera.com/debian etch contrib
deb-src http://archive.cloudera.com/debian etch contrib
# wget -O - http://backports.org/debian/archive.key | apt-key add -
# wget -O - http://archive.cloudera.com/debian/archive.key | apt-key add -
# apt-get update
# apt-get install hadoop python2.5 python2.5-dev
# wget http://peak.telecommunity.com/dist/ez_setup.py
# python2.5 ez_setup.py dumbo

Then, I created a user for myself and confirmed that the wordcount.py program runs properly on Cloudera’s distribution in standalone mode:

# adduser klaas
# su - klaas
$ wget http://bit.ly/wordcountpy http://bit.ly/briantxt
$ dumbo start wordcount.py -input brian.txt -output brianwc \
-python python2.5 -hadoop /usr/lib/hadoop/
$ dumbo cat brianwc -hadoop /usr/lib/hadoop/ | grep Brian
Brian   6

Unsurprisingly, it also worked perfectly in pseudo-distributed mode:

$ exit
# apt-get install hadoop-conf-pseudo
# /etc/init.d/hadoop-namenode start
# /etc/init.d/hadoop-secondarynamenode start
# /etc/init.d/hadoop-datanode start
# /etc/init.d/hadoop-jobtracker start
# /etc/init.d/hadoop-tasktracker start
# su - klaas
$ dumbo start wordcount.py -input brian.txt -output brianwc \
-python python2.5 -hadoop /usr/lib/hadoop/
$ dumbo rm brianwc/_logs -hadoop /usr/lib/hadoop/
Deleted hdfs://localhost/user/klaas/brianwc/_logs
$ dumbo cat brianwc -hadoop /usr/lib/hadoop/ | grep Brian
Brian   6

Note that I removed the _logs directory first because dumbo cat would’ve complained about it otherwise. You can avoid this minor annoyance by disabling the creation of _logs directories.
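
For the record, the _logs directory contains the job history that Hadoop writes into the job’s output directory. If I remember correctly, this is controlled by the hadoop.job.history.user.location property, so something along these lines in your hadoop-site.xml (or per-job configuration) should turn it off; whether the property is honored does depend on your Hadoop version, though:

<!-- stop Hadoop from writing job history into <output dir>/_logs -->
<property>
  <name>hadoop.job.history.user.location</name>
  <value>none</value>
</property>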

I also verified that HADOOP-5528 got included by running the join.py example successfully:

$ wget http://bit.ly/joinpy
$ wget http://bit.ly/hostnamestxt http://bit.ly/logstxt
$ dumbo put hostnames.txt hostnames.txt -hadoop /usr/lib/hadoop/
$ dumbo put logs.txt logs.txt -hadoop /usr/lib/hadoop/
$ dumbo start join.py -input hostnames.txt -input logs.txt \
-output joined -python python2.5 -hadoop /usr/lib/hadoop/
$ dumbo rm joined/_logs -hadoop /usr/lib/hadoop
$ dumbo cat joined -hadoop /usr/lib/hadoop | grep node1
node1   5

And while I was at it, I did a quick typedbytes versus ctypedbytes comparison as well:

$ zcat /usr/share/man/man1/python2.5.1.gz > python.man
$ for i in `seq 100000`; do cat python.man >> python.txt; done
$ du -h python.txt
1.2G    python.txt
$ dumbo put python.txt python.txt -hadoop /usr/lib/hadoop/
$ time dumbo start wordcount.py -input python.txt -output pywc \
-python python2.5 -hadoop /usr/lib/hadoop/
real    17m45.473s
user    0m1.380s
sys     0m0.224s
$ exit
# apt-get install gcc libc6-dev
# su - klaas
$ python2.5 ez_setup.py -zmaxd. ctypedbytes
$ time dumbo start wordcount.py -input python.txt -output pywc2 \
-python python2.5 -hadoop /usr/lib/hadoop/ \
-libegg ctypedbytes-0.1.5-py2.5-linux-i686.egg
real    13m22.420s
user    0m1.320s
sys     0m0.216s

In this particular case, the run with ctypedbytes took about 25% less time than the one with the pure Python typedbytes module. Your mileage may vary since the running times depend on many factors, but I’d generally expect ctypedbytes to lead to a significant speedup.


Computing TF-IDF weights

March 15, 2009

The guys from Cloudera are now providing their basic Hadoop training for free online. The lecture about MapReduce algorithms was the first one I watched, and I couldn’t resist writing a Dumbo program that implements the last algorithm discussed in this lecture:

from dumbo import main, sumreducer
from math import log

def mapper1(key, value):
    # with -addpath yes, key = (file path, offset in file) and value = line of text;
    # emit a 1 for every (word, document) occurrence
    for word in value.split():
        yield (word, key[0]), 1

def mapper2(key, value):
    # key = (word, doc), value = n (occurrences of word in doc);
    # regroup by document so the document lengths can be computed
    yield key[1], (key[0], value)

def reducer2(key, values):
    # key = doc; N = total number of words in the document
    values = list(values)
    N = sum(value[1] for value in values)
    for (word, n) in values:
        yield (word, key), (n, N)

def mapper3(key, value):
    # key = (word, doc), value = (n, N);
    # regroup by word so the document frequencies can be computed
    yield key[0], (key[1], value[0], value[1], 1)

class Reducer3:
    def __init__(self):
        # total number of documents, supplied via -param doccount=<number>
        self.doccount = float(self.params["doccount"])
    def __call__(self, key, values):
        # key = word; m = number of documents in which the word occurs
        values = list(values)
        m = sum(value[3] for value in values)
        for (doc, n, N) in (value[:3] for value in values):
            # TF-IDF = (n / N) * log(total docs / docs containing the word)
            yield (key, doc), (float(n) / N) * log(self.doccount / m)

def runner(job):
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    job.additer(mapper2, reducer2)
    job.additer(mapper3, Reducer3)

def starter(prog):
    # the first mapper needs the file paths in its keys, so add -addpath yes
    prog.addopt("addpath", "yes")

if __name__ == "__main__":
    main(runner, starter)

As suggested on the next-to-last slide, I avoided the need for a fourth iteration by computing the TF-IDF weights in the third reducer already, but apart from that, this program works exactly as explained in the lecture. When running it, you have to use the parameter option -param doccount=<number of documents> to specify the total number of documents (which could be calculated by another Dumbo program, if necessary). Since the first mapper expects the keys to be of the form (file path, offset in file), the option -addpath yes is also required, but that one is added automatically by the starter function.
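
To make that concrete, here’s roughly what a run could look like, assuming the program is saved as tfidf.py, the documents sit in a docs directory on the DFS, and there are 100 of them (all three of those are made-up example values):

$ dumbo start tfidf.py -input docs -output tfidf \
-param doccount=100 -hadoop /usr/lib/hadoop

The resulting output consists of (word, document) pairs as keys and the corresponding TF-IDF weights as values.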

The line

values = list(values)

in Reducer3.__call__ is where the buffering issue discussed in the lecture occurs. For words like “the”, it might not be possible to fit all of the values returned by the values iterator into memory as a list, and while trying to do so the program might even cause some cluster nodes to start swapping like crazy, to the point where they become completely unresponsive and need to be power cycled to bring them back. This happened a few times at Last.fm, which is why we added the -memlimit <number of bytes> option to Dumbo. By putting the line