Measuring our elephants’ appetite

April 17, 2009

We recently wanted to have a rough idea of how much data our Hadoop clusters process on a daily basis. Here’s the Dumbo program I used to obtain this information:

from datetime import date

class Mapper:
    def __init__(self):
        from re import compile
        self.numbers = []
        self.numbers.append(compile("HDFS bytes read:([0-9]+)"))
        self.numbers.append(compile("Local bytes read:([0-9]+)"))
        self.finish = compile('FINISH_TIME="([^"]+)"')
    def __call__(self, key, value):
        if value.startswith("Job") and "COUNTERS" in value:
            gb = 0  # gigabytes processed
            for number in self.numbers:
                mo =
                if mo: gb += int(round(float( / 2**30))
            ts = float( / 1000
            datetuple = date.fromtimestamp(ts).timetuple()[:3]
            yield datetuple, gb

if __name__ == "__main__":
    from dumbo import run, sumreducer
    run(Mapper, sumreducer, combiner=sumreducer)

Running this on the job logs for one of our clusters (which are gathered by the shell script discussed in this previous post) led to the following graph:

Bytes processed daily by one of our Hadoop clusters

This graph clearly shows why some of us get annoyed sometimes when they want to explore data on this cluster on certain days of the week or month…

Simple job logs analysis

March 4, 2009

Computing some simple statistics for the jobs that run on your Hadoop cluster can be very useful in practice. Data collection systems like Chuckwa probably allow you to do this, but if you don’t have hundreds of nodes, simply running the following shell script daily on your master might be all you need:

find /path/to/hadoop/logs/history/ -daystart -ctime 1 | \
grep -v 'xml$' | grep -v 'crc$' | while read FILE; do
NAME="`basename $FILE`"
sed 's/ *$//' $FILE | sed "s/\$/ JOBNAME=\"$NAME\"/"
done > /tmp/joblogs.txt

DATE="`date -d yesterday +"%Y/%m/%d"`"
/path/to/hadoop/bin/hadoop dfs -mkdir /user/hadoop/joblogs/$DATE
/path/to/hadoop/bin/hadoop dfs -put /tmp/joblogs.txt \
/user/hadoop/joblogs/$DATE/joblogs.txt || exit 1

rm /tmp/joblogs.txt

This script takes all of yesterday’s job logs, adds a JOBNAME=”<filename>” field to each line, puts everything in a single file, and uploads this file to the DFS. Once you got this set up, you can use Hadoop to analyse your job logs. Here’s an example in the form of a Dumbo program:

from dumbo import sumsreducer, statsreducer, statscombiner, main

class Mapper1:
    def __init__(self):
        from re import compile
        self.fieldselector = compile('([A-Z_]+)="([^"]*)"')
    def __call__(self, key, value):
        logtype = value.split(" ", 1)[0]
        if logtype.endswith("Attempt"):
                fields = dict(self.fieldselector.findall(value))
                jobname = fields["JOBNAME"]
                taskid = fields["TASK_ATTEMPT_ID"]
                if "START_TIME" in fields:
                    start = int(fields["START_TIME"])
                    yield (logtype, jobname, taskid), (start, 0)
                elif "FINISH_TIME" in fields and not "ERROR" in fields:
                    stop = int(fields["FINISH_TIME"])
                    yield (logtype, jobname, taskid), (0, stop)
            except KeyError:
                self.counters["Broken loglines"] += 1

def mapper2(key, value):
    if value[0] > 0 and value[1] > 0:
        yield key[:2], (value[1] - value[0]) / 1000.0

def runner(job):
    job.additer(Mapper1, sumsreducer, combiner=sumsreducer)
    job.additer(mapper2, statsreducer, statscombiner)

def starter(prog):
    date = prog.delopt("date")
    if date:
        prog.addopt("input", "/user/hadoop/joblogs/" + date)
        prog.addopt("name", "jobstats-" + date)

if __name__ == "__main__":
    main(runner, starter)

From the output of this program, you can easily generate a few charts that show you which jobs are slowest. We recently started playing with this at, mainly because such charts allows us to identify the jobs on which we should focus our optimization efforts.