Computing some simple statistics for the jobs that run on your Hadoop cluster can be very useful in practice. Data collection systems like Chukwa can probably do this for you, but if you don’t have hundreds of nodes, simply running the following shell script daily on your master node might be all you need:
# grab yesterday's job logs, skipping conf XML files and checksum files
find /path/to/hadoop/logs/history/ -daystart -ctime 1 | \
grep -v 'xml$' | grep -v 'crc$' | while read FILE; do
    NAME="`basename $FILE`"
    # strip trailing spaces and append a JOBNAME field to every line
    sed 's/ *$//' $FILE | sed "s/\$/ JOBNAME=\"$NAME\"/"
done > /tmp/joblogs.txt
# upload the combined file to the DFS under yesterday's date
DATE="`date -d yesterday +"%Y/%m/%d"`"
/path/to/hadoop/bin/hadoop dfs -mkdir /user/hadoop/joblogs/$DATE
/path/to/hadoop/bin/hadoop dfs -put /tmp/joblogs.txt \
    /user/hadoop/joblogs/$DATE/joblogs.txt || exit 1
rm /tmp/joblogs.txt
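To give an idea of what this produces, a line of the combined file might end up looking roughly as follows (this is the general shape of Hadoop’s job history log format; the identifiers and timestamp below are made up, and real lines contain more fields):

MapAttempt TASK_TYPE="MAP" TASKID="task_200811140450_0001_m_000001" TASK_ATTEMPT_ID="attempt_200811140450_0001_m_000001_0" START_TIME="1226638521234" JOBNAME="<filename>"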
This script takes all of yesterday’s job logs, adds a JOBNAME="<filename>" field to each line, puts everything into a single file, and uploads that file to the DFS. Once you have this set up, you can use Hadoop itself to analyse your job logs. Here’s an example in the form of a Dumbo program:
from dumbo import sumsreducer, statsreducer, statscombiner, main

class Mapper1:
    def __init__(self):
        from re import compile
        self.fieldselector = compile('([A-Z_]+)="([^"]*)"')
    def __call__(self, key, value):
        logtype = value.split(" ", 1)[0]
        if logtype.endswith("Attempt"):  # only MapAttempt/ReduceAttempt lines
            try:
                fields = dict(self.fieldselector.findall(value))
                jobname = fields["JOBNAME"]
                taskid = fields["TASK_ATTEMPT_ID"]
                if "START_TIME" in fields:
                    start = int(fields["START_TIME"])
                    yield (logtype, jobname, taskid), (start, 0)
                elif "FINISH_TIME" in fields and not "ERROR" in fields:
                    stop = int(fields["FINISH_TIME"])
                    yield (logtype, jobname, taskid), (0, stop)
            except KeyError:
                self.counters["Broken loglines"] += 1

def mapper2(key, value):
    # summing (start, 0) and (0, stop) per attempt yields (start, stop);
    # if either timestamp was missing, one component is still 0 and we skip
    if value[0] > 0 and value[1] > 0:
        yield key[:2], (value[1] - value[0]) / 1000.0  # duration in seconds

def runner(job):
    job.additer(Mapper1, sumsreducer, combiner=sumsreducer)
    job.additer(mapper2, statsreducer, statscombiner)

def starter(prog):
    date = prog.delopt("date")
    if date:
        prog.addopt("input", "/user/hadoop/joblogs/" + date)
        prog.addopt("name", "jobstats-" + date)

if __name__ == "__main__":
    main(runner, starter)
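Assuming you save this program as jobstats.py (the filename, paths, and date here are placeholders), you could then kick it off for a particular day with something like:

dumbo start jobstats.py -hadoop /path/to/hadoop \
    -date 2008/11/13 -output joblogstats/2008/11/13

The starter function takes care of turning the custom -date option into the corresponding -input path and a descriptive job name, which is why you don’t have to specify -input yourself.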
From the output of this program, you can easily generate a few charts that show you which jobs are slowest. We recently started playing with this at Last.fm, mainly because such charts allow us to identify the jobs on which we should focus our optimization efforts.
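To make that last step a bit more concrete, here’s a rough post-processing sketch. It assumes you dumped the program’s output to a local file jobstats.txt as tab-separated Python literals (e.g. via dumbo cat), and that each value is a stats tuple of the form (n, mean, std, min, max); both the filename and this tuple layout are assumptions you’d want to verify against your Dumbo version.

from ast import literal_eval  # requires Python 2.6+

stats = []
for line in open("jobstats.txt"):
    key, value = line.rstrip("\n").split("\t", 1)
    logtype, jobname = literal_eval(key)
    mean = literal_eval(value)[1]  # assumed layout: (n, mean, std, min, max)
    stats.append((mean, logtype, jobname))

# print the ten (logtype, jobname) pairs with the highest mean duration
stats.sort(reverse=True)
for mean, logtype, jobname in stats[:10]:
    print "%10.1f secs   %-13s %s" % (mean, logtype, jobname)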