Computing some simple statistics for the jobs that run on your Hadoop cluster can be very useful in practice. Data collection systems like Chukwa probably allow you to do this, but if you don’t have hundreds of nodes, simply running the following shell script daily on your master might be all you need:
find /path/to/hadoop/logs/history/ -daystart -ctime 1 | \
grep -v 'xml$' | grep -v 'crc$' | while read FILE; do
    NAME="`basename $FILE`"
    sed 's/ *$//' $FILE | sed "s/\$/ JOBNAME=\"$NAME\"/"
done > /tmp/joblogs.txt
DATE="`date -d yesterday +"%Y/%m/%d"`"
/path/to/hadoop/bin/hadoop dfs -mkdir /user/hadoop/joblogs/$DATE
/path/to/hadoop/bin/hadoop dfs -put /tmp/joblogs.txt \
    /user/hadoop/joblogs/$DATE/joblogs.txt || exit 1
rm /tmp/joblogs.txt
This script takes all of yesterday’s job logs, adds a JOBNAME="<filename>" field to each line, puts everything in a single file, and uploads this file to the DFS. Once you have this set up, you can use Hadoop to analyse your job logs. Here’s an example in the form of a Dumbo program:
from dumbo import sumsreducer, statsreducer, statscombiner, main

class Mapper1:
    def __init__(self):
        from re import compile
        # Matches KEY="value" pairs in a log line.
        self.fieldselector = compile('([A-Z_]+)="([^"]*)"')
    def __call__(self, key, value):
        logtype = value.split(" ", 1)[0]
        # Only look at task attempt lines.
        if logtype.endswith("Attempt"):
            try:
                fields = dict(self.fieldselector.findall(value))
                jobname = fields["JOBNAME"]
                taskid = fields["TASK_ATTEMPT_ID"]
                # Emit (start, 0) or (0, stop) so that summing per attempt
                # yields a (start, stop) pair.
                if "START_TIME" in fields:
                    start = int(fields["START_TIME"])
                    yield (logtype, jobname, taskid), (start, 0)
                elif "FINISH_TIME" in fields and "ERROR" not in fields:
                    stop = int(fields["FINISH_TIME"])
                    yield (logtype, jobname, taskid), (0, stop)
            except KeyError:
                self.counters["Broken loglines"] += 1

def mapper2(key, value):
    # Keep only attempts for which both a start and a stop time were seen.
    if value[0] > 0 and value[1] > 0:
        # Drop the task id from the key and convert milliseconds to seconds.
        yield key[:2], (value[1] - value[0]) / 1000.0

def runner(job):
    job.additer(Mapper1, sumsreducer, combiner=sumsreducer)
    job.additer(mapper2, statsreducer, statscombiner)

def starter(prog):
    date = prog.delopt("date")
    if date:
        prog.addopt("input", "/user/hadoop/joblogs/" + date)
        prog.addopt("name", "jobstats-" + date)

if __name__ == "__main__":
    main(runner, starter)
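To see why summing the (start, 0) and (0, stop) pairs works, here is a rough sketch that mimics the two iterations locally, without Hadoop or Dumbo. The sample log lines are made up for illustration and only loosely resemble real job history lines, and the helper names parse_attempt and attempt_durations are hypothetical.

```python
import re
from collections import defaultdict

FIELD_RE = re.compile(r'([A-Z_]+)="([^"]*)"')

# Made-up log lines (an assumption, not real job history output).
SAMPLE_LINES = [
    'MapAttempt TASK_ATTEMPT_ID="attempt_1" START_TIME="1000" JOBNAME="job_a"',
    'MapAttempt TASK_ATTEMPT_ID="attempt_1" FINISH_TIME="4500" JOBNAME="job_a"',
    'MapAttempt TASK_ATTEMPT_ID="attempt_2" START_TIME="2000" JOBNAME="job_a"',
    'MapAttempt TASK_ATTEMPT_ID="attempt_2" FINISH_TIME="3000" ERROR="oops" JOBNAME="job_a"',
    'Job JOBID="job_a" SUBMIT_TIME="900" JOBNAME="job_a"',
]

def parse_attempt(line):
    """Return ((logtype, jobname, taskid), (start, stop)) or None."""
    logtype = line.split(" ", 1)[0]
    if not logtype.endswith("Attempt"):
        return None
    fields = dict(FIELD_RE.findall(line))
    key = (logtype, fields["JOBNAME"], fields["TASK_ATTEMPT_ID"])
    if "START_TIME" in fields:
        return key, (int(fields["START_TIME"]), 0)
    if "FINISH_TIME" in fields and "ERROR" not in fields:
        return key, (0, int(fields["FINISH_TIME"]))
    return None

def attempt_durations(lines):
    """Sum the pairs per attempt, then emit ((logtype, jobname), seconds)."""
    sums = defaultdict(lambda: [0, 0])
    for line in lines:
        parsed = parse_attempt(line)
        if parsed:
            key, (start, stop) = parsed
            sums[key][0] += start
            sums[key][1] += stop
    # Attempts missing a start or a clean finish are dropped here.
    return [(key[:2], (stop - start) / 1000.0)
            for key, (start, stop) in sums.items()
            if start > 0 and stop > 0]

print(attempt_durations(SAMPLE_LINES))
```

In this sample, the second attempt never contributes a stop time because its finish line carries an ERROR field, so only the first attempt ends up with a duration.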
From the output of this program, you can easily generate a few charts that show you which jobs are slowest. We recently started playing with this at Last.fm, mainly because such charts allow us to identify the jobs on which we should focus our optimization efforts.
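As a starting point for such charts, here is a minimal sketch that ranks jobs by mean task duration and prints a crude text bar chart. It assumes each output record has the shape ((logtype, jobname), (n, mean, std, min, max)); check this against what your Dumbo version actually emits. The sample records and the helper names slowest and text_chart are hypothetical.

```python
# Hypothetical records in the assumed shape:
# ((logtype, jobname), (n, mean, std, min, max)), durations in seconds.
SAMPLE_STATS = [
    (("MapAttempt", "job_a"), (120, 14.2, 3.1, 6.0, 31.5)),
    (("ReduceAttempt", "job_a"), (8, 95.0, 20.4, 60.2, 140.9)),
    (("MapAttempt", "job_b"), (300, 48.7, 9.9, 20.1, 88.0)),
]

def slowest(records, logtype, top=10):
    """Return (jobname, mean_seconds) pairs for one log type, slowest first."""
    rows = [(job, stats[1]) for (kind, job), stats in records if kind == logtype]
    return sorted(rows, key=lambda row: row[1], reverse=True)[:top]

def text_chart(rows, width=40):
    """Render (jobname, mean_seconds) rows as horizontal bars."""
    if not rows:
        return ""
    longest = max(mean for _, mean in rows)
    lines = []
    for job, mean in rows:
        # Scale each bar relative to the slowest job; always show one mark.
        bar = "#" * max(1, int(round(width * mean / longest)))
        lines.append("%-10s %s %.1fs" % (job, bar, mean))
    return "\n".join(lines)

print(text_chart(slowest(SAMPLE_STATS, "MapAttempt")))
```

Ranking map and reduce attempts separately keeps long-running reducers from hiding slow mappers.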