Computing TF-IDF weights

March 15, 2009

The guys from Cloudera are now providing their basic Hadoop training for free online. The lecture about MapReduce algorithms was the first one I watched, and I couldn’t resist writing a Dumbo program that implements the last algorithm discussed in this lecture:

from dumbo import main, sumreducer
from math import log

def mapper1(key, value):
    # key is (file path, offset in file) thanks to -addpath yes,
    # so key[0] identifies the document
    for word in value.split():
        yield (word, key[0]), 1

def mapper2(key, value):
    # regroup by document: ((word, doc), n) becomes (doc, (word, n))
    yield key[1], (key[0], value)

def reducer2(key, values):
    values = list(values)
    # N is the total number of words in the document
    N = sum(value[1] for value in values)
    for (word, n) in values:
        yield (word, key), (n, N)

def mapper3(key, value):
    # regroup by word: ((word, doc), (n, N)) becomes (word, (doc, n, N, 1))
    yield key[0], (key[1], value[0], value[1], 1)

class Reducer3:
    def __init__(self):
        self.doccount = float(self.params["doccount"])
    def __call__(self, key, values):
        values = list(values)
        # m is the number of documents in which the word occurs
        m = sum(value[3] for value in values)
        for (doc, n, N) in (value[:3] for value in values):
            yield (key, doc), (float(n) / N) * log(self.doccount / m)

def runner(job):
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    job.additer(mapper2, reducer2)
    job.additer(mapper3, Reducer3)

def starter(prog):
    prog.addopt("addpath", "yes")

if __name__ == "__main__":
    main(runner, starter)
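
To make the record shapes concrete: for a made-up two-document corpus (document A containing “the cat” and document B containing “the dog”, so the document count would be 2), the output of the three iterations would look as follows:

after iteration 1:  (word, doc) -> n         e.g. ('the', 'A') -> 1
after iteration 2:  (word, doc) -> (n, N)    e.g. ('the', 'A') -> (1, 2)
after iteration 3:  (word, doc) -> tf-idf    e.g. ('cat', 'A') -> 0.5 * log(2/1) ≈ 0.347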

As suggested on the next-to-last slide, I avoided the need for a 4th iteration by computing the TF-IDF weights directly in the 3rd reducer, but apart from that, this program works exactly as explained in the lecture. When running it, you have to use the parameter option -param doccount=<number of documents> to specify the total number of documents (which could be calculated by another Dumbo program, if necessary). Since the first mapper expects the keys to be of the form (file path, offset in file), the option -addpath yes is also required, but that one is added automatically by the starter function.
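
For instance, a run could look like this (a sketch with made-up names: tfidf.py for the program file and books/tfidf for the input and output paths):

dumbo start tfidf.py -input books -output tfidf -param doccount=100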

The line

values = list(values)

in Reducer3.__call__ is where the buffering issue discussed in the lecture occurs. For words like “the”, it might not be possible to fit all of the values returned by the iterator values into memory as a list. While trying to do so, the program might even cause some cluster nodes to start swapping like crazy, up to the point where they become completely unresponsive and need to be power cycled in order to bring them back. This happened a few times at Last.fm, which is why we added the -memlimit <number of bytes> option to Dumbo. By putting the line


Mapper and reducer classes

February 26, 2009

As explained in the short tutorial, Dumbo provides the ability to use a class as mapper and/or reducer (and also as combiner, of course, but a combiner is really just a reducer with a slightly different purpose). Up until now, the main benefit of this was that it lets you use the constructor for initializations such as loading the contents of a file into memory. Version 0.20.27 introduces another reason to use classes instead of functions, however, as illustrated by the following extended version of the sampling program from my previous post:

class Mapper:
    def __init__(self):
        self.status = "Initialization started"
        from random import Random
        self.randgen = Random()
        # the -param percentage=<number> option determines the
        # fraction of the records that gets sampled
        self.cutoff = float(self.params["percentage"]) / 100
        self.samplesize = self.counters["Sample size"]
        self.status = "Initialization done"
    def __call__(self, key, value):
        if self.randgen.random() < self.cutoff:
            self.samplesize += 1  # increment the "Sample size" counter
            yield key, value

if __name__ == "__main__":
    from dumbo import run
    run(Mapper)
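
Running this program is no different from running a function-based one. For instance (a sketch with made-up names: sample.py for the program file and logs/sampled for the input and output paths):

dumbo start sample.py -input logs -output sampled -param percentage=10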

The things to note in this code are the attributes params, counters, and status, which seem to come out of nowhere. From version 0.20.27 onwards, Dumbo will instantiate a dynamically generated class that inherits from both the class supplied by the programmer and dumbo.MapRedBase, resulting in the seemingly magical addition of the following fields (a sketch of this mechanism follows the list):

  • params: A dictionary that contains the parameters specified using -param <key>=<value> options.
  • counters: You can think of this field as a defaultdict containing dumbo.Counter objects. When a given key has no corresponding counter yet, a new counter is created using the key as name for it. You can still change the name afterwards by assigning a different string to the counter’s name field, but by using a suitable key you can kill two birds with one stone.
  • status: Strings assigned to this field will show up as status message for the task in Hadoop’s web interface.
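
In case you’re wondering how such a dynamically generated class can be created, here is a minimal sketch of the general technique (with a simplified stand-in for dumbo.MapRedBase, not Dumbo’s actual code):

class MapRedBase(object):
    # simplified stand-ins for the fields described above
    params = {}    # would be filled in from the -param options
    counters = {}  # simplified: Dumbo uses dumbo.Counter objects instead
    status = None  # simplified: Dumbo forwards assignments to Hadoop

def instantiate(cls):
    # generate a class that inherits from both the programmer's
    # class and MapRedBase, and return an instance of it
    return type(cls.__name__, (cls, MapRedBase), {})()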

And as if that were not enough, you can now also use the

counter += amount

syntax to increment counters, instead of the less fancy (and harder to remember)

counter.incr(amount)

method call.
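
Supporting both styles merely requires the counter objects to implement __iadd__. Here is a minimal sketch of the idea (a simplified stand-in for dumbo.Counter, assuming Hadoop Streaming’s reporter:counter:<group>,<name>,<amount> protocol on stderr):

import sys

class Counter(object):
    def __init__(self, name, group="Program"):
        self.name = name
        self.group = group
    def incr(self, amount):
        # report the increment to Hadoop via the streaming protocol
        sys.stderr.write("reporter:counter:%s,%s,%d\n"
                         % (self.group, self.name, amount))
        return self
    def __iadd__(self, amount):
        return self.incr(amount)

Returning self from incr is what makes counter += amount work, since Python rebinds the counter name to whatever __iadd__ returns.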