Hiding join keys

May 15, 2009

Dumbo 0.21.12 further simplifies joining by hiding the join keys from the “end-programmer”. For instance, the example discussed here can now be written as follows:

import dumbo
from dumbo.lib import MultiMapper, JoinReducer
from dumbo.decor import primary, secondary

def mapper(key, value):
    yield value.split("\t", 1)

class Reducer(JoinReducer):
    def primary(self, key, values):
        self.hostname = values.next()
    def secondary(self, key, values):
        key = self.hostname
        for value in values:
            yield key, value

if __name__ == "__main__":
    multimapper = MultiMapper()
    multimapper.add("hostnames", primary(mapper))
    multimapper.add("logs", secondary(mapper))
    dumbo.run(multimapper, Reducer)

These are the things to note in this fancier version:

  • The mapping is implemented by combining decorated mappers into one MultiMapper.
  • The reducing is implemented by extending JoinReducer.
  • There is no direct interaction with the join keys. In fact, the -joinkeys yes option doesn’t even get specified explicitly (the decorated mappers and JoinReducer automatically make sure this option gets added via their opts attributes).
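The join keys hide a fairly simple idea. Each record is tagged as primary or secondary, so that within each key the primary value sorts first and the reducer sees it before the secondary values. The following is a minimal pure-Python sketch of that general reduce-side join technique. The function `join_sketch` and its 0/1 tagging scheme are hypothetical illustrations; this is not Dumbo's actual join-key encoding or the code behind JoinReducer.

```python
from itertools import groupby
from operator import itemgetter

PRIMARY, SECONDARY = 0, 1  # tags: primary records must sort first per key


def join_sketch(primary_records, secondary_records):
    """Reduce-side join of two lists of (key, value) pairs (hypothetical helper).

    Every value from secondary_records is emitted next to the primary value
    that shares its key; secondary records without a primary are dropped.
    """
    tagged = [((key, PRIMARY), value) for key, value in primary_records]
    tagged += [((key, SECONDARY), value) for key, value in secondary_records]
    # Sorting on (key, tag) stands in for Hadoop's shuffle and sort phase.
    tagged.sort(key=itemgetter(0))
    for _, group in groupby(tagged, key=lambda record: record[0][0]):
        primary_value = None  # reset per key so values never leak across keys
        for (_, tag), value in group:
            if tag == PRIMARY:
                primary_value = value
            elif primary_value is not None:
                yield primary_value, value


hostnames = [("10.0.0.1", "host-a"), ("10.0.0.2", "host-b")]
logs = [("10.0.0.1", "GET /x"), ("10.0.0.2", "GET /y"),
        ("10.0.0.1", "GET /z"), ("10.0.0.3", "GET /w")]
print(list(join_sketch(hostnames, logs)))
# [('host-a', 'GET /x'), ('host-a', 'GET /z'), ('host-b', 'GET /y')]
```

The sketch resets the primary value for every key. As a result, a log line whose address has no hostname entry is silently skipped and never inherits the hostname of a previous key.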

The primary and secondary decorators can, of course, also be applied using the @decorator syntax, i.e., I could also have written

@primary
def mapper1(key, value):
    yield value.split("\t", 1)

@secondary
def mapper2(key, value):
    yield value.split("\t", 1)

and

    multimapper.add("hostnames", mapper1)
    multimapper.add("logs", mapper2)

This is less convenient for this particular example, but it might be preferable when your primary and secondary mappers have different implementations.


The @opt decorator

May 13, 2009

I added the @opt decorator to Dumbo yesterday, and decided it was useful enough to justify a new minor release. Using this decorator, mappers and reducers can specify (additional) options they rely on. For instance, you can now write

from dumbo import opt, run, sumreducer

@opt("addpath", "yes")
def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

if __name__ == "__main__":
    run(mapper, sumreducer, combiner=sumreducer)

instead of

from dumbo import main, sumreducer

def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

def runner(job):
    job.additer(mapper, sumreducer, combiner=sumreducer)

def starter(prog):
    prog.addopt("addpath", "yes")

if __name__ == "__main__":
    main(runner, starter)

to count words on a per-file basis (recall that the -addpath yes option makes sure the file path is prepended to the key passed to the mapper). The former version is not only shorter but also clearer, since the option specification is closer to the code that relies on it.

Under the hood, the @opt decorator appends the given option to an opts list attribute. For mapper and reducer classes, you can just set this attribute directly:

class Mapper:
    opts = [("addpath", "yes")]
    def __call__(self, key, value):
        for word in value.split():
            yield (word, key[0]), 1

if __name__ == "__main__":
    from dumbo import run, sumreducer
    run(Mapper, sumreducer, combiner=sumreducer)
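The opts mechanism is simple enough to imitate in plain Python. This minimal sketch assumes only what is stated above: options accumulate in an `opts` list attribute, and the runner reads that attribute later. The names `opt_sketch` and `collect_opts` are hypothetical, and this is not Dumbo's source.

```python
def opt_sketch(name, value):
    """Hypothetical @opt-style decorator: record (name, value) in an opts list."""
    def decorator(component):
        # Copy any existing opts so that a list shared with a base class is never mutated.
        component.opts = list(getattr(component, "opts", [])) + [(name, value)]
        return component
    return decorator


def collect_opts(*components):
    """Gather opts from all components, as a job runner might (hypothetical)."""
    collected = []
    for component in components:
        collected.extend(getattr(component, "opts", []))
    return collected


@opt_sketch("addpath", "yes")
def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1


class Reducer:
    opts = [("joinkeys", "yes")]  # set directly, as for the Mapper class above

    def __call__(self, key, values):
        yield key, sum(values)


print(collect_opts(mapper, Reducer))
# [('addpath', 'yes'), ('joinkeys', 'yes')]
```

The decorator copies the list instead of appending in place. A subclass decorated this way therefore never changes the options of its parent class.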

Furthermore, it’s now also possible to pass an options list to run() or additer() via the opts argument:

def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

if __name__ == "__main__":
    from dumbo import run, sumreducer
    opts = [("addpath", "yes")]
    run(mapper, sumreducer, combiner=sumreducer, opts=opts)

which could be handy when you want to use join keys for only one iteration, for example.


Fast Python module for typed bytes

April 13, 2009

Over the past few days, I spent some time implementing a typed bytes Python module in C. It’s probably not quite ready for production use yet, and it still falls back to the pure Python module for floats, but it seems to work fine and already leads to substantial speedups.

For example, the Python program

from typedbytes import Output
Output(open("test.tb", "wb")).writes(xrange(10**7))

needs 18.8 secs to finish on this laptop, whereas it requires only 0.9 secs after replacing typedbytes with ctypedbytes. Similarly, the running time for

from typedbytes import Input
for item in Input(open("test.tb", "rb")).reads(): pass

can be reduced from 22.9 to merely 1.7 secs by using ctypedbytes instead of typedbytes.
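The following sketch encodes and decodes typed bytes integers with the standard struct module. It gives a feel for the per-item work these modules do. It assumes the layout from the HADOOP-1722 typed bytes specification: type code 3 followed by a 4-byte big-endian signed integer. It handles ints only and is neither the typedbytes nor the ctypedbytes API. Presumably, a per-item Python loop like this one is the kind of work a C implementation speeds up.

```python
import io
import struct

INT_TYPE_CODE = 3  # typed bytes code for a 32-bit int (assumed from the spec)


def write_int(stream, n):
    """Write n as a typed bytes int: 1-byte type code + 4-byte big-endian int."""
    stream.write(struct.pack(">bi", INT_TYPE_CODE, n))


def read_ints(stream):
    """Yield ints from a stream that contains only typed bytes ints."""
    while True:
        record = stream.read(5)
        if len(record) < 5:
            return  # end of stream (a truncated record is treated the same way)
        code, n = struct.unpack(">bi", record)
        if code != INT_TYPE_CODE:
            raise ValueError("unsupported type code: %d" % code)
        yield n


buf = io.BytesIO()
for i in range(3):
    write_int(buf, i)
print(buf.getvalue().hex())  # 030000000003000000010300000002
buf.seek(0)
print(list(read_ints(buf)))  # [0, 1, 2]
```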

Obviously, Dumbo programs can benefit from this faster typed bytes module as well, but the gains probably won’t be as spectacular as for the simple test programs above. To give it a go, make sure you’re using the latest version of Dumbo, build an egg for the ctypedbytes module, and add the following option to your start command:

-libegg <path to ctypedbytes egg>

From what I’ve seen so far, this can speed up Dumbo programs by 30%, which definitely makes it worth the effort if you ask me. In fact, the Dumbo program would now probably beat the Java program in the benchmark discussed here, but, unfortunately, this wouldn’t be a very fair comparison. Johan recently made me aware that it’s better to avoid Java’s split() method for strings when you don’t need regular expression support, and using a combination of substring() and indexOf() instead seems to make the Java program about 40% faster. So we’re not quite as fast as Java yet, but at least the gap has narrowed some more.


Computing TF-IDF weights

March 15, 2009

The guys from Cloudera are now providing their basic Hadoop training for free online. The lecture about MapReduce algorithms was the first one I watched, and I couldn’t resist writing a Dumbo program that implements the last algorithm discussed in this lecture:

from dumbo import main, sumreducer
from math import log

def mapper1(key, value):
    for word in value.split():
        yield (word, key[0]), 1

def mapper2(key, value):
    yield key[1], (key[0], value)

def reducer2(key, values):
    values = list(values)
    N = sum(value[1] for value in values)
    for (word, n) in values:
        yield (word, key), (n, N)

def mapper3(key, value):
    yield key[0], (key[1], value[0], value[1], 1)

class Reducer3:
    def __init__(self):
        self.doccount = float(self.params["doccount"])
    def __call__(self, key, values):
        values = list(values)
        m = sum(value[3] for value in values)
        for (doc, n, N) in (value[:3] for value in values):
            yield (key, doc), (float(n) / N) * log(self.doccount / m)

def runner(job):
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    job.additer(mapper2, reducer2)
    job.additer(mapper3, Reducer3)

def starter(prog):
    prog.addopt("addpath", "yes")

if __name__ == "__main__":
    main(runner, starter)

As suggested on the next-to-last slide, I avoided the need for a fourth iteration by already computing the TF-IDF weights in the third reducer, but apart from that, this program works exactly as explained in the lecture. When running it, you have to use the parameter option -param doccount=<number of documents> to specify the total number of documents (which could be calculated by another Dumbo program, if necessary). Since the first mapper expects the keys to be of the form (file path, offset in file), the option -addpath yes is also required, but this one is automatically added by the starter function.
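When checking the program's output on a small dataset, it can help to compute the same weights in memory. The function `tfidf_sketch` below is a hypothetical helper, not part of Dumbo. It assumes the whole corpus fits in memory and uses the same weight as Reducer3, (n / N) * log(D / m). Here, n is the number of times a word occurs in a document, N is the total number of words in that document, D is the number of documents, and m is the number of documents containing the word.

```python
from collections import defaultdict
from math import log


def tfidf_sketch(docs):
    """In-memory TF-IDF weights for a dict mapping document name -> text."""
    # Iteration 1: n, the number of times each word occurs in each document.
    counts = defaultdict(int)
    for doc, text in docs.items():
        for word in text.split():
            counts[(word, doc)] += 1
    # Iteration 2: N, the total number of words in each document.
    totals = defaultdict(int)
    for (word, doc), n in counts.items():
        totals[doc] += n
    # Iteration 3: m, the number of documents in which each word occurs.
    docfreq = defaultdict(int)
    for word, doc in counts:
        docfreq[word] += 1
    doccount = float(len(docs))
    return dict(((word, doc), (float(n) / totals[doc]) * log(doccount / docfreq[word]))
                for (word, doc), n in counts.items())


weights = tfidf_sketch({"a.txt": "the cat sat", "b.txt": "the dog"})
print(weights[("cat", "a.txt")])  # equals log(2) / 3, about 0.231
print(weights[("the", "a.txt")])  # 0.0, since "the" occurs in every document
```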

The line

values = list(values)

in Reducer3.__call__ is where the buffering issue discussed in the lecture occurs. For words like “the”, it might not be possible to fit all of the values returned by the iterator values into memory as a list. While trying to do so, the program might even cause some cluster nodes to start swapping like crazy, up to the point where they become completely unresponsive and need to be power cycled to bring them back. This happened a few times at Last.fm, which is why we added the -memlimit <number of bytes> option to Dumbo. By putting the line

memlimit: <maximum number of bytes>

in the [common] section of /etc/dumbo.conf, you can set an upper limit on the amount of memory that can be used, and make your Dumbo programs fail with a MemoryError when they exceed this limit.
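A memory limit may turn out to be too tight for frequent words. One possible workaround, not something Dumbo does for you, is to spill the values to a temporary local file while counting them and then stream them back. The following minimal sketch assumes that the values are picklable and that local disk is less scarce than memory. `reducer3_spilling` is a hypothetical function that takes `doccount` as an argument instead of reading it from `self.params`.

```python
import pickle
import tempfile
from math import log


def reducer3_spilling(key, values, doccount):
    """Hypothetical variant of Reducer3.__call__ that buffers on disk.

    First pass: pickle every value to a temporary file while computing m.
    Second pass: read the values back one at a time and emit the weights.
    """
    with tempfile.TemporaryFile() as spill:
        m, count = 0, 0
        for value in values:
            pickle.dump(value, spill)
            m += value[3]
            count += 1
        spill.seek(0)
        for _ in range(count):
            doc, n, N = pickle.load(spill)[:3]
            yield (key, doc), (float(n) / N) * log(float(doccount) / m)


values = iter([("d1.txt", 2, 10, 1), ("d2.txt", 1, 5, 1)])
for record in reducer3_spilling("cat", values, doccount=4):
    print(record)
```

This approach trades memory for two passes over local disk. It only helps when the values for one key fit on the task's disk.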


Simple job logs analysis

March 4, 2009

Computing some simple statistics for the jobs that run on your Hadoop cluster can be very useful in practice. Data collection systems like Chukwa probably allow you to do this, but if you don’t have hundreds of nodes, simply running the following shell script daily on your master might be all you need:

find /path/to/hadoop/logs/history/ -daystart -ctime 1 | \
grep -v 'xml$' | grep -v 'crc$' | while read FILE; do
NAME="`basename $FILE`"
sed 's/ *$//' $FILE | sed "s/\$/ JOBNAME=\"$NAME\"/"
done > /tmp/joblogs.txt

DATE="`date -d yesterday +"%Y/%m/%d"`"
/path/to/hadoop/bin/hadoop dfs -mkdir /user/hadoop/joblogs/$DATE
/path/to/hadoop/bin/hadoop dfs -put /tmp/joblogs.txt \
/user/hadoop/joblogs/$DATE/joblogs.txt || exit 1

rm /tmp/joblogs.txt

This script takes all of yesterday’s job logs, adds a JOBNAME="<filename>" field to each line, puts everything in a single file, and uploads this file to the DFS. Once you have this set up, you can use Hadoop to analyse your job logs. Here’s an example in the form of a Dumbo program:

from dumbo import sumsreducer, statsreducer, statscombiner, main

class Mapper1:
    def __init__(self):
        from re import compile
        self.fieldselector = compile('([A-Z_]+)="([^"]*)"')
    def __call__(self, key, value):
        logtype = value.split(" ", 1)[0]
        if logtype.endswith("Attempt"):
            try:
                fields = dict(self.fieldselector.findall(value))
                jobname = fields["JOBNAME"]
                taskid = fields["TASK_ATTEMPT_ID"]
                if "START_TIME" in fields:
                    start = int(fields["START_TIME"])
                    yield (logtype, jobname, taskid), (start, 0)
                elif "FINISH_TIME" in fields and not "ERROR" in fields:
                    stop = int(fields["FINISH_TIME"])
                    yield (logtype, jobname, taskid), (0, stop)
            except KeyError:
                self.counters["Broken loglines"] += 1

def mapper2(key, value):
    if value[0] > 0 and value[1] > 0:
        yield key[:2], (value[1] - value[0]) / 1000.0

def runner(job):
    job.additer(Mapper1, sumsreducer, combiner=sumsreducer)
    job.additer(mapper2, statsreducer, statscombiner)

def starter(prog):
    date = prog.delopt("date")
    if date:
        prog.addopt("input", "/user/hadoop/joblogs/" + date)
        prog.addopt("name", "jobstats-" + date)

if __name__ == "__main__":
    main(runner, starter)

From the output of this program, you can easily generate a few charts that show you which jobs are slowest. We recently started playing with this at Last.fm, mainly because such charts allow us to identify the jobs on which we should focus our optimization efforts.
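Before running such a program on a cluster, it can help to check the parsing logic locally on a handful of lines. The sketch below folds both iterations into one hypothetical function, `attempt_durations`. It assumes the same KEY="value" field layout that Mapper1's regular expression expects. The sample lines are made up and much shorter than real job history lines.

```python
import re
from collections import defaultdict

FIELD_RE = re.compile('([A-Z_]+)="([^"]*)"')  # same pattern as in Mapper1


def attempt_durations(lines):
    """Map (logtype, jobname) to a list of attempt durations in seconds."""
    times = defaultdict(lambda: [0, 0])  # (logtype, jobname, taskid) -> [start, stop]
    for line in lines:
        logtype = line.split(" ", 1)[0]
        if not logtype.endswith("Attempt"):
            continue
        fields = dict(FIELD_RE.findall(line))
        if "JOBNAME" not in fields or "TASK_ATTEMPT_ID" not in fields:
            continue  # Mapper1 counts these as "Broken loglines"
        key = (logtype, fields["JOBNAME"], fields["TASK_ATTEMPT_ID"])
        if "START_TIME" in fields:
            times[key][0] += int(fields["START_TIME"])
        elif "FINISH_TIME" in fields and "ERROR" not in fields:
            times[key][1] += int(fields["FINISH_TIME"])
    durations = defaultdict(list)
    for (logtype, jobname, _), (start, stop) in times.items():
        if start > 0 and stop > 0:  # same filter as mapper2
            durations[(logtype, jobname)].append((stop - start) / 1000.0)
    return dict(durations)


sample_lines = [  # made-up lines, far shorter than real job history entries
    'MapAttempt TASK_ATTEMPT_ID="a1" START_TIME="1000" JOBNAME="wordcount"',
    'MapAttempt TASK_ATTEMPT_ID="a1" FINISH_TIME="4500" JOBNAME="wordcount"',
    'MapAttempt TASK_ATTEMPT_ID="a2" START_TIME="2000" JOBNAME="wordcount"',
    'MapAttempt TASK_ATTEMPT_ID="a2" FINISH_TIME="3000" ERROR="x" JOBNAME="wordcount"',
    'Job JOBID="j1" JOBNAME="wordcount"',
]
print(attempt_durations(sample_lines))  # {('MapAttempt', 'wordcount'): [3.5]}
```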


Indexing typed bytes

March 3, 2009

In order to be able to do something useful with the output of a Dumbo program, you often need to index it first. Suppose, for instance, that you ran a program that computes JIRA issue recommendations on lots and lots of issues, saving the output to jira/recs on the DFS. If the HADOOP-1722 patch has been applied to your Hadoop build, you can then dump this output to a fairly compact typed bytes file recs.tb as follows:

$ /path/to/hadoop/bin/hadoop jar \
/path/to/hadoop/build/contrib/streaming/*.jar dumptb jira/recs > recs.tb

Such a typed bytes file isn’t very convenient for doing lookups though, since you basically have to go through the whole thing to find the recommendations corresponding to a particular issue. Keeping it completely in memory as a hash table might be an option if you have tons of RAM, but indexing it and putting only the index in memory can do the trick as well.

Here’s a very simple, but nevertheless quite effective, way of generating an index for a typed bytes file:

import sys
import typedbytes

infile = open(sys.argv[1], "rb")
outfile = open(sys.argv[2], "wb")

input = typedbytes.PairedInput(infile)
output = typedbytes.PairedOutput(outfile)
pos = 0
for key, value in input:
    output.write((key, pos))
    pos = infile.tell()

infile.close()
outfile.close()

This Python program relies on the typedbytes module to read a given typed bytes file and generate a second one containing (key, position) pairs. More concretely, running it with the arguments recs.tb recs.tb.idx leads to an index file recs.tb.idx for recs.tb. If you put

file = open("recs.tb", "rb")
input = typedbytes.PairedInput(file)
index = dict(typedbytes.PairedInput(open("recs.tb.idx", "rb")))

in the initialization part of a Python program, you can then look up the recommendations for a particular issuenr as follows:

file.seek(index[issuenr])
readnr, recs = input.read()
if readnr != issuenr:
    raise ValueError("corrupt index")

When dealing with huge output files, you might have to use a second-level index on (a sorted version of) the index obtained in this way, but in many cases this simple approach gets the job done just fine.
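One simple form of second-level index keeps in memory every k-th key of the sorted first-level index, together with its byte offset in the index file. Each lookup then reads at most k entries from disk. The sketch below uses pickle records instead of typed bytes, so it runs with the standard library alone. `write_sorted_index`, `lookup`, and the `every` parameter are hypothetical names chosen for illustration, and the HADOOP-style keys are made up.

```python
import io
import pickle
from bisect import bisect_right


def write_sorted_index(entries, f, every=100):
    """Write (key, position) entries to f in key order.

    Returns a sparse in-memory index: every `every`-th key together with
    the byte offset in f at which its entry starts.
    """
    sparse_keys, sparse_offsets = [], []
    for i, (key, pos) in enumerate(sorted(entries)):
        if i % every == 0:
            sparse_keys.append(key)
            sparse_offsets.append(f.tell())
        pickle.dump((key, pos), f)
    return sparse_keys, sparse_offsets


def lookup(f, sparse, key, every=100):
    """Return key's position, reading at most `every` entries from f."""
    sparse_keys, sparse_offsets = sparse
    block = bisect_right(sparse_keys, key) - 1  # last sampled key <= key
    if block < 0:
        raise KeyError(key)
    f.seek(sparse_offsets[block])
    for _ in range(every):
        try:
            found, pos = pickle.load(f)
        except EOFError:
            break
        if found == key:
            return pos
        if found > key:
            break  # entries are sorted, so key is not in the index
    raise KeyError(key)


index_file = io.BytesIO()
entries = [("HADOOP-%d" % nr, nr * 100) for nr in range(1000, 2000)]
sparse = write_sorted_index(entries, index_file, every=10)
print(lookup(index_file, sparse, "HADOOP-1722", every=10))  # 172200
```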


Mapper and reducer classes

February 26, 2009

As explained in the short tutorial, Dumbo lets you use a class as mapper and/or reducer (and also as combiner, of course, but a combiner is really just a reducer with a slightly different purpose). Until now, the main benefit of this was that you could use the constructor for initializations such as loading the contents of a file into memory. Version 0.20.27 introduces another reason to use classes instead of functions, however, as illustrated by the following extended version of the sampling program from my previous post:

class Mapper:
    def __init__(self):
        self.status = "Initialization started"
        from random import Random
        self.randgen = Random()
        self.cutoff = float(self.params["percentage"]) / 100
        self.samplesize = self.counters["Sample size"]
        self.status = "Initialization done"
    def __call__(self, key, value):
        if self.randgen.random() < self.cutoff:
            self.samplesize += 1
            yield key, value

if __name__ == "__main__":
    from dumbo import run
    run(Mapper)

The things to note in this code are the class variables params, counters, and status, which seem to come out of nowhere. From version 0.20.27 onwards, Dumbo will instantiate a dynamically generated class that inherits from both the class supplied by the programmer and dumbo.MapRedBase, resulting in the seemingly magical addition of the following fields:

  • params: A dictionary that contains the parameters specified using -param <key>=<value> options.
  • counters: You can think of this field as a defaultdict containing dumbo.Counter objects. When a given key has no corresponding counter yet, a new counter is created using the key as its name. You can still change the name afterwards by assigning a different string to the counter’s name field, but by using a suitable key you can kill two birds with one stone.
  • status: Strings assigned to this field will show up as status message for the task in Hadoop’s web interface.
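The dynamic subclassing described above can be imitated in a few lines of plain Python with the three-argument form of `type()`. In this sketch, `MapRedBaseSketch` and `instantiate` are hypothetical names, and `minlength` is a made-up parameter. The counters are plain integers in a `defaultdict` rather than `dumbo.Counter` objects, and `status` is an ordinary attribute that reports nothing to Hadoop. The sketch only shows why `self.params` is already usable inside `__init__`.

```python
from collections import defaultdict


class MapRedBaseSketch(object):
    """Hypothetical stand-in for dumbo.MapRedBase (not its actual code)."""
    params = {}
    status = ""


def instantiate(user_class, params):
    """Instantiate a generated class that inherits from user_class and the base."""
    generated = type(user_class.__name__, (user_class, MapRedBaseSketch), {
        "params": dict(params),
        # Simplification: plain integer counts instead of dumbo.Counter objects.
        "counters": defaultdict(int),
    })
    return generated()


class Mapper:
    def __init__(self):
        self.status = "Initialization started"
        self.minlength = int(self.params["minlength"])  # params is a class attribute
        self.status = "Initialization done"

    def __call__(self, key, value):
        if len(value) >= self.minlength:
            self.counters["Long lines"] += 1
            yield key, value


mapper = instantiate(Mapper, {"minlength": "4"})
print(list(mapper(0, "abc")) + list(mapper(1, "abcdef")))  # [(1, 'abcdef')]
print(mapper.counters["Long lines"], mapper.status)  # 1 Initialization done
```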

And as if this were not enough, you can now also use the

counter += amount

syntax to increment counters, instead of the less fancy (and harder to remember)

counter.incr(amount)

method call.
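The `+=` syntax works because, for augmented assignment, Python calls the object's `__iadd__` method and rebinds the name to whatever that method returns. The following minimal sketch uses a hypothetical `Counter` class rather than the actual `dumbo.Counter` code. Because `__iadd__` returns `self`, a name such as `samplesize` in the Mapper above keeps pointing at the same counter object after `+=`.

```python
class Counter(object):
    """Hypothetical counter supporting both incr() and += (not dumbo.Counter)."""

    def __init__(self, name):
        self.name = name
        self.value = 0

    def incr(self, amount):
        self.value += amount

    def __iadd__(self, amount):
        self.incr(amount)
        return self  # keep the name bound to this same counter object


counters = {"Sample size": Counter("Sample size")}
samplesize = counters["Sample size"]
samplesize += 1   # calls __iadd__, which returns the same object
samplesize.incr(2)
print(counters["Sample size"].value)  # 3
```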


Random sampling

February 25, 2009

Simple random sampling is a technique often used in data analysis to (substantially) reduce the amount of data to be processed. In this post, I’ll take a stab at explaining how this can be done with Dumbo.

Consider the file commentcounts.txt, derived as follows from the comments.txt file that was generated as part of an earlier post:

$ sort comments.txt | uniq -c | \
sed 's/^[^0-9]*\([0-9]*\) \(.*\)/\2\t\1/' > commentcounts.txt

which leads to lines of the form:

<Hadoop JIRA issue number>\t<comment author>\t<number of comments>

By means of the following Dumbo program, some statistics can be computed for each of the comment authors that occur in this file:

def mapper(key, value):
    issuenr, commenter, count = value.split("\t")
    yield commenter, int(count)

if __name__ == "__main__":
    from dumbo import run, statsreducer, statscombiner
    run(mapper, statsreducer, statscombiner)

The following three lines are part of the output obtained by running this program on commentcounts.txt:

Doug Cutting	1487    2.28984532616   2.94902633612   1       52
Owen O'Malley	1062    1.90301318267   1.87736300761   1       21
Tom White	268     2.26492537313   2.45723899975   1       24

Each of these lines ends with a number sequence consisting of the total number of issues commented on, followed by the mean, standard deviation, minimum, and maximum, respectively, of the number of comments per issue.
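statsreducer can be paired with statscombiner because these five numbers can be computed from partial summaries that merge cheaply. The following minimal sketch of that idea assumes that each partial summary is (count, sum, sum of squares, min, max). The function names are hypothetical, and this is not Dumbo's code. Using the population standard deviation (dividing by n) is also an assumption about what statsreducer computes.

```python
from math import sqrt


def partial_stats(values):
    """Summarize numbers as (count, sum, sum of squares, min, max)."""
    values = list(values)
    return (len(values), sum(values), sum(v * v for v in values),
            min(values), max(values))


def merge_stats(a, b):
    """Combine two partial summaries; merging is what makes a combiner possible."""
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2], min(a[3], b[3]), max(a[4], b[4]))


def finish_stats(summary):
    """Turn a summary into (count, mean, population std dev, min, max)."""
    n, total, squares, lowest, highest = summary
    mean = float(total) / n
    variance = max(float(squares) / n - mean * mean, 0.0)  # guard against rounding
    return n, mean, sqrt(variance), lowest, highest


summary = merge_stats(partial_stats([1, 2]), partial_stats([9, 16]))
print(finish_stats(summary))  # count 4, mean 7.0, std dev about 6.0415, min 1, max 16
```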

An easy way to take a random sample from commentcounts.txt is by means of the following Dumbo program:

from random import random

def mapper(key, value):
    if random() < 0.5: yield key, value

if __name__ == "__main__":
    from dumbo import run
    run(mapper)

Running the statistics program on the output of this sampling program gave me the following lines for Doug, Owen, and Tom:

Doug Cutting    760     2.22105263158   2.94351722074   1       52
Owen O'Malley   506     1.84584980237   1.69629699266   1       19
Tom White       131     2.23664122137   2.0296732376    1       11

A quick comparison of these lines with the ones above reveals that:

  • The number of issues about halved for each of the considered comment authors, which makes sense since Python’s random() function generates uniformly distributed floats between 0.0 (inclusive) and 1.0 (exclusive).
  • The means and standard deviations computed from the sample are quite close to the ones obtained from the complete dataset.
  • Two of the three maximums computed from the sample aren’t very close to the real ones. In Tom’s case, the sample maximum (11) is less than half of the real one (24).

So, apparently, we can reliably compute means and standard deviations, but not maximums, from a sample, which shouldn’t come as a surprise to anyone with some basic knowledge of statistics. By the way, just like maximums, minimums generally cannot be computed from samples either, even though this happens to work fine in the example above.

Even for means and standard deviations, some caution has to be taken when computing them from a random sample though. In my case, for instance, the mean computed from the sample is 4.0, whereas the real mean is 7.0:

Klaas Bosteels	4	7.0	6.0415229868	1	16
Klaas Bosteels	3	4.0	3.55902608401	1	9

The reason for this inaccuracy is that I have only commented on four Hadoop issues so far. Instead of regarding all of this as taking one random sample from commentcounts.txt, it’s better to think of it as taking a random sample for each comment author mentioned in this file. The number of issues for each author then corresponds to the sample size, and the larger the sample size, the smaller the sampling error you can expect. More precisely, for many statistics, including the mean, the error will on average be inversely proportional to the square root of the sample size.
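The square-root relationship is easy to check empirically. The following sketch uses only Python's standard random module and a synthetic population of made-up integers. It draws many fixed-size samples, rather than flipping a coin per line as the sampling mapper above does, and measures how far the sample mean typically lands from the true mean. If the error shrinks like 1/sqrt(n), the last printed column should stay roughly constant. `mean_abs_error` is a hypothetical helper.

```python
import random
from math import sqrt


def mean_abs_error(population, sample_size, trials, rng):
    """Average |sample mean - population mean| over repeated random samples."""
    true_mean = sum(population) / float(len(population))
    total = 0.0
    for _ in range(trials):
        sample = rng.sample(population, sample_size)
        total += abs(sum(sample) / float(sample_size) - true_mean)
    return total / trials


rng = random.Random(42)
population = [rng.randint(1, 50) for _ in range(10000)]  # synthetic data
for size in (4, 16, 64, 256):
    err = mean_abs_error(population, size, 2000, rng)
    # size, average error, error scaled by sqrt(size)
    print(size, round(err, 3), round(err * sqrt(size), 3))
```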

Now, you might wonder why random sampling is useful in a Hadoop context. After all, the point of Hadoop is to allow you to easily process very large datasets, so you usually shouldn’t have to resort to random sampling to compute the numbers you’re interested in. Nevertheless, random sampling does have some advantages, the main one being that it can give you a representative dataset that fits entirely into your desktop’s memory. This makes it possible to use a convenient software package like Pylab, R, Matlab, or even Excel for your analysis. Furthermore, random sampling is often a faster way of computing the statistics you’re after, especially when your Hadoop cluster is very busy, since the sampling doesn’t require any reduce task slots (which can be hard to get hold of on a busy cluster).

