Remember the buffering problems for the TF-IDF program discussed in a previous post, as well as the lecture about MapReduce algorithms from Cloudera's free Hadoop training? Thanks to the new joining abstraction (and the minor fixes and enhancements in Dumbo 0.21.13 and 0.21.14), these problems can now easily be avoided:
```python
from dumbo import *
from math import log

# With addpath enabled, each key is a (file path, offset) tuple.
@opt("addpath", "yes")
def mapper1(key, value):
    for word in value.split():
        yield (key[0], word), 1

# Iteration 2 joins on the document: the primary pairs carry the word
# counts to be summed per document, the secondary pairs carry (word, n).
@primary
def mapper2a(key, value):
    yield key[0], value

@secondary
def mapper2b(key, value):
    yield key[0], (key[1], value)

# Iteration 3 joins on the word: the primary pairs count the documents
# containing it, the secondary pairs carry (doc, tf).
@primary
def mapper3a(key, value):
    yield value[0], 1

@secondary
def mapper3b(key, value):
    yield value[0], (key, value[1])

class Reducer(JoinReducer):
    def __init__(self):
        self.sum = 0
    def primary(self, key, values):
        # Remember the sum; the secondary values for this key come afterwards.
        self.sum = sum(values)

class Combiner(JoinCombiner):
    def primary(self, key, values):
        yield key, sum(values)

class Reducer1(Reducer):
    def secondary(self, key, values):
        # key is a document; each value is a (word, count) pair.
        for (word, n) in values:
            yield key, (word, float(n) / self.sum)

class Reducer2(Reducer):
    def __init__(self):
        Reducer.__init__(self)
        self.doccount = float(self.params["doccount"])
    def secondary(self, key, values):
        # key is a word; self.sum is the number of documents containing it.
        idf = log(self.doccount / self.sum)
        for (doc, tf) in values:
            yield (key, doc), tf * idf

def runner(job):
    job.additer(mapper1, sumreducer, combiner=sumreducer)
    multimapper = MultiMapper()
    multimapper.add("", mapper2a)
    multimapper.add("", mapper2b)
    job.additer(multimapper, Reducer1, Combiner)
    multimapper = MultiMapper()
    multimapper.add("", mapper3a)
    multimapper.add("", mapper3b)
    job.additer(multimapper, Reducer2, Combiner)

if __name__ == "__main__":
    main(runner)
```
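To make the data flow of the three iterations concrete, here is a small local sketch in plain Python (no Hadoop, no Dumbo). The `join_reduce` helper and the `tfidf` function are hypothetical names written only for this illustration. The helper imitates the ordering the join abstraction is assumed to provide: for each key, all primary values are seen before any secondary value, so the primary sum is known up front and the secondary values can be streamed instead of buffered. The `doccount` parameter is assumed here to be the number of documents in the corpus.

```python
from collections import defaultdict
from itertools import groupby
from math import log


def join_reduce(primary, secondary, on_secondary):
    """Hypothetical local stand-in for a reduce-side join (not Dumbo's code).

    Pairs are tagged 0 (primary) or 1 (secondary) and sorted on (key, tag),
    so for every key the primary values arrive first. They are summed, and
    each secondary value is then handled as soon as it is seen, so the
    secondary values never need to be buffered.
    """
    tagged = [(k, 0, v) for k, v in primary] + [(k, 1, v) for k, v in secondary]
    tagged.sort(key=lambda t: (t[0], t[1]))
    output = []
    for key, group in groupby(tagged, key=lambda t: t[0]):
        total = 0
        for _, tag, value in group:
            if tag == 0:
                total += value  # primary value: accumulate the sum
            else:
                output.extend(on_secondary(key, value, total))
    return output


def tfidf(corpus):
    """corpus maps a document name to its text (hypothetical input format)."""
    # Iteration 1: count each (doc, word) pair, like mapper1 + sumreducer.
    counts = defaultdict(int)
    for doc, text in corpus.items():
        for word in text.split():
            counts[(doc, word)] += 1
    pairs = sorted(counts.items())

    # Iteration 2: join on doc; primary = counts, secondary = (word, n).
    tf = join_reduce(
        [(doc, n) for (doc, _), n in pairs],
        [(doc, (word, n)) for (doc, word), n in pairs],
        lambda doc, wn, total: [(doc, (wn[0], float(wn[1]) / total))],
    )

    # Iteration 3: join on word; primary = 1 per document, secondary = (doc, tf).
    doccount = float(len(corpus))
    return dict(join_reduce(
        [(word, 1) for _, (word, _) in tf],
        [(word, (doc, t)) for doc, (word, t) in tf],
        lambda word, dt, df: [((word, dt[0]), dt[1] * log(doccount / df))],
    ))


scores = tfidf({"doc1": "a b a", "doc2": "a c"})
```

In this toy corpus the word "a" occurs in both documents, so its IDF, and therefore each of its scores, is 0.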
- The first argument supplied to MultiMapper's add method is a string corresponding to the pattern that has to occur in the file path in order for the added mapper to run on the key/value pairs in a given file. Since the empty string "" is considered to occur in every possible path string, all added mappers run on each input file in this example program.
- It is possible (but not necessary) to yield key/value pairs in a JoinReducer's primary method, as illustrated by the Combiner class in this example.
- The default implementations of JoinReducer's primary and secondary methods are identity operations, so Combiner combines the primary pairs and just passes on the secondary ones.
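The three notes can be illustrated with a small plain-Python sketch. `PathDispatcher`, `SketchJoinReducer` and `SketchCombiner` are hypothetical classes written for this illustration only; they mimic the described behavior (substring matching on the file path, identity defaults for primary and secondary) rather than reproduce Dumbo's actual implementation.

```python
class PathDispatcher(object):
    """Hypothetical stand-in for MultiMapper's path matching (not Dumbo's code)."""

    def __init__(self):
        self.mappers = []

    def add(self, pattern, mapper):
        self.mappers.append((pattern, mapper))

    def run(self, path, key, value):
        # Run every mapper whose pattern occurs somewhere in the file path.
        # "" occurs in every string, so a mapper added with "" always runs.
        for pattern, mapper in self.mappers:
            if pattern in path:
                for pair in mapper(key, value):
                    yield pair


class SketchJoinReducer(object):
    """Hypothetical base class whose primary and secondary methods default
    to identity operations, as described for JoinReducer."""

    def primary(self, key, values):
        for value in values:
            yield key, value

    def secondary(self, key, values):
        for value in values:
            yield key, value


class SketchCombiner(SketchJoinReducer):
    # Only primary is overridden: primary values are summed, while
    # secondary values pass through unchanged via the inherited default.
    def primary(self, key, values):
        yield key, sum(values)


dispatcher = PathDispatcher()
dispatcher.add("", lambda key, value: [("any", value)])
dispatcher.add("counts", lambda key, value: [("counts", value)])
matched_both = list(dispatcher.run("/data/counts/part-00000", None, 1))
matched_one = list(dispatcher.run("/data/other/part-00000", None, 1))

combiner = SketchCombiner()
combined = list(combiner.primary("doc1", [2, 1]))
passed_on = list(combiner.secondary("doc1", [("a", 2), ("b", 1)]))
```

The mapper registered with "" fires for both paths, while the one registered with "counts" fires only for the first; the combiner collapses the primary values into one summed pair and forwards the secondary pairs untouched.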
Writing this program went surprisingly smoothly and took very little effort. Apparently, the "primary/secondary abstraction" works really well for me.