The guys from Cloudera are now providing their basic Hadoop training for free online. The lecture about MapReduce algorithms was the first one I watched, and I couldn’t resist writing a Dumbo program that implements the last algorithm discussed in this lecture:
from dumbo import main, sumreducer from math import log def mapper1(key, value): for word in value.split(): yield (word, key), 1 def mapper2(key, value): yield key, (key, value) def reducer2(key, values): values = list(values) N = sum(value for value in values) for (word, n) in values: yield (word, key), (n, N) def mapper3(key, value): yield key, (key, value, value, 1) class Reducer3: def __init__(self): self.doccount = float(self.params["doccount"]) def __call__(self, key, values): values = list(values) m = sum(value for value in values) for (doc, n, N) in (value[:3] for value in values): yield (key, doc), (float(n) / N) * log(self.doccount / m) def runner(job): job.additer(mapper1, sumreducer, combiner=sumreducer) job.additer(mapper2, reducer2) job.additer(mapper3, Reducer3) def starter(prog): prog.addopt("addpath", "yes") if __name__ == "__main__": main(runner, starter)
As suggested on the next to last slide, I avoided the need for a 4th iteration by already computing the TF-IDF weights in the 3th reducer, but apart from that, this program works exactly as explained in the lecture. When running it, you have to use the parameter option -param doccount=<number of documents> to specify the total number of documents (which could be calculated by another Dumbo program, if necessary), and since the first mapper expects the keys to be of the form (file path, offset in file), the option -addpath yes is also required, but this one is automatically added by the starter function.
values = list(values)
in Reducer3.__call__ is where the buffering issue discussed in the lecture occurs. For words like “the”, it might not be possible to put all of the values returned by the iterator values into memory as a list, and while trying to do so the program might even cause some cluster nodes to start swapping like crazy, up till the point where they become completely unresponsive and need to be power cycled in order to bring them back. This happened a few times at Last.fm, which is why we added the -memlimit <number of bytes> option to Dumbo. By putting the line