The @opt decorator

May 13, 2009

I added the @opt decorator to Dumbo yesterday, and decided it was useful enough to justify a new minor release. Using this decorator, mappers and reducers can specify (additional) options they rely on. For instance, you can now write

from dumbo import opt, run, sumreducer

@opt("addpath", "yes")
def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

if __name__ == "__main__":
    run(mapper, sumreducer, combiner=sumreducer)

instead of

from dumbo import main, sumreducer

def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

def runner(job):
    job.additer(mapper, sumreducer, combiner=sumreducer)

def starter(prog):
    prog.addopt("addpath", "yes")

if __name__ == "__main__":
    main(runner, starter)

to count words on a per-file basis (recall that the -addpath yes option makes sure the file path is prepended to the key passed to the mapper). The former version is not only shorter but also clearer, since the option specification is closer to the code that relies on it.
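Here is a plain-Python simulation of this job that runs without Hadoop or Dumbo. It shows concretely what -addpath yes does to the keys. It assumes, as the post says, that each key arrives as a (file path, offset) tuple. simulate() is a hypothetical helper that does the summing sumreducer would do, and the input records are made up.

```python
from collections import defaultdict

def mapper(key, value):
    # Assumes key is a (file path, offset) tuple, as with -addpath yes.
    for word in value.split():
        yield (word, key[0]), 1

def simulate(records):
    """Run the mapper over (key, value) records and sum counts per output key."""
    totals = defaultdict(int)
    for key, value in records:
        for outkey, count in mapper(key, value):
            totals[outkey] += count  # the summing that sumreducer does
    return dict(totals)

# Made-up input from two files, keyed the way -addpath would key them.
records = [(("a.txt", 0), "to be or not to be"),
           (("b.txt", 0), "to do")]
print(simulate(records)[("to", "a.txt")])  # 2
```

Each output key pairs a word with the file it came from, so "to" is counted separately for a.txt and b.txt.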

Under the hood, the @opt decorator appends the given option to an opts list attribute. For mapper and reducer classes, you can just set this attribute directly:

class Mapper:
    opts = [("addpath", "yes")]
    def __call__(self, key, value):
        for word in value.split():
            yield (word, key[0]), 1

if __name__ == "__main__":
    from dumbo import run, sumreducer
    run(Mapper, sumreducer, combiner=sumreducer)
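A decorator that does what is described here (append an option to an opts list attribute) could look roughly like the following. This is a hypothetical re-implementation for illustration, not Dumbo's actual source.

```python
def opt(name, value):
    """Hypothetical sketch: return a decorator recording (name, value) in .opts."""
    def decorator(func):
        # Build a new list so functions never share one opts list by accident.
        func.opts = getattr(func, "opts", []) + [(name, value)]
        return func
    return decorator

# Decorators apply bottom-up, so "addpath" is recorded first.
@opt("joinkeys", "yes")
@opt("addpath", "yes")
def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

print(mapper.opts)  # [('addpath', 'yes'), ('joinkeys', 'yes')]
```

Stacking decorators simply accumulates options, which matches setting opts = [...] by hand on a class.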

Furthermore, it’s now also possible to pass an options list to run() or additer() via the opts argument:

def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

if __name__ == "__main__":
    from dumbo import run, sumreducer
    opts = [("addpath", "yes")]
    run(mapper, sumreducer, combiner=sumreducer, opts=opts)

which could be handy when you want to use join keys for only one iteration, for example.


Dumbo IP count in C

May 3, 2009

It doesn’t comply very well with the goal of making it as easy as possible to write MapReduce programs, but Dumbo mappers and reducers can also be written in C instead of Python. I just put an example on GitHub to illustrate this. Although it’s nowhere near as convenient as using Python, writing a mapper or reducer in C is not that hard since you get to use the nifty Python C API, and in some specific cases the speed gains might be worth the extra effort. Moreover, setuptools nicely takes care of all the building and compiling, and you can limit the C code to computationally expensive parts and still use Python for the rest.


Measuring our elephants’ appetite

April 17, 2009

We recently wanted to have a rough idea of how much data our Hadoop clusters process on a daily basis. Here’s the Dumbo program I used to obtain this information:

from datetime import date

class Mapper:
    def __init__(self):
        from re import compile
        self.numbers = []
        self.numbers.append(compile("HDFS bytes read:([0-9]+)"))
        self.numbers.append(compile("Local bytes read:([0-9]+)"))
        self.finish = compile('FINISH_TIME="([^"]+)"')
    def __call__(self, key, value):
        if value.startswith("Job") and "COUNTERS" in value:
            gb = 0  # gigabytes processed
            for number in self.numbers:
                mo = number.search(value)
                if mo: gb += int(round(float(mo.group(1)) / 2**30))
            ts = float(self.finish.search(value).group(1)) / 1000
            datetuple = date.fromtimestamp(ts).timetuple()[:3]
            yield datetuple, gb

if __name__ == "__main__":
    from dumbo import run, sumreducer
    run(Mapper, sumreducer, combiner=sumreducer)

Running this on the job logs for one of our clusters (which are gathered by the shell script discussed in this previous post) led to the following graph:

[Figure: Bytes processed daily by one of our Hadoop clusters]

This graph clearly shows why some of us occasionally get annoyed when we want to explore data on this cluster on certain days of the week or month…
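The Mapper above rounds each job's byte count to the nearest gigabyte before summing, so any job that reads less than half a gigabyte contributes nothing to the daily total. Below is a variation written as a standalone function. It keeps raw byte counts and only converts at the end. It also skips lines without a FINISH_TIME field instead of raising an AttributeError, and it uses UTC rather than local time so the output is reproducible. job_gigabytes is a hypothetical name, and the log line is made up, since real job history lines carry many more fields.

```python
import re
from datetime import datetime, timezone

NUMBERS = [re.compile("HDFS bytes read:([0-9]+)"),
           re.compile("Local bytes read:([0-9]+)")]
FINISH = re.compile('FINISH_TIME="([^"]+)"')

def job_gigabytes(value):
    """Return ((year, month, day), gigabytes) for a job summary line, else None."""
    if not (value.startswith("Job") and "COUNTERS" in value):
        return None
    finish = FINISH.search(value)
    if finish is None:  # guard: skip lines that lack FINISH_TIME
        return None
    nbytes = 0
    for number in NUMBERS:
        mo = number.search(value)
        if mo:
            nbytes += int(mo.group(1))
    ts = float(finish.group(1)) / 1000  # FINISH_TIME is in milliseconds
    # UTC keeps the result independent of the machine's timezone; the original
    # uses date.fromtimestamp(), which applies local time.
    day = datetime.fromtimestamp(ts, tz=timezone.utc).timetuple()[:3]
    return day, nbytes / float(2 ** 30)  # no per-job rounding

# Made-up line for illustration only.
line = ('Job JOBID="job_0001" FINISH_TIME="1239969600000" '
        'COUNTERS="HDFS bytes read:3221225472,Local bytes read:1073741824"')
print(job_gigabytes(line))  # ((2009, 4, 17), 4.0)
```

In a Dumbo version, the mapper would yield the byte count and the conversion to gigabytes would happen after sumreducer has added everything up.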


Mapper and reducer interfaces

March 31, 2009

In Dumbo 0.21.3, an alternative interface for mappers and reducers got added. Using this interface, the “wordcount” example

def mapper(key, value):
    for word in value.split(): 
        yield word, 1

def reducer(key, values):
    yield key, sum(values)

if __name__ == "__main__":
    from dumbo import run
    run(mapper, reducer, combiner=reducer)

can be written as follows:

def mapper(data):
    for key, value in data:
        for word in value.split(): 
            yield word, 1

def reducer(data):
    for key, values in data:
        yield key, sum(values)

if __name__ == "__main__":
    from dumbo import run
    run(mapper, reducer, combiner=reducer)

Dumbo automatically detects which interface a function uses and calls it appropriately. In theory, the alternative version is faster since it involves fewer function calls, but the real reason the new interface was added is that it is more low-level and can make integration with existing Python code easier in some cases.

Just like the original interface, the alternative one also works for mapper and reducer classes. Adapting the first example above such that a class is used for both the mapper and reducer results in:

class Mapper:
    def __call__(self, key, value):
        for word in value.split(): 
            yield word, 1

class Reducer:
    def __call__(self, key, values):
        yield key, sum(values)

if __name__ == "__main__":
    from dumbo import run
    run(Mapper, Reducer, combiner=Reducer)

Applying the same transformation to the version using the alternative interface leads to:

class Mapper:
    def __call__(self, data):
        for key, value in data:
            for word in value.split(): 
                yield word, 1

class Reducer:
    def __call__(self, data):
        for key, values in data:
            yield key, sum(values)

if __name__ == "__main__":
    from dumbo import run
    run(Mapper, Reducer, combiner=Reducer)

Since mapper and reducer functions that use the alternative interface are called only once, you don’t need classes to add initialization or cleanup logic when using this interface, but they can still be useful if you want to access the fields that Dumbo automatically adds to them.
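The post doesn't say how Dumbo detects which interface a function uses. One plausible approach is sketched here as a hypothetical to_stream() helper. It counts the function's parameters and wraps two-argument functions so they also accept a stream of (key, value) pairs. The second mapper illustrates the point about initialization and cleanup: with the alternative interface, code before and after the loop runs once per task. The stop list and the "<skipped>" key are made up for illustration, and inspect.signature requires Python 3.

```python
import inspect

def to_stream(func):
    """Return a callable that takes an iterable of (key, value) pairs.

    Hypothetical helper: guesses the interface from the parameter count.
    """
    if len(inspect.signature(func).parameters) == 1:
        return func                      # already func(data)
    def wrapper(data):                   # adapt func(key, value)
        for key, value in data:
            for output in func(key, value):
                yield output
    return wrapper

def mapper_kv(key, value):
    for word in value.split():
        yield word, 1

def mapper_data(data):
    # Initialization: runs once, before the first record.
    stopwords = frozenset(["the", "of"])  # hypothetical stop list
    skipped = 0
    for key, value in data:
        for word in value.split():
            if word in stopwords:
                skipped += 1
            else:
                yield word, 1
    # Cleanup: runs once, after the last record has been consumed.
    yield "<skipped>", skipped            # hypothetical bookkeeping key

records = [(0, "the cat of the hat")]
print(list(to_stream(mapper_kv)(records)))
print(list(to_stream(mapper_data)(records)))  # [('cat', 1), ('hat', 1), ('<skipped>', 3)]
```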


Join keys

March 20, 2009

Earlier today, I released Dumbo 0.21.3, which adds support for so-called “join keys” (amongst other things). Here’s an example of a Dumbo program that uses such keys:

def mapper(key, value):
    key.isprimary = "hostnames" in key.body[0]
    key.body, value = value.split("\t", 1)
    yield key, value
    
class Reducer:
    def __init__(self):
        self.key = None
    def __call__(self, key, values):
        if key.isprimary:
            self.key = key.body
            self.hostname = values.next()
        elif self.key == key.body:
            key.body = self.hostname
            for value in values:
                yield key, value
    
def runner(job):
    job.additer(mapper, Reducer)
    
def starter(prog):
    prog.addopt("addpath", "yes")
    prog.addopt("joinkeys", "yes")

if __name__ == "__main__":
    from dumbo import main
    main(runner, starter)

When you put this code in join.py, you can join the files hostnames.txt and logs.txt as follows:

$ wget http://users.ugent.be/~klbostee/files/hostnames.txt
$ wget http://users.ugent.be/~klbostee/files/logs.txt
$ dumbo join.py -input hostnames.txt -input logs.txt \
-output joined.code
$ dumbo cat joined.code > joined.txt

In order to make join keys work for non-local runs, however, you need to apply the patch from HADOOP-5528, which requires Hadoop 0.20 or higher. More precisely, Dumbo relies on the BinaryPartitioner from HADOOP-5528 to make sure that:

  1. All keys that differ only in the .isprimary attribute are passed to the same reducer.
  2. The primary keys are always reduced before the non-primary ones.

If you want to find out how this works exactly, you might want to watch Cloudera’s “MapReduce Algorithms” lecture, since joining by means of a custom partitioner is one of the common idioms discussed in this lecture.

UPDATE: Dumbo 0.21.3 is not compatible with the evolved version of the patch for HADOOP-5528. To make things work with the final patch, you need to upgrade to Dumbo 0.21.4.
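The two partitioner guarantees listed above can be simulated in plain Python to see why the Reducer works. The sketch is not how Dumbo or the BinaryPartitioner from HADOOP-5528 are implemented. JoinKey, shuffle(), and join() are hypothetical stand-ins that partition on the key body only (guarantee 1) and sort primary keys before non-primary ones with the same body (guarantee 2). The reducer is the one from the post, with values.next() written as next(values), and the records are made up.

```python
from itertools import groupby

class JoinKey(object):
    """Hypothetical stand-in for a Dumbo join key."""
    def __init__(self, body, isprimary):
        self.body = body
        self.isprimary = isprimary

def order(key):
    # Guarantee 2: "not isprimary" is False for primary keys, so they sort first.
    return (key.body, not key.isprimary)

def shuffle(pairs, numreducers):
    """Partition and sort (key, value) pairs; returns one group list per reducer."""
    buckets = [[] for _ in range(numreducers)]
    for key, value in pairs:
        # Guarantee 1: partition on the body only, ignoring isprimary.
        buckets[hash(key.body) % numreducers].append((key, value))
    reducers = []
    for bucket in buckets:
        bucket.sort(key=lambda kv: order(kv[0]))
        groups = []
        for _, kvs in groupby(bucket, key=lambda kv: order(kv[0])):
            kvs = list(kvs)
            groups.append((kvs[0][0], iter([v for _, v in kvs])))
        reducers.append(groups)
    return reducers

class Reducer(object):
    """The post's reducer, with values.next() written as next(values)."""
    def __init__(self):
        self.key = None
    def __call__(self, key, values):
        if key.isprimary:
            self.key = key.body
            self.hostname = next(values)
        elif self.key == key.body:
            key.body = self.hostname
            for value in values:
                yield key, value

def join(pairs, numreducers=2):
    # Copy the keys first, because the reducer rewrites key.body in place.
    pairs = [(JoinKey(k.body, k.isprimary), v) for k, v in pairs]
    joined = []
    for groups in shuffle(pairs, numreducers):
        reducer = Reducer()  # one instance per simulated reduce task
        for key, values in groups:
            # Iterating runs the generator body, even for primary keys.
            for outkey, value in reducer(key, values):
                joined.append((outkey.body, value))
    return joined

pairs = [(JoinKey("10.0.0.1", False), "GET /index.html"),
         (JoinKey("10.0.0.1", True), "www.example.com"),
         (JoinKey("10.0.0.2", False), "GET /about.html")]
print(join(pairs))  # [('www.example.com', 'GET /index.html')]
```

If either guarantee is dropped, for example by hashing on (body, isprimary), the primary key can end up on a different reducer or after its counterparts, and the join silently loses records.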


Computing TF-IDF weights

March 15, 2009

The guys from Cloudera are now providing their basic Hadoop training for free online. The lecture about MapReduce algorithms was the first one I watched, and I couldn’t resist writing a Dumbo program that implements the last algorithm discussed in this lecture:

from dumbo import main, sumreducer
from math import log

def mapper1(key, value):
    for word in value.split():
        yield (word, key[0]), 1

def mapper2(key, value):
    yield key[1], (key[0], value)

def reducer2(key, values):
    values = list(values)
    N = sum(value[1] for value in values)
    for (word, n) in values:
        yield (word, key), (n, N)

def mapper3(key, value):
    yield key[0], (key[1], value[0], value[1], 1)

class Reducer3:
    def __init__(self):
        self.doccount = float(self.params["doccount"])
    def __call__(self, key, values):
        values = list(values)
        m = sum(value[3] for value in values)
        for (doc, n, N) in (value[:3] for value in values):
            yield (key, doc), (float(n) / N) * log(self.doccount / m)

def runner(job):
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    job.additer(mapper2, reducer2)
    job.additer(mapper3, Reducer3)

def starter(prog):
    prog.addopt("addpath", "yes")

if __name__ == "__main__":
    main(runner, starter)

As suggested on the next-to-last slide, I avoided the need for a fourth iteration by computing the TF-IDF weights directly in the third reducer, but apart from that, this program works exactly as explained in the lecture. When running it, you have to use the parameter option -param doccount=<number of documents> to specify the total number of documents (which could be calculated by another Dumbo program, if necessary). Since the first mapper expects the keys to be of the form (file path, offset in file), the option -addpath yes is also required, but this one is added automatically by the starter function.
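Written as a formula, the weight Reducer3 computes for word w in document d is (n/N) · log(D/m). Here n is the number of occurrences of w in d, N is the total number of words in d, D is doccount, and m is the number of documents containing w (log is the natural logarithm, as with math.log). On a small input, the same computation can be done in memory to check the Dumbo program's output. The tfidf() function and the two documents are hypothetical.

```python
from collections import Counter
from math import log

def tfidf(docs):
    """docs maps a document name to its text; returns {(word, doc): weight}."""
    doccount = float(len(docs))  # the value -param doccount=... would supply
    # n: occurrences of each word per document (iteration 1)
    counts = dict((doc, Counter(text.split())) for doc, text in docs.items())
    # N: total number of words per document (iteration 2)
    totals = dict((doc, sum(c.values())) for doc, c in counts.items())
    # m: number of documents containing each word (iteration 3)
    docfreq = Counter()
    for c in counts.values():
        docfreq.update(c.keys())
    weights = {}
    for doc, c in counts.items():
        for word, n in c.items():
            weights[(word, doc)] = (float(n) / totals[doc]) * log(doccount / docfreq[word])
    return weights

docs = {"a.txt": "hadoop dumbo hadoop", "b.txt": "hadoop python"}
w = tfidf(docs)
print(w[("hadoop", "a.txt")])  # 0.0, since "hadoop" occurs in every document
```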

The line

values = list(values)

in Reducer3.__call__ is where the buffering issue discussed in the lecture occurs. For words like “the”, it might not be possible to fit all of the values returned by the iterator values into memory as a list, and while trying to do so the program might even cause some cluster nodes to start swapping like crazy, up to the point where they become completely unresponsive and need to be power cycled to bring them back. This happened a few times at Last.fm, which is why we added the -memlimit <number of bytes> option to Dumbo. By putting the line

memlimit: <maximum number of bytes>

in the [common] section of /etc/dumbo.conf, you can set an upper limit on the amount of memory that can be used, and make your Dumbo programs fail with a MemoryError when they exceed this limit.
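A memory limit makes an oversized reducer fail cleanly instead of taking a node down, but it doesn't remove the need to buffer. A different technique, not something this post or Dumbo provides, is to spill the buffered values to a temporary file. The first pass over values computes m and writes the remaining fields to disk, and the second pass reads them back. reduce3_spilling() is a hypothetical variant of Reducer3.__call__ with doccount passed in explicitly instead of read from self.params.

```python
import pickle
import tempfile
from math import log

def reduce3_spilling(key, values, doccount):
    """Variant of Reducer3.__call__ that buffers on disk instead of in a list.

    values yields (doc, n, N, 1) tuples, as emitted by mapper3.
    """
    m = 0        # number of documents containing the word
    records = 0  # number of tuples written to the spill file
    with tempfile.TemporaryFile() as spill:
        # First pass: count documents and write the other fields to disk.
        for doc, n, N, one in values:
            pickle.dump((doc, n, N), spill)
            m += one
            records += 1
        spill.seek(0)
        # Second pass: read the tuples back one at a time.
        for _ in range(records):
            doc, n, N = pickle.load(spill)
            yield (key, doc), (float(n) / N) * log(float(doccount) / m)

values = iter([("a.txt", 1, 3, 1)])
print(list(reduce3_spilling("dumbo", values, 2)))
```

This trades memory for local disk I/O, so it only helps as long as the values for a single word fit on the task's local disk.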


Parsers and records

March 6, 2009

Earlier today I released Dumbo 0.20.29, which adds built-in support for parser and record classes. The easiest way to explain the use of such classes is by means of an example, so let’s dive straight in and look at some code:

class JobLog:
    def __init__(self, logline=None):
        from re import compile
        self.fieldselector = compile('([A-Z_]+)="([^"]*)"')
        if logline: self.parse(logline)
    def parse(self, logline):
        self.type = logline.split(" ", 1)[0]
        fields = dict(self.fieldselector.findall(logline))
        self.fields = fields
        # Reset per-line attributes, since the same object is reused for every line
        self.start, self.stop, self.status = None, None, None
        if self.type.endswith("Attempt"):
            try:
                self.jobname = fields["JOBNAME"]
                self.taskid = fields["TASK_ATTEMPT_ID"]
                if "START_TIME" in fields:
                    self.start = int(fields["START_TIME"])
                elif "FINISH_TIME" in fields:
                    self.stop = int(fields["FINISH_TIME"])
                    self.status = fields["TASK_STATUS"]
            except KeyError, err:
                raise ValueError(str(err))
        return self

This class is a parser because:

  1. It has a parse method that takes a line of text as input and returns an object that represents this line.
  2. Its parse method raises a ValueError when the given line of text is in an unexpected format.

Since the objects returned by the parse method only have to stay valid until the next time parse is called, the same object can be reused. It’s even possible to return the parser object itself, as illustrated by the example above.

Now, when you put JobLog in parsers.py and add the option -file parsers.py to the dumbo start command, you can rewrite the job statistics program from my previous post as follows:

from dumbo import sumsreducer, statsreducer, statscombiner, main

def mapper1(key, log):
    if log.type.endswith("Attempt"):
        if log.start:
            yield (log.type, log.jobname, log.taskid), (log.start, 0)
        elif log.stop and log.status == "SUCCESS":
            yield (log.type, log.jobname, log.taskid), (0, log.stop)

def mapper2(key, value):
    if value[0] > 0 and value[1] > 0:
        yield key[:2], (value[1] - value[0]) / 1000.0

def runner(job):
    job.additer(mapper1, sumsreducer, combiner=sumsreducer)
    job.additer(mapper2, statsreducer, statscombiner)

def starter(prog):
    prog.addopt("parser", "parsers.JobLog")
    date = prog.delopt("date")
    if date:
        prog.addopt("input", "/user/hadoop/joblogs/" + date)
        prog.addopt("name", "jobstats-" + date)

if __name__ == "__main__":
    main(runner, starter)

The main thing to note in this code is the line

prog.addopt("parser", "parsers.JobLog")

By means of the new -parser <full name of Python class> option, you can tell Dumbo to convert all values to objects using a given parser, before passing them on to your mapper. When a ValueError occurs during this process, Dumbo will ignore the corresponding value and increment the counter “Bad inputs”, unless you indicated that errors shouldn’t be caught by using the option -debug yes.
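The behaviour described here can be mimicked with a small generator, which makes the contract between Dumbo and a parser explicit. CommentParser and parsed() are hypothetical; Dumbo's actual option handling is not shown in this post. Because the parser returns itself, each object has to be used before the next value is parsed, which is why the example extracts the fields right away.

```python
from collections import Counter

class CommentParser(object):
    """Hypothetical parser for 'issuenr<TAB>commenter' lines."""
    def parse(self, line):
        issuenr, commenter = line.split("\t")  # ValueError unless exactly 2 fields
        self.issuenr = int(issuenr)            # ValueError if not a number
        self.commenter = commenter
        return self                            # the same object every time

def parsed(pairs, parser, counters, debug=False):
    """Mimic the described -parser behaviour (hypothetical helper)."""
    for key, value in pairs:
        try:
            obj = parser.parse(value)
        except ValueError:
            if debug:  # like -debug yes: let the error propagate
                raise
            counters["Bad inputs"] += 1
            continue
        yield key, obj

counters = Counter()
lines = [(0, "1722\talice"), (1, "garbage"), (2, "5528\tbob")]
# Use each object before advancing, since the parser instance is reused.
result = [(obj.issuenr, obj.commenter) for _, obj in parsed(lines, CommentParser(), counters)]
print(result, counters["Bad inputs"])  # [(1722, 'alice'), (5528, 'bob')] 1
```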

Record classes are similar to parser classes, but instead of a parse method they have a set method that can take multiple values as input. The corresponding option is -record <full name of Python class>, which makes Dumbo create an instance of the given record class and call instance.set(*value) for each value. Since Dumbo represents Hadoop records as sequences of Python values corresponding to the different fields, this is especially useful when consuming files that contain such Hadoop records (which explains why I refer to these value-transforming Python classes as “records”).
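A record class and the -record behaviour can be sketched the same way. IssueRecord and recorded() are hypothetical, and the post doesn't say whether Dumbo reuses one instance or creates a new one per value. This sketch assumes a single reused instance, mirroring the parser case.

```python
class IssueRecord(object):
    """Hypothetical record class for (issuenr, commenter) Hadoop records."""
    def set(self, issuenr, commenter):
        self.issuenr = issuenr
        self.commenter = commenter

def recorded(pairs, recordclass):
    """Mimic the described -record behaviour (hypothetical helper)."""
    record = recordclass()  # assumed: one instance, reused for every value
    for key, value in pairs:
        record.set(*value)  # value is a sequence of field values
        yield key, record

rows = [(0, (1722, "alice")), (1, (5528, "bob"))]
print([(r.issuenr, r.commenter) for _, r in recorded(rows, IssueRecord)])
# [(1722, 'alice'), (5528, 'bob')]
```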

At Last.fm, we’ve been using parser and record classes for quite a while now, because they allow us to both abstract away file format details and handle many bug fixes and improvements centrally. We bundle them in Python eggs and make sure they are submitted along with every Dumbo program by putting the necessary -libegg options in /etc/dumbo.conf. Always having to manually create and invoke the parser or record class quickly becomes a bit tiring though, so the new -parser and -record options are definitely a welcome improvement.


Simple job logs analysis

March 4, 2009

Computing some simple statistics for the jobs that run on your Hadoop cluster can be very useful in practice. Data collection systems like Chukwa probably allow you to do this, but if you don’t have hundreds of nodes, simply running the following shell script daily on your master might be all you need:

find /path/to/hadoop/logs/history/ -daystart -ctime 1 | \
grep -v 'xml$' | grep -v 'crc$' | while read FILE; do
NAME="`basename $FILE`"
sed 's/ *$//' $FILE | sed "s/\$/ JOBNAME=\"$NAME\"/"
done > /tmp/joblogs.txt

DATE="`date -d yesterday +"%Y/%m/%d"`"
/path/to/hadoop/bin/hadoop dfs -mkdir /user/hadoop/joblogs/$DATE
/path/to/hadoop/bin/hadoop dfs -put /tmp/joblogs.txt \
/user/hadoop/joblogs/$DATE/joblogs.txt || exit 1

rm /tmp/joblogs.txt

This script takes all of yesterday’s job logs, adds a JOBNAME="<filename>" field to each line, puts everything in a single file, and uploads this file to the DFS. Once you have this set up, you can use Hadoop to analyse your job logs. Here’s an example in the form of a Dumbo program:

from dumbo import sumsreducer, statsreducer, statscombiner, main

class Mapper1:
    def __init__(self):
        from re import compile
        self.fieldselector = compile('([A-Z_]+)="([^"]*)"')
    def __call__(self, key, value):
        logtype = value.split(" ", 1)[0]
        if logtype.endswith("Attempt"):
            try:
                fields = dict(self.fieldselector.findall(value))
                jobname = fields["JOBNAME"]
                taskid = fields["TASK_ATTEMPT_ID"]
                if "START_TIME" in fields:
                    start = int(fields["START_TIME"])
                    yield (logtype, jobname, taskid), (start, 0)
                elif "FINISH_TIME" in fields and "ERROR" not in fields:
                    stop = int(fields["FINISH_TIME"])
                    yield (logtype, jobname, taskid), (0, stop)
            except KeyError:
                self.counters["Broken loglines"] += 1

def mapper2(key, value):
    if value[0] > 0 and value[1] > 0:
        yield key[:2], (value[1] - value[0]) / 1000.0

def runner(job):
    job.additer(Mapper1, sumsreducer, combiner=sumsreducer)
    job.additer(mapper2, statsreducer, statscombiner)

def starter(prog):
    date = prog.delopt("date")
    if date:
        prog.addopt("input", "/user/hadoop/joblogs/" + date)
        prog.addopt("name", "jobstats-" + date)

if __name__ == "__main__":
    main(runner, starter)

From the output of this program, you can easily generate a few charts that show you which jobs are slowest. We recently started playing with this at Last.fm, mainly because such charts allow us to identify the jobs on which we should focus our optimization efforts.
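Here is an in-memory version of the two iterations, run on a handful of made-up records. It assumes sumsreducer adds tuples element-wise, which is what the (start, 0) and (0, stop) trick relies on. The exact output format of statsreducer isn't shown in this post, so slowest_jobs() simply averages the task durations per (log type, job name) with plain Python and sorts the slowest first. All function names are hypothetical.

```python
from collections import defaultdict

def sums(values):
    """Element-wise sum of equal-length tuples (assumed sumsreducer behaviour)."""
    return tuple(sum(column) for column in zip(*values))

def task_durations(pairs):
    """pairs: ((logtype, jobname, taskid), (start, stop)) as emitted by Mapper1."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    for key, values in grouped.items():
        start, stop = sums(values)  # (start, 0) + (0, stop) -> (start, stop)
        if start > 0 and stop > 0:  # same check as mapper2
            yield key[:2], (stop - start) / 1000.0

def slowest_jobs(pairs):
    """Average task duration in seconds per (logtype, jobname), slowest first."""
    per_job = defaultdict(list)
    for key, seconds in task_durations(pairs):
        per_job[key].append(seconds)
    averages = [(sum(s) / len(s), key) for key, s in per_job.items()]
    return sorted(averages, reverse=True)

# Made-up timestamps in milliseconds; attempt a3 never finished.
pairs = [(("MapAttempt", "wordcount", "a1"), (1000, 0)),
         (("MapAttempt", "wordcount", "a1"), (0, 5000)),
         (("MapAttempt", "join", "a2"), (2000, 0)),
         (("MapAttempt", "join", "a2"), (0, 3000)),
         (("MapAttempt", "join", "a3"), (2000, 0))]
print(slowest_jobs(pairs))
# [(4.0, ('MapAttempt', 'wordcount')), (1.0, ('MapAttempt', 'join'))]
```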


The HADOOP-1722 graph

February 27, 2009

I really enjoyed reading Paul’s fascinating blog post about generating artist similarity graphs. Personally, I would’ve used a different API and programming language, but his end result is just amazing. Since I computed similarities between Hadoop JIRA issues the other day, this made me wonder if it would be possible to generate such a graph for HADOOP-1722.

In addition to the similarities, Paul’s graph generation algorithm also requires familiarity/popularity scores, which conveniently allows me to add the obligatory Dumbo flavor to this post. Even if comments.txt were a huge file consisting of millions of (issue, commenter) pairs, the following Dumbo program could still be used to compute both the number of comments and the number of comment authors for each issue:

from dumbo import sumreducer, sumsreducer, main

def mapper1(key, value):
    issuenr, commenter = value.split("\t")
    yield (int(issuenr), commenter), 1

def mapper2(key, value):
    yield key[0], (value, 1)

def runner(job):
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    job.additer(mapper2, sumsreducer, combiner=sumsreducer)

if __name__ == "__main__":
    main(runner)

In my experience, such two-step counting is a rather common pattern in MapReduce programs. It might remind you of the number of plays and listeners shown on Last.fm’s artist, album, and track pages, for example. Usually, the second count is a better measure of popularity, so that’s what I’ll be using to generate my graph.

The file pops.code is what you obtain when you run the program above locally on comments.txt. It’s in a convenient human-readable text format that preserves the type information. By using the -ascode yes option with dumbo cat, you can save files from the HDFS in this format as well, which is how I obtained the file recs.code. The following Python program generates the necessary Graphviz instructions from these files:

from dumbo import loadcode
pops = dict(loadcode(open("pops.code")))
recs = dict(loadcode(open("recs.code")))

todos = [1722]
plotted = set(todos)
print 'digraph {'
print '"%s" [label="HADOOP-%s"]' % (todos[0], todos[0])
while todos:
    todo = todos.pop(0)
    for score, issue in recs[todo]:
        if score > 0.5 and pops[issue][1] < pops[todo][1]:
            print '"%s" -> "%s"' % (todo, issue)
            if issue not in plotted:
                todos.append(issue)
                plotted.add(issue)
                print '"%s" [label="HADOOP-%s"]' % (issue, issue)
print '}'

Here’s the resulting graph, which seems to be a nice graphical overview of the four Hadoop issues I commented on:

[Figure: The HADOOP-1722 graph]

Not too bad a result given the small amount of time and effort this required I guess, but it’s not nearly as cool as Paul’s Led Zeppelin graph. I strongly encourage his wife to watch a few more episodes of Dr. House in the future.
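The two-step count behind pops.code is easy to check in memory on a small input. popularity() is a hypothetical helper and the pairs are made up. The second element of each result tuple is the number of distinct commenters, which is the pops[issue][1] value the graph program compares.

```python
from collections import Counter

def popularity(pairs):
    """pairs: (issuenr, commenter) tuples -> {issuenr: (comments, commenters)}."""
    per_author = Counter(pairs)  # step 1: like mapper1 + sumreducer
    result = {}
    for (issuenr, commenter), n in per_author.items():  # step 2: mapper2 + sumsreducer
        comments, commenters = result.get(issuenr, (0, 0))
        result[issuenr] = (comments + n, commenters + 1)
    return result

pairs = [(1722, "alice"), (1722, "alice"), (1722, "bob"), (5528, "alice")]
print(popularity(pairs))  # {1722: (3, 2), 5528: (1, 1)}
```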


Mapper and reducer classes

February 26, 2009

As explained in the short tutorial, Dumbo lets you use a class as a mapper and/or reducer (and also as a combiner, of course, but a combiner is really just a reducer with a slightly different purpose). Until now, the main benefit of this was that you could use the constructor for initialization, such as loading the contents of a file into memory. Version 0.20.27 introduces another reason to use classes instead of functions, however, as illustrated by the following extended version of the sampling program from my previous post:

class Mapper:
    def __init__(self):
        self.status = "Initialization started"
        from random import Random
        self.randgen = Random()
        self.cutoff = float(self.params["percentage"]) / 100
        self.samplesize = self.counters["Sample size"]
        self.status = "Initialization done"
    def __call__(self, key, value):
        if self.randgen.random() < self.cutoff:
            self.samplesize += 1
            yield key, value

if __name__ == "__main__":
    from dumbo import run
    run(Mapper)

The things to note in this code are the class variables params, counters, and status, which seem to come out of nowhere. From version 0.20.27 onwards, Dumbo will instantiate a dynamically generated class that inherits from both the class supplied by the programmer and dumbo.MapRedBase, resulting in the seemingly magical addition of the following fields:

  • params: A dictionary that contains the parameters specified using -param <key>=<value> options.
  • counters: You can think of this field as a defaultdict containing dumbo.Counter objects. When a given key has no corresponding counter yet, a new counter is created using the key as its name. You can still change the name afterwards by assigning a different string to the counter’s name field, but by using a suitable key you kill two birds with one stone.
  • status: Strings assigned to this field will show up as the status message for the task in Hadoop’s web interface.

And as if this is not enough, you can now also use the

counter += amount

syntax to increment counters, instead of the less fancy (and harder to remember)

counter.incr(amount)

method call.
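The mechanism described in this post can be imitated with the three-argument form of type(), which builds a class deriving from both the programmer's class and a base class. Everything here (Counter, Counters, MapRedBase, instantiate()) is a hypothetical stand-in, not Dumbo's actual code, and status reporting isn't modeled. The sketch also shows why counter += amount needs a counter object whose __iadd__ returns the counter itself. If the counters were plain integers, self.samplesize would merely hold a copy, and incrementing it would never update the stored count.

```python
class Counter(object):
    """Hypothetical stand-in for dumbo.Counter."""
    def __init__(self, name):
        self.name = name
        self.amount = 0
    def incr(self, amount):
        self.amount += amount
    def __iadd__(self, amount):
        self.incr(amount)
        return self  # keeps `self.samplesize += 1` bound to the same counter

class Counters(dict):
    """Creates a counter named after the key on first access."""
    def __missing__(self, key):
        counter = self[key] = Counter(key)
        return counter

class MapRedBase(object):
    """Hypothetical base class; the real dumbo.MapRedBase may differ."""
    params = {}
    counters = None

def instantiate(cls, params):
    # Generate a class deriving from both cls and MapRedBase. The fields are
    # class attributes, so they are already visible inside cls.__init__.
    generated = type(cls.__name__, (cls, MapRedBase),
                     {"params": params, "counters": Counters()})
    return generated()

class Mapper:  # the post's sampling mapper, minus the status messages
    def __init__(self):
        from random import Random
        self.randgen = Random()
        self.cutoff = float(self.params["percentage"]) / 100
        self.samplesize = self.counters["Sample size"]
    def __call__(self, key, value):
        if self.randgen.random() < self.cutoff:
            self.samplesize += 1
            yield key, value

mapper = instantiate(Mapper, {"percentage": "100"})
sample = [kv for i in range(5) for kv in mapper(i, "line %d" % i)]
print(len(sample), mapper.counters["Sample size"].amount)  # 5 5
```

With a percentage of 100, every record passes the random cutoff, which makes the counter value predictable.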

