The @opt decorator

May 13, 2009

I added the @opt decorator to Dumbo yesterday, and decided it was useful enough to justify a new minor release. Using this decorator, mappers and reducers can specify (additional) options they rely on. For instance, you can now write

from dumbo import opt, run, sumreducer

@opt("addpath", "yes")
def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

if __name__ == "__main__":
    run(mapper, sumreducer, combiner=sumreducer)

instead of

from dumbo import main, sumreducer

def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

def runner(job):
    job.additer(mapper, sumreducer, combiner=sumreducer)

def starter(prog):
    prog.addopt("addpath", "yes")

if __name__ == "__main__":
    main(runner, starter)

to count words on a file-by-file basis (recall that the -addpath yes option makes sure the file path is prepended to the key passed to the mapper). The former version is not only shorter, but also clearer, since the option specification is closer to the code that relies on it.
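To make the key shape concrete: with -addpath yes, each mapper call receives a key of the form (file path, offset in file), so key[0] is the path of the file the current line came from. Here is a quick local illustration (the input records below are made up for the example):

```python
def mapper(key, value):
    # key[0] is the file path, so each count is tied to its document
    for word in value.split():
        yield (word, key[0]), 1

# Simulated input records as the framework would deliver them
# with -addpath yes: ((file path, offset in file), line).
records = [(("docs/a.txt", 0), "hello world"),
           (("docs/b.txt", 0), "hello")]
pairs = [kv for key, value in records for kv in mapper(key, value)]
```

Summing these pairs per (word, path) key then yields per-file word counts.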

Under the hood, the @opt decorator appends the given option to an opts list attribute. For mapper and reducer classes, you can just set this attribute directly:

class Mapper:
    opts = [("addpath", "yes")]
    def __call__(self, key, value):
        for word in value.split():
            yield (word, key[0]), 1

if __name__ == "__main__":
    from dumbo import run, sumreducer
    run(Mapper, sumreducer, combiner=sumreducer)
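Conceptually, such a decorator takes only a few lines of plain Python. The sketch below is an illustrative approximation of the mechanism described above, not Dumbo's actual code:

```python
def opt(name, value):
    """Append (name, value) to the decorated function's opts list."""
    def decorate(func):
        # Create the opts list attribute on first use, then append
        if not hasattr(func, "opts"):
            func.opts = []
        func.opts.append((name, value))
        return func
    return decorate

@opt("addpath", "yes")
def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1
```

The framework can then simply read mapper.opts when setting up the job, which is why setting the attribute directly on a class works just as well.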

Furthermore, it’s now also possible to pass an options list to run() or additer() via the opts argument:

def mapper(key, value):
    for word in value.split():
        yield (word, key[0]), 1

if __name__ == "__main__":
    from dumbo import run, sumreducer
    opts = [("addpath", "yes")]
    run(mapper, sumreducer, combiner=sumreducer, opts=opts)

which could be handy when you want to use join keys for only one iteration, for example.
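For instance, a two-iteration runner could attach an option to just its first iteration. The sketch below uses placeholder mapper/reducer names and a minimal stand-in job object so it can run outside Hadoop; the "joinkeys" option name is assumed for illustration:

```python
def mapper1(key, value):
    yield key, 1

def reducer1(key, values):
    yield key, sum(values)

def mapper2(key, value):
    yield value, key

def reducer2(key, values):
    for value in values:
        yield key, value

def runner(job):
    # Only the first iteration gets the extra option:
    job.additer(mapper1, reducer1, opts=[("joinkeys", "yes")])
    job.additer(mapper2, reducer2)

# Minimal stand-in that just records the registered iterations,
# so the runner can be exercised without a cluster:
class FakeJob:
    def __init__(self):
        self.iters = []
    def additer(self, mapper, reducer, opts=None):
        self.iters.append((mapper, reducer, opts))

job = FakeJob()
runner(job)
```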


Computing TF-IDF weights

March 15, 2009

The guys from Cloudera are now providing their basic Hadoop training for free online. The lecture about MapReduce algorithms was the first one I watched, and I couldn’t resist writing a Dumbo program that implements the last algorithm discussed in this lecture:

from dumbo import main, sumreducer
from math import log

def mapper1(key, value):
    for word in value.split():
        yield (word, key[0]), 1

def mapper2(key, value):
    yield key[1], (key[0], value)

def reducer2(key, values):
    values = list(values)
    N = sum(value[1] for value in values)
    for (word, n) in values:
        yield (word, key), (n, N)

def mapper3(key, value):
    yield key[0], (key[1], value[0], value[1], 1)

class Reducer3:
    def __init__(self):
        self.doccount = float(self.params["doccount"])
    def __call__(self, key, values):
        values = list(values)
        m = sum(value[3] for value in values)
        for (doc, n, N) in (value[:3] for value in values):
            yield (key, doc), (float(n) / N) * log(self.doccount / m)

def runner(job):
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    job.additer(mapper2, reducer2)
    job.additer(mapper3, Reducer3)

def starter(prog):
    prog.addopt("addpath", "yes")

if __name__ == "__main__":
    main(runner, starter)

As suggested on the next-to-last slide, I avoided the need for a 4th iteration by computing the TF-IDF weights directly in the third reducer, but apart from that, this program works exactly as explained in the lecture. When running it, you have to use the parameter option -param doccount=&lt;number of documents&gt; to specify the total number of documents (which could be calculated by another Dumbo program, if necessary). Since the first mapper expects the keys to be of the form (file path, offset in file), the option -addpath yes is also required, but that one is added automatically by the starter function.
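To check the logic, the three iterations can be simulated locally in plain Python. The sketch below mirrors the per-iteration computations of the program above (counts n per word and document, totals N per document, document frequencies m, then the weight (n/N) * log(doccount/m)); it is not the Dumbo program itself:

```python
from math import log
from collections import defaultdict

def tfidf(docs):
    """Local simulation of the three MapReduce iterations.

    docs maps document names to their text.
    """
    doccount = float(len(docs))
    # Iteration 1: n = number of occurrences of word in doc.
    counts = defaultdict(int)
    for doc, text in docs.items():
        for word in text.split():
            counts[(word, doc)] += 1
    # Iteration 2: N = total number of words in doc.
    totals = defaultdict(int)
    for (word, doc), n in counts.items():
        totals[doc] += n
    # Iteration 3: m = number of docs containing word, then the weight.
    df = defaultdict(int)
    for (word, doc) in counts:
        df[word] += 1
    return {(word, doc): (float(n) / totals[doc]) * log(doccount / df[word])
            for (word, doc), n in counts.items()}

weights = tfidf({"a": "x y", "b": "x z"})
```

On this toy corpus the word "x" occurs in both documents, so its weight is zero everywhere, while "y" and "z" each get (1/2) * log(2).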

The line

values = list(values)

in Reducer3.__call__ is where the buffering issue discussed in the lecture occurs. For words like “the”, it might not be possible to fit all of the values returned by the iterator values into memory as a list, and while trying to do so the program might even cause some cluster nodes to start swapping like crazy, up to the point where they become completely unresponsive and need to be power cycled to bring them back. This happened a few times at Last.fm, which is why we added the -memlimit &lt;number of bytes&gt; option to Dumbo. By putting the line