Earlier today I released Dumbo 0.20.29, which adds built-in support for parser and record classes. The easiest way to explain the use of such classes is by means of an example, so let's dive straight in and look at some code:
```python
class JobLog:

    def __init__(self, logline=None):
        from re import compile
        self.fieldselector = compile('([A-Z_]+)="([^"]*)"')
        if logline:
            self.parse(logline)

    def parse(self, logline):
        self.type = logline.split(" ", 1)[0]
        fields = dict(self.fieldselector.findall(logline))
        self.fields = fields
        if self.type.endswith("Attempt"):
            try:
                self.jobname = fields["JOBNAME"]
                self.taskid = fields["TASK_ATTEMPT_ID"]
                if "START_TIME" in fields:
                    self.start = int(fields["START_TIME"])
                    self.stop = None
                    self.status = None
                elif "FINISH_TIME" in fields:
                    self.start = None
                    self.stop = int(fields["FINISH_TIME"])
                    self.status = fields["TASK_STATUS"]
            except KeyError as err:
                raise ValueError(str(err))
        return self
```
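To see what the fieldselector pattern actually captures, here is a quick standalone check against a hypothetical log line (the line itself is made up for illustration, but it follows the KEY="value" shape the regular expression expects):

```python
from re import compile

fieldselector = compile('([A-Z_]+)="([^"]*)"')

# A made-up line in the KEY="value" format the regex expects.
line = 'MapAttempt JOBNAME="wordcount" TASK_ATTEMPT_ID="attempt_01" START_TIME="1219773617000"'

# findall returns one (key, value) tuple per match, so dict() gives
# us a field name -> field value mapping in one step.
fields = dict(fieldselector.findall(line))
print(fields["JOBNAME"])     # wordcount
print(fields["START_TIME"])  # 1219773617000
```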
This class is a parser because:
- It has a parse method that takes a line of text as input and returns an object that represents this line.
- Its parse method raises a ValueError when the given line of text is in an unexpected format.
Since the objects returned by the parse method only have to stay valid until the next time parse is called, the same object can be reused. It’s even possible to return the parser object itself, as illustrated by the example above.
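As a minimal illustration of this contract, here is a toy parser (not part of Dumbo, just a sketch) that mutates and returns itself on every call:

```python
class KeyValueParser:
    """Toy parser for lines of the form 'key<TAB>value'."""

    def parse(self, line):
        parts = line.split("\t")
        if len(parts) != 2:
            # Unexpected format -> ValueError, as the contract requires.
            raise ValueError("malformed line: %r" % line)
        self.key, self.value = parts
        return self  # the same object is reused for every line

parser = KeyValueParser()
first = parser.parse("apples\t3")
second = parser.parse("pears\t5")
# Both calls returned the very same object; only its fields changed.
print(first is second)           # True
print(second.key, second.value)  # pears 5
```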
Now, when you put JobLog in parsers.py and add the option -file parsers.py to the dumbo start command, you can rewrite the job statistics program from my previous post as follows:
```python
from dumbo import sumsreducer, statsreducer, statscombiner, main

def mapper1(key, log):
    if log.type.endswith("Attempt"):
        if log.start:
            yield (log.type, log.jobname, log.taskid), (log.start, 0)
        elif log.stop and log.status == "SUCCESS":
            yield (log.type, log.jobname, log.taskid), (0, log.stop)

def mapper2(key, value):
    if value[0] > 0 and value[1] > 0:
        yield key[:2], (value[1] - value[0]) / 1000.0

def runner(job):
    job.additer(mapper1, sumsreducer, combiner=sumsreducer)
    job.additer(mapper2, statsreducer, statscombiner)

def starter(prog):
    prog.addopt("parser", "parsers.JobLog")
    date = prog.delopt("date")
    if date:
        prog.addopt("input", "/user/hadoop/joblogs/" + date)
        prog.addopt("name", "jobstats-" + date)

if __name__ == "__main__":
    main(runner, starter)
```
The main thing to note in this code is the line prog.addopt("parser", "parsers.JobLog") in the starter function.
By means of the new -parser <full name of Python class> option, you can tell Dumbo to convert all values to objects using the given parser before passing them on to your mapper. When a ValueError occurs during this conversion, Dumbo ignores the corresponding value and increments the counter "Bad inputs", unless you indicate that errors should not be caught by specifying the option -debug yes.
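Conceptually, this conversion boils down to something like the following sketch. To be clear, this is hypothetical code, not Dumbo's actual implementation: the names parsed_values and IntParser are made up, and the real version also honors -debug and reports the counter back to Hadoop.

```python
def parsed_values(pairs, parser, counters):
    """Yield (key, parsed value) pairs, skipping values the parser rejects."""
    for key, value in pairs:
        try:
            yield key, parser.parse(value)
        except ValueError:
            # Bad input: skip the value and bump the counter instead of failing.
            counters["Bad inputs"] = counters.get("Bad inputs", 0) + 1

class IntParser:
    """Trivial parser used only to exercise the sketch above."""
    def parse(self, value):
        try:
            return int(value)
        except ValueError:
            raise ValueError("not an integer: %r" % value)

counters = {}
pairs = [(0, "42"), (1, "oops"), (2, "7")]
print(list(parsed_values(pairs, IntParser(), counters)))  # [(0, 42), (2, 7)]
print(counters)  # {'Bad inputs': 1}
```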
Record classes are similar to parser classes, but instead of a parse method they have a set method that can take multiple values as input. The corresponding option is -record <full name of Python class>, which makes Dumbo create an instance of the given record class and call instance.set(*value) for each value. Since Dumbo represents Hadoop records as sequences of Python values corresponding to the different fields, this is especially useful when consuming files that contain such Hadoop records, which is why I refer to these value-transforming Python classes as "records".
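A record class might look like the following. This is a made-up example (the class name and fields are invented for illustration); the only thing Dumbo cares about is the set method:

```python
class TrackRecord:
    """Hypothetical record with three fields, matching values of the
    form (artist, track, playcount) read from a Hadoop record file."""

    def set(self, artist, track, playcount):
        self.artist = artist
        self.track = track
        self.playcount = playcount
        return self

# Dumbo would call instance.set(*value) for every incoming value,
# reusing the same record object each time:
record = TrackRecord()
for value in [("Aphex Twin", "Xtal", 12), ("Autechre", "Eutow", 7)]:
    record.set(*value)
print(record.artist, record.playcount)  # Autechre 7
```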
At Last.fm, we’ve been using parser and record classes for quite a while now, because they allow us both to abstract away file format details and to apply bug fixes and improvements centrally. We bundle them in Python eggs and make sure they are submitted along with every Dumbo program by putting the necessary -libegg options in /etc/dumbo.conf. Having to manually create and invoke the parser or record class every time quickly becomes a bit tiring, though, so the new -parser and -record options are definitely a welcome improvement.