The guys from Cloudera are now providing their basic Hadoop training for free online. The lecture about MapReduce algorithms was the first one I watched, and I couldn’t resist writing a Dumbo program that implements the last algorithm discussed in this lecture:
from dumbo import main, sumreducer
from math import log
def mapper1(key, value):
for word in value.split():
yield (word, key[0]), 1
def mapper2(key, value):
yield key[1], (key[0], value)
def reducer2(key, values):
values = list(values)
N = sum(value[1] for value in values)
for (word, n) in values:
yield (word, key), (n, N)
def mapper3(key, value):
yield key[0], (key[1], value[0], value[1], 1)
class Reducer3:
def __init__(self):
self.doccount = float(self.params["doccount"])
def __call__(self, key, values):
values = list(values)
m = sum(value[3] for value in values)
for (doc, n, N) in (value[:3] for value in values):
yield (key, doc), (float(n) / N) * log(self.doccount / m)
def runner(job):
job.additer(mapper1, sumreducer, combiner=sumreducer)
job.additer(mapper2, reducer2)
job.additer(mapper3, Reducer3)
def starter(prog):
prog.addopt("addpath", "yes")
if __name__ == "__main__":
main(runner, starter)
As suggested on the next to last slide, I avoided the need for a 4th iteration by already computing the TF-IDF weights in the 3th reducer, but apart from that, this program works exactly as explained in the lecture. When running it, you have to use the parameter option -param doccount=<number of documents> to specify the total number of documents (which could be calculated by another Dumbo program, if necessary), and since the first mapper expects the keys to be of the form (file path, offset in file), the option -addpath yes is also required, but this one is automatically added by the starter function.
The line
values = list(values)
in Reducer3.__call__ is where the buffering issue discussed in the lecture occurs. For words like “the”, it might not be possible to put all of the values returned by the iterator values into memory as a list, and while trying to do so the program might even cause some cluster nodes to start swapping like crazy, up till the point where they become completely unresponsive and need to be power cycled in order to bring them back. This happened a few times at Last.fm, which is why we added the -memlimit <number of bytes> option to Dumbo. By putting the line
memlimit: <maximum number of bytes>
in the [common] section of /etc/dumbo.conf, you can set an upper limit on the amount of memory that can be used, and make your Dumbo programs fail with a MemoryError when they exceed this limit.
have you considered a reasonable default dumbo memlimit?
I’ve thought about that yes, but never got round to implement it. It’s officially on the TODO list now though, so it’ll probably get added soonish.
[...] count words on a file per file basis (recall that the -addpath yes option makes sure the file path is prepended to the key passed to the [...]
[...] Remember the buffering problems for the TF-IDF program discussed in a previous post as well as the lecture about MapReduce algorithms from Cloudera’s free Hadoop training? [...]
[...] TFIDF weights for Wikipedia articles and incorporate it into the similarity [...]
austin condos…
[...]Computing TF-IDF weights « Dumbotics[...]…