Measuring our elephants’ appetite

April 17, 2009

We recently wanted to get a rough idea of how much data our Hadoop clusters process on a daily basis. Here’s the Dumbo program I used to obtain this information:

from datetime import date

class Mapper:
    def __init__(self):
        from re import compile
        self.numbers = []
        self.numbers.append(compile("HDFS bytes read:([0-9]+)"))
        self.numbers.append(compile("Local bytes read:([0-9]+)"))
        self.finish = compile('FINISH_TIME="([^"]+)"')
    def __call__(self, key, value):
        if value.startswith("Job") and "COUNTERS" in value:
            gb = 0  # gigabytes processed
            for number in self.numbers:
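                mo = number.search(value)
                if mo: gb += int(round(float(mo.group(1)) / 2**30))
            # FINISH_TIME is in milliseconds since the epoch
            ts = float(self.finish.search(value).group(1)) / 1000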
            datetuple = date.fromtimestamp(ts).timetuple()[:3]
            yield datetuple, gb

if __name__ == "__main__":
    from dumbo import run, sumreducer
    run(Mapper, sumreducer, combiner=sumreducer)

Running this on the job logs for one of our clusters (which are gathered by the shell script discussed in a previous post) led to the following graph:

Bytes processed daily by one of our Hadoop clusters

This graph clearly shows why some of us occasionally get annoyed when we want to explore data on this cluster on certain days of the week or month…
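To check the mapper logic without a cluster, the map and sum steps can be emulated in plain Python. This is a minimal sketch, not Dumbo itself: `map_line` and `run_locally` are hypothetical helpers that reuse the same regular expressions as the program above, and the example log line is made up, shaped only to match those regexes rather than copied from a real Hadoop job history file.

```python
import re
from collections import defaultdict
from datetime import date

# The same regular expressions as the Dumbo Mapper above.
NUMBERS = [re.compile(r"HDFS bytes read:([0-9]+)"),
           re.compile(r"Local bytes read:([0-9]+)")]
FINISH = re.compile(r'FINISH_TIME="([^"]+)"')


def map_line(line):
    """Yield ((year, month, day), gigabytes) for a job line with counters."""
    if line.startswith("Job") and "COUNTERS" in line:
        gb = 0
        for number in NUMBERS:
            mo = number.search(line)
            if mo:
                # Each counter is rounded to whole gigabytes separately.
                gb += int(round(float(mo.group(1)) / 2**30))
        ts = float(FINISH.search(line).group(1)) / 1000  # ms -> seconds
        yield date.fromtimestamp(ts).timetuple()[:3], gb


def run_locally(lines):
    """Emulate the map step followed by sumreducer: total gigabytes per day."""
    totals = defaultdict(int)
    for line in lines:
        for day, gb in map_line(line):
            totals[day] += gb
    return dict(totals)


# Hypothetical job line: 3 GiB read from HDFS plus 2 GiB read locally.
example = ('Job JOBID="job_1" FINISH_TIME="1239969600000" '
           'COUNTERS="HDFS bytes read:3221225472,Local bytes read:2147483648"')
# Prints a one-entry dict mapping the local calendar day to 10.
print(run_locally([example, example]))
```

Because every counter is rounded before summing, jobs that read less than about half a gigabyte contribute 0, just as in the original program; that is fine for a rough daily estimate on a busy cluster.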

Simple job logs analysis

March 4, 2009

Computing some simple statistics for the jobs that run on your Hadoop cluster can be very useful in practice. Data collection systems like Chukwa probably allow you to do this, but if you don’t have hundreds of nodes, simply running the following shell script daily on your master might be all you need:

find /path/to/hadoop/logs/history/ -type f -daystart -ctime 1 | \
grep -v 'xml$' | grep -v 'crc$' | while read FILE; do
    NAME="`basename $FILE`"
    sed 's/ *$//' $FILE | sed "s/\$/ JOBNAME=\"$NAME\"/"
done > /tmp/joblogs.txt

DATE="`date -d yesterday +"%Y/%m/%d"`"
/path/to/hadoop/bin/hadoop dfs -mkdir /user/hadoop/joblogs/$DATE
/path/to/hadoop/bin/hadoop dfs -put /tmp/joblogs.txt \
/user/hadoop/joblogs/$DATE/joblogs.txt || exit 1

rm /tmp/joblogs.txt

This script takes all of yesterday’s job logs, adds a JOBNAME="<filename>" field to each line, puts everything in a single file, and uploads this file to the DFS. Once you have this set up, you can use Hadoop to analyse your job logs. Here’s an example in the form of a Dumbo program that computes, for each job and type of task attempt, some statistics about how long the successful attempts took:

from dumbo import sumsreducer, statsreducer, statscombiner, main

class Mapper1:
    def __init__(self):
        from re import compile
        self.fieldselector = compile('([A-Z_]+)="([^"]*)"')
    def __call__(self, key, value):
        logtype = value.split(" ", 1)[0]
        if logtype.endswith("Attempt"):
            try:
                fields = dict(self.fieldselector.findall(value))
                jobname = fields["JOBNAME"]
                taskid = fields["TASK_ATTEMPT_ID"]
                if "START_TIME" in fields:
                    start = int(fields["START_TIME"])
                    yield (logtype, jobname, taskid), (start, 0)
                elif "FINISH_TIME" in fields and "ERROR" not in fields:
                    stop = int(fields["FINISH_TIME"])
                    yield (logtype, jobname, taskid), (0, stop)
            except KeyError:
                # line lacks a JOBNAME or TASK_ATTEMPT_ID field
                self.counters["Broken loglines"] += 1

def mapper2(key, value):
    if value[0] > 0 and value[1] > 0:
        yield key[:2], (value[1] - value[0]) / 1000.0

def runner(job):
    job.additer(Mapper1, sumsreducer, combiner=sumsreducer)
    job.additer(mapper2, statsreducer, statscombiner)

def starter(prog):
    date = prog.delopt("date")
    if date:
        prog.addopt("input", "/user/hadoop/joblogs/" + date)
        prog.addopt("name", "jobstats-" + date)

if __name__ == "__main__":
    main(runner, starter)

From the output of this program, you can easily generate a few charts that show you which jobs are slowest. We recently started playing with this, mainly because such charts allow us to identify the jobs on which we should focus our optimization efforts.
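The whole pipeline in this post (tagging lines with JOBNAME, summing start and stop times per task attempt, then summarizing durations per job) can be emulated in a single Python process to see what each step produces. This is a sketch under assumptions: `tag`, `mapper1`, and `job_stats` are hypothetical helpers rather than Dumbo APIs, the log lines are made up to match the field regex, and `statistics.pstdev` is used for the standard deviation.

```python
import re
import statistics
from collections import defaultdict

FIELDS = re.compile(r'([A-Z_]+)="([^"]*)"')


def tag(filename, lines):
    """Mimic the two sed commands: strip trailing spaces, append JOBNAME."""
    return ['%s JOBNAME="%s"' % (line.rstrip(" "), filename) for line in lines]


def mapper1(line):
    """Emit (start, 0) or (0, stop) per task attempt, like Mapper1."""
    logtype = line.split(" ", 1)[0]
    if logtype.endswith("Attempt"):
        fields = dict(FIELDS.findall(line))
        if "JOBNAME" not in fields or "TASK_ATTEMPT_ID" not in fields:
            return  # Mapper1 counts these as "Broken loglines"
        key = (logtype, fields["JOBNAME"], fields["TASK_ATTEMPT_ID"])
        if "START_TIME" in fields:
            yield key, (int(fields["START_TIME"]), 0)
        elif "FINISH_TIME" in fields and "ERROR" not in fields:
            yield key, (0, int(fields["FINISH_TIME"]))


def job_stats(lines):
    # Iteration 1: element-wise sums per attempt, as sumsreducer would do.
    attempts = defaultdict(lambda: (0, 0))
    for line in lines:
        for key, (start, stop) in mapper1(line):
            old_start, old_stop = attempts[key]
            attempts[key] = (old_start + start, old_stop + stop)
    # Iteration 2: durations in seconds per (logtype, jobname), as in mapper2,
    # summarized as (count, mean, stddev, min, max).
    durations = defaultdict(list)
    for key, (start, stop) in attempts.items():
        if start > 0 and stop > 0:  # skip unfinished or failed attempts
            durations[key[:2]].append((stop - start) / 1000.0)
    return {k: (len(v), statistics.mean(v), statistics.pstdev(v), min(v), max(v))
            for k, v in durations.items()}


raw = [  # hypothetical history lines for one job
    'MapAttempt TASK_ATTEMPT_ID="a1" START_TIME="1000"   ',
    'MapAttempt TASK_ATTEMPT_ID="a1" FINISH_TIME="5000"',
    'MapAttempt TASK_ATTEMPT_ID="a2" START_TIME="2000"',
    'MapAttempt TASK_ATTEMPT_ID="a2" FINISH_TIME="4000"',
    'ReduceAttempt TASK_ATTEMPT_ID="r1" START_TIME="1000"',
    'ReduceAttempt TASK_ATTEMPT_ID="r1" FINISH_TIME="3000" ERROR="boom"',
]
print(job_stats(tag("job_1", raw)))
# {('MapAttempt', 'job_1'): (2, 3.0, 1.0, 2.0, 4.0)}
```

The failed reduce attempt never gets a nonzero stop time, so the `start > 0 and stop > 0` check drops it, which is why only successful attempts end up in the statistics.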

Random sampling

February 25, 2009

Simple random sampling is a technique often used in data analysis to (substantially) reduce the amount of data to be processed. In this post, I’ll take a stab at explaining how this can be done with Dumbo.

Consider the file commentcounts.txt, derived as follows from the comments.txt file that was generated as part of an earlier post:

$ sort comments.txt | uniq -c | \
sed 's/^[^0-9]*\([0-9]*\) \(.*\)/\2\t\1/' > commentcounts.txt

which leads to lines of the form:

<Hadoop JIRA issue number>\t<comment author>\t<number of comments>
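For readers who prefer Python to shell, the counting step can be sketched with `collections.Counter`. This assumes, based on the resulting format, that each line of comments.txt has the form `<issue number>\t<comment author>`; the issue numbers in the example are hypothetical. Python sorts by code point, so the line order may differ from `sort` under a non-C locale, but the counts are the same.

```python
from collections import Counter


def comment_counts(lines):
    """Count identical issue/author lines, like sort | uniq -c, and append
    the count as a third tab-separated field, like the sed step."""
    counts = Counter(line.rstrip("\n") for line in lines)
    return ["%s\t%d" % (entry, n) for entry, n in sorted(counts.items())]


example = ["4000\tDoug Cutting\n", "4000\tDoug Cutting\n", "4001\tTom White\n"]
for out in comment_counts(example):
    print(out)
```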

By means of the following Dumbo program, some statistics can be computed for each of the comment authors that occur in this file:

def mapper(key, value):
    issuenr, commenter, count = value.split("\t")
    yield commenter, int(count)

if __name__ == "__main__":
    from dumbo import run, statsreducer, statscombiner
    run(mapper, statsreducer, statscombiner)

The following three lines are part of the output obtained by running this program on commentcounts.txt:

Doug Cutting	1487    2.28984532616   2.94902633612   1       52
Owen O'Malley	1062    1.90301318267   1.87736300761   1       21
Tom White	268     2.26492537313   2.45723899975   1       24

Each of these lines ends with a number sequence consisting of the total number of issues commented on, followed by the mean, standard deviation, minimum, and maximum, respectively, of the number of comments per issue.
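The reason these five numbers can be combined before reaching the reducer is that they can all be derived from small partial summaries that merge associatively. The sketch below illustrates that idea; it is not Dumbo’s actual statsreducer or statscombiner code, and it assumes the population standard deviation (dividing by n rather than n - 1), which seems consistent with the figures shown in this post. The sum-of-squares formula can lose precision for large values, but that is harmless for small comment counts.

```python
from functools import reduce
from math import sqrt


def partial(value):
    """Summary of one value: (count, sum, sum of squares, min, max)."""
    return (1, value, value * value, value, value)


def merge(a, b):
    """Combine two summaries; associative and commutative, so a combiner can
    apply it to any subset of the values before the reducer sees them."""
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2], min(a[3], b[3]), max(a[4], b[4]))


def finalize(p):
    """Turn a summary into (count, mean, standard deviation, min, max)."""
    n, total, squares, lo, hi = p
    mean = total / float(n)
    variance = max(squares / float(n) - mean * mean, 0.0)  # population variance
    return (n, mean, sqrt(variance), lo, hi)


# Hypothetical comment counts for one author, split over two combiners.
left = reduce(merge, map(partial, [1, 3]))
right = reduce(merge, map(partial, [2, 6]))
print(finalize(merge(left, right)))  # count 4, mean 3.0, stddev sqrt(3.5), min 1, max 6
```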

An easy way to take a random sample from commentcounts.txt is by means of the following Dumbo program:

from random import random

def mapper(key, value):
    # keep each line with probability 0.5
    if random() < 0.5: yield key, value

if __name__ == "__main__":
    from dumbo import run
    run(mapper)  # no reducer: a map-only job

Running the statistics program on the output of this sampling program gave me the following lines for Doug, Owen, and Tom:

Doug Cutting    760     2.22105263158   2.94351722074   1       52
Owen O'Malley   506     1.84584980237   1.69629699266   1       19
Tom White       131     2.23664122137   2.0296732376    1       11

A quick comparison of these lines with the ones above reveals that:

  • The number of issues roughly halved for each of the considered comment authors, which makes sense since Python’s random() function generates uniformly distributed floats between 0.0 (inclusive) and 1.0 (exclusive).
  • The means and standard deviations computed from the sample are quite close to the ones obtained from the complete dataset.
  • Two of the three maximums computed from the sample aren’t very close to the real ones. In Tom’s case, the sample maximum (11) is less than half of the real one (24).

So, apparently, we can reliably estimate means and standard deviations, but not maximums, from a sample, which shouldn’t come as a surprise to anyone with some basic knowledge of statistics. By the way, minimums generally can’t be computed from samples either, even though this happens to work fine in the example above.
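A quick simulation shows why extremes are fragile: the sample maximum equals the true maximum only if at least one item holding that value is kept, so with a unique maximum and a keep probability p, this happens only about a fraction p of the time. When many items share the extreme value, as a minimum of 1 comment per issue plausibly does here, it almost always survives. The data below is hypothetical, and `bernoulli_sample` and `max_survival_rate` are illustrative helpers.

```python
import random


def bernoulli_sample(values, p, rng):
    """Keep each value independently with probability p, like the sampling mapper."""
    return [v for v in values if rng.random() < p]


def max_survival_rate(values, p=0.5, trials=2000, seed=42):
    """Fraction of samples whose maximum equals the true maximum."""
    rng = random.Random(seed)
    true_max = max(values)
    hits = 0
    for _ in range(trials):
        sample = bernoulli_sample(values, p, rng)
        # A sample can never contain a value above the true maximum,
        # so any error in the sample maximum is an underestimate.
        if sample and max(sample) == true_max:
            hits += 1
    return hits / float(trials)


unique_max = list(range(1, 101))  # hypothetical: one item holds the maximum
tied_max = [1] * 50 + [7] * 50    # hypothetical: 50 items share the maximum
print(max_survival_rate(unique_max))  # close to p = 0.5
print(max_survival_rate(tied_max))    # close to 1.0
```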

Even for means and standard deviations, some caution is needed when computing them from a random sample, though. In my case, for instance, the mean computed from the sample is 4.0, whereas the real mean is 7.0 (the first line below comes from the complete dataset, the second from the sample):

Klaas Bosteels	4	7.0	6.0415229868	1	16
Klaas Bosteels	3	4.0	3.55902608401	1	9

The reason for this inaccuracy is that I have only commented on four Hadoop issues so far. Instead of regarding all of this as taking one random sample from commentcounts.txt, it’s better to think of it as taking a separate random sample for each comment author mentioned in this file. The number of issues for each author then corresponds to the sample size, and the smaller the sample, the larger the sampling error you can expect. More precisely, for many statistics, including the mean, the error will on average be inversely proportional to the square root of the sample size.
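The square-root relationship can be checked numerically: for the mean, the standard error is σ/√n, so quadrupling the sample size should roughly halve the typical error. The sketch below uses a hypothetical population and, for simplicity, draws samples with replacement instead of the per-line coin flips of the sampling program; `mean_abs_error` is an illustrative helper.

```python
import random


def mean_abs_error(population, n, trials=2000, seed=1):
    """Average |sample mean - true mean| over many samples of size n,
    drawn with replacement."""
    rng = random.Random(seed)
    true_mean = sum(population) / float(len(population))
    total_error = 0.0
    for _ in range(trials):
        sample = [rng.choice(population) for _ in range(n)]
        total_error += abs(sum(sample) / float(n) - true_mean)
    return total_error / trials


population = list(range(1, 11))  # hypothetical comment counts
for n in (4, 16, 64):
    print(n, round(mean_abs_error(population, n), 3))
# Each fourfold increase in n should roughly halve the error.
```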

Now, you might wonder why random sampling is useful in a Hadoop context. After all, the point of Hadoop is to let you easily process very large datasets, so you usually shouldn’t have to resort to random sampling to compute the numbers you’re interested in. Nevertheless, random sampling does have some advantages, the main one being that it can give you a representative dataset that fits entirely into your desktop’s memory, which makes it possible to use a convenient software package like Pylab, R, Matlab, or even Excel for your analysis. Furthermore, random sampling is often a faster way of computing the statistics you’re after, especially when your Hadoop cluster is very busy, since the sampling doesn’t require any reduce task slots (which can be hard to get hold of on a busy cluster).