Join keys

March 20, 2009

Earlier today, I released Dumbo 0.21.3, which adds support for so-called “join keys” (amongst other things). Here’s an example of a Dumbo program that uses such keys:

def mapper(key, value):
    key.isprimary = "hostnames" in key.body[0]
    key.body, value = value.split("\t", 1)
    yield key, value
    
class Reducer:
    def __init__(self):
        self.key = None
    def __call__(self, key, values):
        if key.isprimary:
            self.key = key.body
            self.hostname = values.next()
        elif self.key == key.body:
            key.body = self.hostname
            for value in values:
                yield key, value
    
def runner(job):
    job.additer(mapper, Reducer)
    
def starter(prog):
    prog.addopt("addpath", "yes")
    prog.addopt("joinkeys", "yes")

if __name__ == "__main__":
    from dumbo import main
    main(runner, starter)

When you put this code in join.py, you can join the files hostnames.txt and logs.txt as follows:

$ wget http://users.ugent.be/~klbostee/files/hostnames.txt
$ wget http://users.ugent.be/~klbostee/files/logs.txt
$ dumbo join.py -input hostnames.txt -input logs.txt \
-output joined.code
$ dumbo cat joined.code > joined.txt

In order to make join keys work for non-local runs, however, you need to apply the patch from HADOOP-5528, which requires Hadoop 0.20 or higher. More precisely, Dumbo relies on the BinaryPartitioner from HADOOP-5528 to make sure that:

  1. All keys that differ only in the .isprimary attribute are passed to the same reducer.
  2. The primary keys are always reduced before the non-primary ones.

If you want to find out how this works exactly, you might want to watch Cloudera’s “MapReduce Algorithms” lecture, since joining by means of a custom partitioner is one of the common idioms discussed in this lecture.
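To get a feel for these two guarantees without a cluster, here is a minimal local sketch in plain Python. It is not how Dumbo or the BinaryPartitioner is implemented: the sort on a (key, tag) pair merely stands in for the partitioning and ordering, and the sample records and the function name join are made up for illustration.

```python
# Hypothetical sample data: (join key, value) pairs from two inputs.
hostnames = [("1", "alpha.example.org"), ("2", "beta.example.org")]
logs = [("1", "GET /a"), ("2", "GET /b"), ("1", "GET /c"), ("3", "GET /d")]

# Tag primary records with 0 and the others with 1, so that sorting on
# (key, tag) puts the primary record right before the records it joins with.
tagged = [((key, 0), value) for key, value in hostnames]
tagged += [((key, 1), value) for key, value in logs]
tagged.sort(key=lambda record: record[0])

def join(records):
    """Replace each log key by the hostname seen for the same key."""
    current_key, hostname = None, None
    for (key, tag), value in records:
        if tag == 0:
            # Primary record: remember it for the records that follow.
            current_key, hostname = key, value
        elif key == current_key:
            yield hostname, value
        # Records without a matching primary record are dropped.

joined = list(join(tagged))
```

The log line with key "3" has no hostname and is therefore dropped, which is the same inner-join behaviour as in the reducer above.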

UPDATE: Dumbo 0.21.3 is not compatible with the evolved version of the patch for HADOOP-5528. To make things work with the final patch, you need to upgrade to Dumbo 0.21.4.


Computing TF-IDF weights

March 15, 2009

The guys from Cloudera are now providing their basic Hadoop training for free online. The lecture about MapReduce algorithms was the first one I watched, and I couldn’t resist writing a Dumbo program that implements the last algorithm discussed in this lecture:

from dumbo import main, sumreducer
from math import log

def mapper1(key, value):
    for word in value.split():
        yield (word, key[0]), 1

def mapper2(key, value):
    yield key[1], (key[0], value)

def reducer2(key, values):
    values = list(values)
    N = sum(value[1] for value in values)
    for (word, n) in values:
        yield (word, key), (n, N)

def mapper3(key, value):
    yield key[0], (key[1], value[0], value[1], 1)

class Reducer3:
    def __init__(self):
        self.doccount = float(self.params["doccount"])
    def __call__(self, key, values):
        values = list(values)
        m = sum(value[3] for value in values)
        for (doc, n, N) in (value[:3] for value in values):
            yield (key, doc), (float(n) / N) * log(self.doccount / m)

def runner(job):
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    job.additer(mapper2, reducer2)
    job.additer(mapper3, Reducer3)

def starter(prog):
    prog.addopt("addpath", "yes")

if __name__ == "__main__":
    main(runner, starter)

As suggested on the next-to-last slide, I avoided the need for a fourth iteration by already computing the TF-IDF weights in the third reducer, but apart from that, this program works exactly as explained in the lecture. When running it, you have to use the parameter option -param doccount=<number of documents> to specify the total number of documents (which could be calculated by another Dumbo program, if necessary). Since the first mapper expects the keys to be of the form (file path, offset in file), the option -addpath yes is also required, but this one is automatically added by the starter function.
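For a quick sanity check of the numbers, the same three steps can be mimicked locally in plain Python. This is only a sketch on a made-up three-document corpus (the file names and texts are hypothetical), using the same formula as Reducer3, namely (n / N) * log(doccount / m):

```python
from collections import Counter
from math import log

# Hypothetical toy corpus: document name -> text.
docs = {
    "a.txt": "the cat sat on the mat",
    "b.txt": "the dog sat",
    "c.txt": "the bird",
}

# Step 1: n = number of occurrences of each word in each document.
n = Counter((word, doc) for doc, text in docs.items() for word in text.split())

# Step 2: N = total number of words in each document.
N = Counter()
for (word, doc), count in n.items():
    N[doc] += count

# Step 3: m = number of documents in which each word occurs.
m = Counter(word for (word, doc) in n)

doccount = float(len(docs))
tfidf = dict(
    ((word, doc), (float(count) / N[doc]) * log(doccount / m[word]))
    for (word, doc), count in n.items()
)
```

A word like “the”, which occurs in every document, gets weight 0 because log(doccount / m) is log(1).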

The line

values = list(values)

in Reducer3.__call__ is where the buffering issue discussed in the lecture occurs. For words like “the”, it might not be possible to put all of the values returned by the iterator values into memory as a list, and while trying to do so the program might even cause some cluster nodes to start swapping like crazy, up to the point where they become completely unresponsive and need to be power cycled in order to bring them back. This happened a few times at Last.fm, which is why we added the -memlimit <number of bytes> option to Dumbo. By putting the line


Parsers and records

March 6, 2009

Earlier today I released Dumbo 0.20.29, which adds built-in support for parser and record classes. The easiest way to explain the use of such classes is by means of an example, so let’s just dive in straight away and look at some code:

class JobLog:
    def __init__(self, logline=None):
        from re import compile
        self.fieldselector = compile('([A-Z_]+)="([^"]*)"')
        if logline: self.parse(logline)
    def parse(self, logline):
        self.type = logline.split(" ", 1)[0]
        fields = dict(self.fieldselector.findall(logline))
        self.fields = fields
        if self.type.endswith("Attempt"):
            try:
                self.jobname = fields["JOBNAME"]
                self.taskid = fields["TASK_ATTEMPT_ID"]
                if "START_TIME" in fields:
                    self.start = int(fields["START_TIME"])
                    self.stop = None
                    self.status = None
                elif "FINISH_TIME" in fields:
                    self.start = None
                    self.stop = int(fields["FINISH_TIME"])
                    self.status = fields["TASK_STATUS"]
            except KeyError, err:
                raise ValueError(str(err))
        return self

This class is a parser because:

  1. It has a parse method that takes a line of text as input and returns an object that represents this line.
  2. Its parse method raises a ValueError when the given line of text is in an unexpected format.

Since the objects returned by the parse method only have to stay valid until the next time parse is called, the same object can be reused. It’s even possible to return the parser object itself, as illustrated by the example above.

Now, when you put JobLog in parsers.py and add the option -file parsers.py to the dumbo start command, you can rewrite the job statistics program from my previous post as follows:

from dumbo import sumsreducer, statsreducer, statscombiner, main

def mapper1(key, log):
    if log.type.endswith("Attempt"):
        if log.start:
            yield (log.type, log.jobname, log.taskid), (log.start, 0)
        elif log.stop and log.status == "SUCCESS":
            yield (log.type, log.jobname, log.taskid), (0, log.stop)

def mapper2(key, value):
    if value[0] > 0 and value[1] > 0:
        yield key[:2], (value[1] - value[0]) / 1000.0

def runner(job):
    job.additer(mapper1, sumsreducer, combiner=sumsreducer)
    job.additer(mapper2, statsreducer, statscombiner)

def starter(prog):
    prog.addopt("parser", "parsers.JobLog")
    date = prog.delopt("date")
    if date:
        prog.addopt("input", "/user/hadoop/joblogs/" + date)
        prog.addopt("name", "jobstats-" + date)

if __name__ == "__main__":
    main(runner, starter)

The main thing to note in this code is the line

prog.addopt("parser", "parsers.JobLog")

By means of the new -parser <full name of Python class> option, you can tell Dumbo to convert all values to objects using a given parser, before passing them on to your mapper. When a ValueError occurs during this process, Dumbo will ignore the corresponding value and increment the counter “Bad inputs”, unless you indicated that errors shouldn’t be caught by using the option -debug yes.
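The behaviour described here can be pictured with a small stand-alone sketch. It is not Dumbo’s actual code: KeyValueParser, parsed_values, and the debug argument are hypothetical names I made up to illustrate the idea of skipping values that fail to parse while counting them.

```python
from collections import Counter

class KeyValueParser:
    """Hypothetical parser for lines of the form 'name=value'."""
    def parse(self, line):
        if "=" not in line:
            raise ValueError("missing '=' in %r" % line)
        self.name, self.value = line.split("=", 1)
        return self  # the parser object itself is reused for every line

def parsed_values(lines, parser, counters, debug=False):
    """Yield parsed objects, counting (or re-raising) parse failures."""
    for line in lines:
        try:
            yield parser.parse(line)
        except ValueError:
            if debug:
                raise  # mimics the effect of -debug yes
            counters["Bad inputs"] += 1

counters = Counter()
lines = ["a=1", "broken", "b=2"]
names = [rec.name for rec in parsed_values(lines, KeyValueParser(), counters)]
```

Note that each parsed object is consumed before the next line is parsed, which is why reusing the same object is safe here.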

Record classes are similar to parser classes, but instead of a parse method they have a set method that can take multiple values as input. The corresponding option is -record <full name of Python class>, which makes Dumbo create an instance of the given record class and call instance.set(*value) for each value. Since Dumbo represents Hadoop records as sequences of Python values corresponding to the different fields, this is especially useful when consuming files that contain such Hadoop records (which explains why I refer to these value-transforming Python classes as “records”).
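As a rough illustration, a record class could look like the sketch below. PlayRecord and its fields are hypothetical, and the loop only imitates the instance.set(*value) calls described above rather than reproducing what Dumbo does internally.

```python
class PlayRecord:
    """Hypothetical record class with three fields."""
    def set(self, user, track, plays):
        self.user = user
        self.track = track
        self.plays = int(plays)

# Each value is a sequence of field values, as for a Hadoop record.
values = [("alice", "song-a", 3), ("bob", "song-b", "5"), ("alice", "song-c", 2)]

record = PlayRecord()  # one instance, reused for every value
totals = {}
for value in values:
    record.set(*value)
    totals[record.user] = totals.get(record.user, 0) + record.plays
```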

At Last.fm, we’ve been using parser and record classes for quite a while now, because they allow us to both abstract away file format details and handle many bug fixes and improvements centrally. We bundle them in Python eggs and make sure they are submitted along with every Dumbo program by putting the necessary -libegg options in /etc/dumbo.conf. Always having to manually create and invoke the parser or record class quickly becomes a bit tiring though, so the new -parser and -record options are definitely a welcome improvement.


Simple job logs analysis

March 4, 2009

Computing some simple statistics for the jobs that run on your Hadoop cluster can be very useful in practice. Data collection systems like Chukwa probably allow you to do this, but if you don’t have hundreds of nodes, simply running the following shell script daily on your master might be all you need:

find /path/to/hadoop/logs/history/ -daystart -ctime 1 | \
grep -v 'xml$' | grep -v 'crc$' | while read FILE; do
NAME="`basename $FILE`"
sed 's/ *$//' $FILE | sed "s/\$/ JOBNAME=\"$NAME\"/"
done > /tmp/joblogs.txt

DATE="`date -d yesterday +"%Y/%m/%d"`"
/path/to/hadoop/bin/hadoop dfs -mkdir /user/hadoop/joblogs/$DATE
/path/to/hadoop/bin/hadoop dfs -put /tmp/joblogs.txt \
/user/hadoop/joblogs/$DATE/joblogs.txt || exit 1

rm /tmp/joblogs.txt

This script takes all of yesterday’s job logs, adds a JOBNAME="<filename>" field to each line, puts everything in a single file, and uploads this file to the DFS. Once you have this set up, you can use Hadoop to analyse your job logs. Here’s an example in the form of a Dumbo program:

from dumbo import sumsreducer, statsreducer, statscombiner, main

class Mapper1:
    def __init__(self):
        from re import compile
        self.fieldselector = compile('([A-Z_]+)="([^"]*)"')
    def __call__(self, key, value):
        logtype = value.split(" ", 1)[0]
        if logtype.endswith("Attempt"):
            try:
                fields = dict(self.fieldselector.findall(value))
                jobname = fields["JOBNAME"]
                taskid = fields["TASK_ATTEMPT_ID"]
                if "START_TIME" in fields:
                    start = int(fields["START_TIME"])
                    yield (logtype, jobname, taskid), (start, 0)
                elif "FINISH_TIME" in fields and not "ERROR" in fields:
                    stop = int(fields["FINISH_TIME"])
                    yield (logtype, jobname, taskid), (0, stop)
            except KeyError:
                self.counters["Broken loglines"] += 1

def mapper2(key, value):
    if value[0] > 0 and value[1] > 0:
        yield key[:2], (value[1] - value[0]) / 1000.0

def runner(job):
    job.additer(Mapper1, sumsreducer, combiner=sumsreducer)
    job.additer(mapper2, statsreducer, statscombiner)

def starter(prog):
    date = prog.delopt("date")
    if date:
        prog.addopt("input", "/user/hadoop/joblogs/" + date)
        prog.addopt("name", "jobstats-" + date)

if __name__ == "__main__":
    main(runner, starter)

From the output of this program, you can easily generate a few charts that show you which jobs are slowest. We recently started playing with this at Last.fm, mainly because such charts allow us to identify the jobs on which we should focus our optimization efforts.
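The trick in the first iteration of the program above is that start and finish times are emitted as (start, 0) and (0, stop), so that an element-wise sum glues them together per task attempt. The sketch below mimics this locally; the helper sums is my own stand-in and assumes that sumsreducer adds up tuples element-wise, and the sample keys and timestamps are made up.

```python
from collections import defaultdict

def sums(values):
    """Element-wise sum of equally sized tuples."""
    return tuple(sum(column) for column in zip(*values))

# Hypothetical output of the first mapper.
emitted = [
    (("MapAttempt", "job_1", "attempt_1"), (1000, 0)),
    (("MapAttempt", "job_1", "attempt_1"), (0, 4500)),
    (("MapAttempt", "job_1", "attempt_2"), (2000, 0)),  # never finished
]

grouped = defaultdict(list)
for key, value in emitted:
    grouped[key].append(value)
combined = dict((key, sums(values)) for key, values in grouped.items())

# Same logic as mapper2: keep complete attempts and convert ms to seconds.
durations = [
    (key[:2], (stop - start) / 1000.0)
    for key, (start, stop) in combined.items()
    if start > 0 and stop > 0
]
```

Attempts for which only one of the two timestamps was seen keep a 0 in the other position and are filtered out by the second mapper.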


The HADOOP-1722 graph

February 27, 2009

I really enjoyed reading Paul’s fascinating blog post about generating artist similarity graphs. Personally, I would’ve used a different API and programming language, but his end result is just amazing. Since I computed similarities between Hadoop JIRA issues the other day, this made me wonder if it would be possible to generate such a graph for HADOOP-1722.

In addition to the similarities, Paul’s graph generation algorithm also requires familiarity/popularity scores, which conveniently allows me to add the obligatory Dumbo flavor to this post. Even if comments.txt were a huge file consisting of millions of (issue, commenter) pairs, the following Dumbo program could still be used to compute both the number of comments and the number of comment authors for each issue:

from dumbo import sumreducer, sumsreducer, main

def mapper1(key, value):
    issuenr, commenter = value.split("\t")
    yield (int(issuenr), commenter), 1

def mapper2(key, value):
    yield key[0], (value, 1)

def runner(job):
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    job.additer(mapper2, sumsreducer, combiner=sumsreducer)

if __name__ == "__main__":
    main(runner)

In my experience, such two-step counting is a rather common pattern in MapReduce programs. It might remind you of the number of plays and listeners shown on Last.fm’s artist, album, and track pages, for example. Usually, the second count is a better measure of popularity, so that’s what I’ll be using to generate my graph.
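On a small input, the two counting steps are easy to follow in plain Python. This is just a local sketch with made-up (issue, commenter) pairs, not something you would run on millions of lines:

```python
from collections import Counter, defaultdict

# Hypothetical (issue, commenter) pairs, one per comment.
pairs = [(1722, "alice"), (1722, "alice"), (1722, "bob"), (5528, "alice")]

# Step 1: number of comments per (issue, commenter) pair.
per_pair = Counter(pairs)

# Step 2: per issue, sum the comment counts and count the distinct pairs.
per_issue = defaultdict(lambda: [0, 0])
for (issue, commenter), count in per_pair.items():
    per_issue[issue][0] += count  # total number of comments
    per_issue[issue][1] += 1      # number of comment authors

pops = dict((issue, tuple(counts)) for issue, counts in per_issue.items())
```

The second element of each tuple is the number of distinct comment authors, which only works because the first step already collapsed duplicate pairs.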

The file pops.code is what you obtain when you run the program above locally on comments.txt. It’s in a convenient human-readable text format that preserves the type information. By using the -ascode yes option with dumbo cat, you can save files from the HDFS in this format as well, which is how I obtained the file recs.code. The following Python program generates the necessary Graphviz instructions from these files:

from dumbo import loadcode
pops = dict(loadcode(open("pops.code")))
recs = dict(loadcode(open("recs.code")))

todos = [1722]
plotted = set(todos)
print 'digraph {'
print '"%s" [label="HADOOP-%s"]' % (todos[0], todos[0])
while todos:
    todo  = todos.pop(0)
    for score, issue in recs[todo]:
        if score > 0.5 and pops[issue][1] < pops[todo][1]:
            print '"%s" -> "%s"' % (todo, issue)
            if not issue in plotted:
                todos.append(issue)
                plotted.add(issue)
                print '"%s" [label="HADOOP-%s"]' % (issue, issue)
print '}'

Here’s the resulting graph, which seems to be a nice graphical overview of the four Hadoop issues I commented on:

[Figure: The HADOOP-1722 graph]

Not too bad a result given the small amount of time and effort this required I guess, but it’s not nearly as cool as Paul’s Led Zeppelin graph. I strongly encourage his wife to watch a few more episodes of Dr. House in the future.


Mapper and reducer classes

February 26, 2009

As explained in the short tutorial, Dumbo provides the ability to use a class as mapper and/or reducer (and also as combiner, of course, but a combiner is really just a reducer with a slightly different purpose). Up until now, the main benefit of this was that the constructor can be used for initializations such as loading the contents of a file into memory. Version 0.20.27 introduces another reason to use classes instead of functions, however, as illustrated by the following extended version of the sampling program from my previous post:

class Mapper:
    def __init__(self):
        self.status = "Initialization started"
        from random import Random
        self.randgen = Random()
        self.cutoff = float(self.params["percentage"]) / 100
        self.samplesize = self.counters["Sample size"]
        self.status = "Initialization done"
    def __call__(self, key, value):
        if self.randgen.random() < self.cutoff:
            self.samplesize += 1
            yield key, value

if __name__ == "__main__":
    from dumbo import run
    run(Mapper)

The things to note in this code are the attributes params, counters, and status, which seem to come out of nowhere. From version 0.20.27 onwards, Dumbo will instantiate a dynamically generated class that inherits from both the class supplied by the programmer and dumbo.MapRedBase, resulting in the seemingly magical addition of the following fields:

  • params: A dictionary that contains the parameters specified using -param <key>=<value> options.
  • counters: You can think of this field as a defaultdict containing dumbo.Counter objects. When a given key has no corresponding counter yet, a new counter is created using the key as name for it. You can still change the name afterwards by assigning a different string to the counter’s name field, but by using a suitable key you can put out two candles with one blow.
  • status: Strings assigned to this field will show up as status message for the task in Hadoop’s web interface.
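The mechanism behind these fields can be sketched with Python’s built-in type() function. The code below is a simplified illustration and not Dumbo’s implementation: MapRedBaseSketch is a hypothetical stand-in for dumbo.MapRedBase, and its hard-coded params value is made up.

```python
class MapRedBaseSketch(object):
    """Hypothetical stand-in for dumbo.MapRedBase."""
    params = {"percentage": "50"}  # would normally come from -param options
    status = None

class Mapper(object):
    def __init__(self):
        # params is not defined anywhere in this class...
        self.cutoff = float(self.params["percentage"]) / 100

# ...but it is available on a dynamically generated subclass of both classes.
Combined = type("CombinedMapper", (Mapper, MapRedBaseSketch), {})
mapper = Combined()
```

Because the extra fields live on a base class of the generated class, they already exist by the time the programmer’s __init__ runs.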

And as if this were not enough, you can now also use the

counter += amount

syntax to increment counters, instead of the less fancy (and harder to remember)

counter.incr(amount)

method call.
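For the curious, this kind of syntax can be supported with the __iadd__ special method, and the defaultdict-like behaviour with __missing__. The classes below are hypothetical sketches of that idea: they only keep a running total in memory, whereas real counters would have to report their increments to Hadoop.

```python
class CounterSketch(object):
    """Hypothetical counter supporting both incr() and +=."""
    def __init__(self, name):
        self.name = name
        self.total = 0
    def incr(self, amount):
        self.total += amount
    def __iadd__(self, amount):
        self.incr(amount)
        return self  # += must return the object that gets rebound

class CountersSketch(dict):
    """Creates a counter named after the key on first access."""
    def __missing__(self, key):
        counter = self[key] = CounterSketch(key)
        return counter

counters = CountersSketch()
counters["Sample size"] += 1       # creates the counter, then increments it
samplesize = counters["Sample size"]
samplesize += 2                    # same object, so the total becomes 3
```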


Random sampling

February 25, 2009

Simple random sampling is a technique often used in data analysis to (substantially) reduce the amount of data to be processed. In this post, I’ll take a stab at explaining how this can be done with Dumbo.

Consider the file commentcounts.txt, derived as follows from the comments.txt file that was generated as part of an earlier post:

$ sort comments.txt | uniq -c | \
sed 's/^[^0-9]*\([0-9]*\) \(.*\)*/\2\t\1/' > commentcounts.txt

which leads to lines of the form:

<Hadoop JIRA issue number>\t<comment author>\t<number of comments>

By means of the following Dumbo program, some statistics can be computed for each of the comment authors that occur in this file:

def mapper(key, value):
    issuenr, commenter, count = value.split("\t")
    yield commenter, int(count)

if __name__ == "__main__":
    from dumbo import run, statsreducer, statscombiner
    run(mapper, statsreducer, statscombiner)

The following three lines are part of the output obtained by running this program on commentcounts.txt:

Doug Cutting	1487    2.28984532616   2.94902633612   1       52
Owen O'Malley	1062    1.90301318267   1.87736300761   1       21
Tom White	268     2.26492537313   2.45723899975   1       24

Each of these lines ends with a number sequence consisting of the total number of issues commented on, followed by the mean, standard deviation, minimum, and maximum, respectively, of the number of comments per issue.
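To make this number sequence a bit more concrete, here is a sketch of how such statistics can be computed from partial results that are cheap to merge, which is what makes a combiner possible. The function names are hypothetical and this is not necessarily how statsreducer and statscombiner work internally. The population form of the standard deviation is used because it appears consistent with the figures reported further down in this post, and the four counts are made up to match those figures.

```python
from math import sqrt

def partial(value):
    """Partial result for one value: (count, sum, sum of squares, min, max)."""
    return (1, value, value * value, value, value)

def merge(a, b):
    """Combine two partial results into one."""
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2], min(a[3], b[3]), max(a[4], b[4]))

def finish(p):
    """Turn a partial result into (count, mean, std, min, max)."""
    n, total, squares, lowest, highest = p
    mean = total / float(n)
    std = sqrt(max(squares / float(n) - mean * mean, 0.0))
    return n, mean, std, lowest, highest

# Merge in two halves, as a combiner on two different nodes might do.
left = merge(partial(1), partial(2))
right = merge(partial(9), partial(16))
stats = finish(merge(left, right))
```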

An easy way to take a random sample from commentcounts.txt is by means of the following Dumbo program:

from random import random

def mapper(key, value):
    if random() < 0.5: yield key, value

if __name__ == "__main__":
    from dumbo import run
    run(mapper)

Running the statistics program on the output of this sampling program gave me the following lines for Doug, Owen, and Tom:

Doug Cutting    760     2.22105263158   2.94351722074   1       52
Owen O'Malley   506     1.84584980237   1.69629699266   1       19
Tom White       131     2.23664122137   2.0296732376    1       11

A quick comparison of these lines with the ones above reveals that:

  • The number of issues about halved for each of the considered comment authors, which makes sense since Python’s random() function generates uniformly distributed floats between 0.0 (inclusive) and 1.0 (exclusive).
  • The means and standard deviations computed from the sample are quite close to the ones obtained from the complete dataset.
  • Two out of the three maximums computed from the sample aren’t very close to the real ones. In Tom’s case, it isn’t close at all.

So, apparently, we can reliably compute means and standard deviations — but not maximums — from a sample, which shouldn’t come as a surprise to anyone who has some basic knowledge of statistics. Just like maximums, minimums can, in general, not be computed from samples by the way, even though this happens to work fine in the example above.

Even for means and standard deviations, some caution has to be taken when computing them from a random sample though. In my case, for instance, the mean computed from the sample is 4.0, whereas the real mean is 7.0:

Klaas Bosteels	4	7.0	6.0415229868	1	16
Klaas Bosteels	3	4.0	3.55902608401	1	9

The reason for this inaccuracy is that I have only commented on four Hadoop issues so far. Instead of regarding all of this as taking one random sample from commentcounts.txt, it’s better to think of it as taking a random sample for each comment author mentioned in this file. The number of issues for each author then corresponds to the sample size, and the sampling error you can expect shrinks as this size grows. More precisely, the error will, on average, be inversely proportional to the square root of the sample size for many statistics, including the mean.
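The square root relationship is easy to check empirically. The following sketch uses a made-up population of random integers (not the actual comment counts) and measures the average absolute error of the sample mean for a few sample sizes; the seed, sizes, and number of trials are arbitrary choices.

```python
import random

rng = random.Random(42)  # fixed seed, only to make runs repeatable

# Hypothetical population of 10000 values between 1 and 50.
population = [rng.randint(1, 50) for _ in range(10000)]
true_mean = sum(population) / float(len(population))

def mean_abs_error(size, trials=2000):
    """Average absolute error of the sample mean over many samples."""
    total = 0.0
    for _ in range(trials):
        sample = rng.sample(population, size)
        total += abs(sum(sample) / float(size) - true_mean)
    return total / trials

# Each sample size is four times the previous one.
errors = dict((size, mean_abs_error(size)) for size in (25, 100, 400))
```

If the relationship holds, the error should roughly halve each time the sample size is quadrupled, so errors[25] / errors[100] and errors[100] / errors[400] should both come out close to 2.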

Now, you might wonder why random sampling is useful in a Hadoop context. After all, the point of Hadoop is to allow you to easily process very large datasets, and hence you usually shouldn’t have to resort to random sampling to compute the numbers you’re interested in. Nevertheless, random sampling does have some advantages, the main one being that it can allow you to get a representative dataset that fits entirely into your desktop’s memory, which creates the possibility of using a convenient software package like Pylab, R, Matlab, or even Excel for your analysis. Furthermore, random sampling is often also a faster way of computing the statistics you’re after, especially when your Hadoop cluster is very busy, since the sampling doesn’t require any reduce task slots (which can be hard to get a hold of on a busy cluster).