TF-IDF revisited

May 17, 2009

Remember the buffering problems for the TF-IDF program discussed in a previous post, as well as the lecture about MapReduce algorithms from Cloudera’s free Hadoop training? Thanks to the new joining abstraction (and the minor fixes and enhancements in Dumbo 0.21.13 and 0.21.14), these problems can now easily be avoided:

from dumbo import *
from math import log

@opt("addpath", "yes")
def mapper1(key, value):
    for word in value.split():
        yield (key[0], word), 1

@primary
def mapper2a(key, value):
    # primary pairs (doc, n): the per-word counts, summed per document
    yield key[0], value

@secondary
def mapper2b(key, value):
    # secondary pairs (doc, (word, n)): joined with the per-document total
    yield key[0], (key[1], value)

@primary
def mapper3a(key, value):
    # primary pairs (word, 1): counts the documents containing each word
    yield value[0], 1

@secondary
def mapper3b(key, value):
    # secondary pairs (word, (doc, tf)): joined with that document count
    yield value[0], (key, value[1])

class Reducer(JoinReducer):
    def __init__(self):
        self.sum = 0
    def primary(self, key, values):
        # remember the sum of the primary values for use in secondary()
        self.sum = sum(values)

class Combiner(JoinCombiner):
    def primary(self, key, values):
        yield key, sum(values)

class Reducer1(Reducer):
    def secondary(self, key, values):
        # self.sum is the total number of words in the document key
        for (word, n) in values:
            yield key, (word, float(n) / self.sum)

class Reducer2(Reducer):
    def __init__(self):
        Reducer.__init__(self)
        self.doccount = float(self.params["doccount"])
    def secondary(self, key, values):
        # self.sum is the number of documents in which the word key occurs
        idf = log(self.doccount / self.sum)
        for (doc, tf) in values:
            yield (key, doc), tf * idf

def runner(job):
    # iteration 1: count the occurrences of each (doc, word) pair
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    # iteration 2: compute term frequencies by joining on doc
    multimapper = MultiMapper()
    multimapper.add("", mapper2a)
    multimapper.add("", mapper2b)
    job.additer(multimapper, Reducer1, Combiner)
    # iteration 3: compute TF-IDF weights by joining on word
    multimapper = MultiMapper()
    multimapper.add("", mapper3a)
    multimapper.add("", mapper3b)
    job.additer(multimapper, Reducer2, Combiner)

if __name__ == "__main__":
    main(runner)

Most of this Dumbo program shouldn’t be hard to understand if you had a peek at the posts about hiding join keys and the @opt decorator, except maybe for the following things:

  • The first argument supplied to MultiMapper’s add method is a string pattern that has to occur in a file’s path in order for the added mapper to run on the key/value pairs from that file. Since the empty string “” is considered to occur in every possible path string, all added mappers run on each input file in this example program (a sketch with non-empty patterns follows this list).
  • It is possible (but not necessary) to yield key/value pairs in a JoinReducer’s primary method, as illustrated by the Combiner class in this example.
  • The default implementations of JoinReducer’s primary and secondary methods are identity operations, so Combiner combines the primary pairs and just passes on the secondary ones.
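
To make the pattern argument more concrete, here’s a hypothetical sketch of a MultiMapper that routes different input files to different mappers. The “wiki” and “news” path fragments and both mappers are made up for illustration; they are not part of the program above:

from dumbo import MultiMapper

def wiki_mapper(key, value):
    # hypothetical mapper for files whose path contains "wiki"
    yield key, ("wiki", value)

def news_mapper(key, value):
    # hypothetical mapper for files whose path contains "news"
    yield key, ("news", value)

multimapper = MultiMapper()
multimapper.add("wiki", wiki_mapper)  # runs only on paths containing "wiki"
multimapper.add("news", news_mapper)  # runs only on paths containing "news"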

Writing this program went surprisingly smoothly and didn’t take much effort at all. Apparently, the “primary/secondary abstraction” works really well for me.


Computing TF-IDF weights

March 15, 2009

The guys from Cloudera are now providing their basic Hadoop training for free online. The lecture about MapReduce algorithms was the first one I watched, and I couldn’t resist writing a Dumbo program that implements the last algorithm discussed in this lecture:

from dumbo import main, sumreducer
from math import log

def mapper1(key, value):
    # key is (file path, offset in file) because of -addpath yes;
    # emit one count per word occurrence, keyed on (word, doc)
    for word in value.split():
        yield (word, key[0]), 1

def mapper2(key, value):
    # regroup by document: (doc, (word, n))
    yield key[1], (key[0], value)

def reducer2(key, values):
    values = list(values)
    # N is the total number of words in the document key
    N = sum(value[1] for value in values)
    for (word, n) in values:
        yield (word, key), (n, N)

def mapper3(key, value):
    # regroup by word: (word, (doc, n, N, 1))
    yield key[0], (key[1], value[0], value[1], 1)

class Reducer3:
    def __init__(self):
        self.doccount = float(self.params["doccount"])
    def __call__(self, key, values):
        values = list(values)
        # m is the number of documents in which the word key occurs
        m = sum(value[3] for value in values)
        for (doc, n, N) in (value[:3] for value in values):
            yield (key, doc), (float(n) / N) * log(self.doccount / m)

def runner(job):
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    job.additer(mapper2, reducer2)
    job.additer(mapper3, Reducer3)

def starter(prog):
    prog.addopt("addpath", "yes")

if __name__ == "__main__":
    main(runner, starter)

As suggested on the next-to-last slide, I avoided the need for a fourth iteration by already computing the TF-IDF weights in the third reducer, but apart from that, this program works exactly as explained in the lecture. When running it, you have to use the parameter option -param doccount=<number of documents> to specify the total number of documents (which could be calculated by another Dumbo program, if necessary). Since the first mapper expects the keys to be of the form (file path, offset in file), the option -addpath yes is also required, but this one is added automatically by the starter function.
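
Here’s a minimal sketch of what such a document-counting Dumbo program might look like; it’s my own illustration, not something from the lecture. Like the TF-IDF program, it assumes that every document is a separate file, so counting documents boils down to counting distinct file paths (identitymapper is assumed to be the identity mapper that Dumbo provides alongside sumreducer):

from dumbo import main, sumreducer, identitymapper

def mapper(key, value):
    # with -addpath yes, key is (file path, offset in file)
    yield key[0], 1

def reducer(key, values):
    # collapse each distinct file path into a single count of 1
    yield "doccount", 1

def runner(job):
    job.additer(mapper, reducer)
    # the second iteration sums the ones into the total document count
    job.additer(identitymapper, sumreducer, combiner=sumreducer)

def starter(prog):
    prog.addopt("addpath", "yes")

if __name__ == "__main__":
    main(runner, starter)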

The line

values = list(values)

in Reducer3.__call__ is where the buffering issue discussed in the lecture occurs. For words like “the”, it might not be possible to fit all of the values returned by the iterator values into memory as a list, and while trying to do so the program might even cause some cluster nodes to start swapping like crazy, up to the point where they become completely unresponsive and need to be power cycled to bring them back. This happened a few times at Last.fm, which is why we added the -memlimit <number of bytes> option to Dumbo. By putting the line