Simple job logs analysis

March 4, 2009

Computing some simple statistics for the jobs that run on your Hadoop cluster can be very useful in practice. Data collection systems like Chukwa probably allow you to do this, but if you don’t have hundreds of nodes, simply running the following shell script daily on your master might be all you need:

# grab yesterday's job history logs, skipping the .xml and .crc files
find /path/to/hadoop/logs/history/ -daystart -ctime 1 | \
grep -v 'xml$' | grep -v 'crc$' | while read FILE; do
    NAME="`basename $FILE`"
    # strip trailing spaces and append a JOBNAME="<filename>" field to each line
    sed 's/ *$//' $FILE | sed "s/\$/ JOBNAME=\"$NAME\"/"
done > /tmp/joblogs.txt

# upload the combined file to a dated directory on the DFS
DATE="`date -d yesterday +"%Y/%m/%d"`"
/path/to/hadoop/bin/hadoop dfs -mkdir /user/hadoop/joblogs/$DATE
/path/to/hadoop/bin/hadoop dfs -put /tmp/joblogs.txt \
/user/hadoop/joblogs/$DATE/joblogs.txt || exit 1

rm /tmp/joblogs.txt

This script takes all of yesterday’s job logs, adds a JOBNAME="<filename>" field to each line, puts everything in a single file, and uploads this file to the DFS. Once you’ve got this set up, you can use Hadoop to analyse your job logs. Here’s an example in the form of a Dumbo program:

from dumbo import sumsreducer, statsreducer, statscombiner, main

class Mapper1:
    def __init__(self):
        from re import compile
        self.fieldselector = compile('([A-Z_]+)="([^"]*)"')
    def __call__(self, key, value):
        # job history lines start with a log type such as "MapAttempt"
        logtype = value.split(" ", 1)[0]
        if logtype.endswith("Attempt"):
            try:
                fields = dict(self.fieldselector.findall(value))
                jobname = fields["JOBNAME"]
                taskid = fields["TASK_ATTEMPT_ID"]
                if "START_TIME" in fields:
                    start = int(fields["START_TIME"])
                    yield (logtype, jobname, taskid), (start, 0)
                elif "FINISH_TIME" in fields and not "ERROR" in fields:
                    stop = int(fields["FINISH_TIME"])
                    yield (logtype, jobname, taskid), (0, stop)
            except KeyError:
                self.counters["Broken loglines"] += 1

def mapper2(key, value):
    # the first iteration's sumsreducer combines the (start, 0) and (0, stop)
    # pairs into (start, finish) sums; keep only attempts for which both were
    # seen and emit the duration in seconds, keyed by (logtype, jobname)
    if value[0] > 0 and value[1] > 0:
        yield key[:2], (value[1] - value[0]) / 1000.0

def runner(job):
    job.additer(Mapper1, sumsreducer, combiner=sumsreducer)
    job.additer(mapper2, statsreducer, statscombiner)

def starter(prog):
    date = prog.delopt("date")
    if date:
        prog.addopt("input", "/user/hadoop/joblogs/" + date)
        prog.addopt("name", "jobstats-" + date)

if __name__ == "__main__":
    main(runner, starter)
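
To run this program for a particular day, you just pass it a -date option, which the starter function turns into the appropriate input path and job name. The invocation would look something like this (the script and output names are just placeholders, obviously):

$ dumbo start jobstats.py -date 2009/03/03 \
-output jobstats/2009/03/03 -hadoop /path/to/hadoop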

From the output of this program, you can easily generate a few charts that show you which jobs are slowest. We recently started playing with this at Last.fm, mainly because such charts allow us to identify the jobs on which we should focus our optimization efforts.
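
In case you want to try something similar, here’s a rough sketch of what such a chart could look like in matplotlib. It assumes you dumped the program’s output to a local file jobstats.txt (with dumbo cat, for instance) and that each line consists of tab-separated fields: log type, job name, and then the count, mean, standard deviation, minimum, and maximum duration in seconds (the file name and the exact output format are assumptions, so adjust the parsing to whatever you actually get):

import matplotlib.pyplot as plt

stats = []
for line in open("jobstats.txt"):
    fields = line.rstrip("\n").split("\t")
    logtype, jobname, mean = fields[0], fields[1], float(fields[3])
    if logtype == "MapAttempt":  # only chart the map attempts, for instance
        stats.append((mean, jobname))

# show the 20 jobs with the highest average task attempt duration
stats.sort(reverse=True)
means = [mean for mean, name in stats[:20]]
names = [name for mean, name in stats[:20]]
plt.barh(range(len(names)), means)
plt.yticks(range(len(names)), names)
plt.xlabel("Average map attempt duration (seconds)")
plt.show()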


Indexing typed bytes

March 3, 2009

In order to be able to do something useful with the output of a Dumbo program, you often need to index it first. Suppose, for instance, that you ran a program that computes JIRA issue recommendations on lots and lots of issues, saving the output to jira/recs on the DFS. If the HADOOP-1722 patch has been applied to your Hadoop build, you can then dump this output to a fairly compact typed bytes file recs.tb as follows:

$ /path/to/hadoop/bin/hadoop jar \
/path/to/hadoop/build/contrib/streaming/*.jar dumptb jira/recs > recs.tb

Such a typed bytes file isn’t very convenient for doing lookups though, since you basically have to go through the whole thing to find the recommendations corresponding to a particular issue. Keeping it completely in memory as a hash table might be an option if you have tons of RAM, but indexing it and putting only the index in memory can do the trick as well.
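
If you do happen to have the RAM to spare, by the way, the typedbytes module makes the in-memory option almost a one-liner:

import typedbytes

# load the whole typed bytes file into an in-memory hash table
recs = dict(typedbytes.PairedInput(open("recs.tb", "rb")))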

Here’s a very simple, but nevertheless quite effective, way of generating an index for a typed bytes file:

import sys
import typedbytes

infile = open(sys.argv[1], "rb")
outfile = open(sys.argv[2], "wb")

input = typedbytes.PairedInput(infile)
output = typedbytes.PairedOutput(outfile)
pos = 0
for key, value in input:
    # pos is the byte offset at which this key's record starts
    output.write((key, pos))
    pos = infile.tell()

infile.close()
outfile.close()

This Python program relies on the typedbytes module to read a given typed bytes file and generate a second one containing (key, position) pairs. More concretely, running it with the arguments recs.tb recs.tb.idx leads to an index file recs.tb.idx for recs.tb. If you put

file = open("recs.tb", "rb")
input = typedbytes.PairedInput(file)
index = dict(typedbytes.PairedInput(open("recs.tb.idx", "rb")))

in the initialization part of a Python program, you can then look up the recommendations for a particular issuenr as follows:

file.seek(index[issuenr])
readnr, recs = input.read()
if readnr != issuenr:
    raise ValueError("corrupt index")

When dealing with huge output files, you might have to use a second-level index on (a sorted version of) the index obtained in this way, but in many cases this simple approach gets the job done just fine.
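
To give an idea of what such a two-level scheme could look like, here’s a minimal sketch. It assumes you somehow sorted the index by key into a file recs.tb.idx.sorted (the file name, the STEP constant, and the sorting step itself are all just illustrative assumptions), and it keeps only every STEPth entry of that sorted index in memory:

import bisect
import typedbytes

STEP = 1000  # keep only every 1000th index entry in memory

# build the sparse, in-memory second level: for every STEPth entry of the
# sorted index, remember its key and its byte offset in the index file
idxfile = open("recs.tb.idx.sorted", "rb")
idxinput = typedbytes.PairedInput(idxfile)
samplekeys, sampleoffsets = [], []
pos, total = 0, 0
for key, datapos in idxinput:
    if total % STEP == 0:
        samplekeys.append(key)
        sampleoffsets.append(pos)
    pos = idxfile.tell()
    total += 1

datafile = open("recs.tb", "rb")
datainput = typedbytes.PairedInput(datafile)

def lookup(wanted):
    # binary search for the last sampled key <= wanted, then scan at most
    # STEP entries of the sorted index starting from that point
    i = bisect.bisect_right(samplekeys, wanted) - 1
    if i < 0:
        raise KeyError(wanted)
    idxfile.seek(sampleoffsets[i])
    for _ in xrange(min(STEP, total - i * STEP)):
        key, datapos = idxinput.read()
        if key == wanted:
            datafile.seek(datapos)
            readkey, value = datainput.read()
            return value
        if key > wanted:
            break
    raise KeyError(wanted)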


Mapper and reducer classes

February 26, 2009

As explained in the short tutorial, Dumbo provides the ability to use a class as mapper and/or reducer (and also as combiner, of course, but a combiner is really just a reducer with a slightly different purpose). Up until now, the main benefit of this was that it lets you use the constructor for initializations such as loading the contents of a file into memory. Version 0.20.27 introduces another reason to use classes instead of functions, however, as illustrated by the following extended version of the sampling program from my previous post:

class Mapper:
    def __init__(self):
        self.status = "Initialization started"
        from random import Random
        self.randgen = Random()
        self.cutoff = float(self.params["percentage"]) / 100
        self.samplesize = self.counters["Sample size"]
        self.status = "Initialization done"
    def __call__(self, key, value):
        if self.randgen.random() < self.cutoff:
            self.samplesize += 1
            yield key, value

if __name__ == "__main__":
    from dumbo import run
    run(Mapper)

The things to note in this code are the class variables params, counters, and status, which seem to come out of nowhere. From version 0.20.27 onwards, Dumbo will instantiate a dynamically generated class that inherits from both the class supplied by the programmer and dumbo.MapRedBase, resulting in the seemingly magical addition of the following fields:

  • params: A dictionary that contains the parameters specified using -param <key>=<value> options.
  • counters: You can think of this field as a defaultdict containing dumbo.Counter objects. When a given key has no corresponding counter yet, a new counter is created using the key as its name. You can still change the name afterwards by assigning a different string to the counter’s name field, but by choosing a suitable key you can kill two birds with one stone.
  • status: Strings assigned to this field will show up as the status message for the task in Hadoop’s web interface.

And as if that weren’t enough, you can now also use the

counter += amount

syntax to increment counters, instead of the less fancy (and harder to remember)

counter.incr(amount)

method call.
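
For completeness: the percentage parameter used by this mapper would be supplied when starting the program, roughly as follows (the script, input, and output names are just placeholders):

$ dumbo start sample.py -hadoop /path/to/hadoop \
-input somedata -output sampleddata -param percentage=50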


Random sampling

February 25, 2009

Simple random sampling is a technique often used in data analysis to (substantially) reduce the amount of data to be processed. In this post, I’ll take a stab at explaining how this can be done with Dumbo.

Consider the file commentcounts.txt, derived as follows from the comments.txt file that was generated as part of an earlier post:

$ sort comments.txt | uniq -c | \
sed 's/^[^0-9]*\([0-9]*\) \(.*\)*/\2\t\1/' > commentcounts.txt

which leads to lines of the form:

<Hadoop JIRA issue number>\t<comment author>\t<number of comments>

By means of the following Dumbo program, some statistics can be computed for each of the comment authors that occur in this file:

def mapper(key, value):
    issuenr, commenter, count = value.split("\t")
    yield commenter, int(count)

if __name__ == "__main__":
    from dumbo import run, statsreducer, statscombiner
    run(mapper, statsreducer, statscombiner)

The following three lines are part of the output obtained by running this program on commentcounts.txt:

Doug Cutting	1487    2.28984532616   2.94902633612   1       52
Owen O'Malley	1062    1.90301318267   1.87736300761   1       21
Tom White	268     2.26492537313   2.45723899975   1       24

Each of these lines ends with five numbers: the total number of issues commented on, followed by the mean, standard deviation, minimum, and maximum of the number of comments per issue.

An easy way to take a random sample from commentcounts.txt is by means of the following Dumbo program:

from random import random

def mapper(key, value):
    if random() < 0.5: yield key, value

if __name__ == "__main__":
    from dumbo import run
    run(mapper)

Running the statistics program on the output of this sampling program gave me the following lines for Doug, Owen, and Tom:

Doug Cutting    760     2.22105263158   2.94351722074   1       52
Owen O'Malley   506     1.84584980237   1.69629699266   1       19
Tom White       131     2.23664122137   2.0296732376    1       11

A quick comparison of these lines with the ones above reveals that:

  • The number of issues about halved for each of the considered comment authors, which makes sense since Python’s random() function generates uniformly distributed floats between 0.0 (inclusive) and 1.0 (exclusive).
  • The means and standard deviations computed from the sample are quite close to the ones obtained from the complete dataset.
  • Two out of the three maximums computed from the sample aren’t very close to the real ones, and in Tom’s case the sampled maximum isn’t close at all.

So, apparently, we can reliably compute means and standard deviations from a sample, but not maximums, which shouldn’t come as a surprise to anyone with some basic knowledge of statistics. Minimums, by the way, can generally not be computed reliably from samples either, even though that happens to work fine in the example above.

Even for means and standard deviations, some caution is needed when computing them from a random sample, though. In my case, for instance, the mean computed from the sample is 4.0, whereas the real mean is 7.0:

Klaas Bosteels	4	7.0	6.0415229868	1	16
Klaas Bosteels	3	4.0	3.55902608401	1	9

The reason for this inaccuracy is that I’ve only commented on four Hadoop issues so far. Instead of regarding all of this as taking one random sample from commentcounts.txt, it’s better to think of it as taking a random sample for each comment author mentioned in this file. The number of issues for each author then corresponds to the sample size, and the larger this sample size, the smaller the sampling error you can expect for many statistics. More precisely, for many statistics, including the mean, the error will on average be inversely proportional to the square root of the sample size.
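
If you want to make that square root behaviour a bit more tangible, here’s a tiny simulation you can run locally (plain Python, nothing Dumbo-specific, and the population and sample sizes are obviously made up):

from random import random, sample

# an artificial population of 100,000 values between 0 and 10
population = [10 * random() for i in xrange(100000)]
truemean = sum(population) / len(population)

# quadrupling the sample size should roughly halve the average error on the mean
for size in (100, 400, 1600, 6400):
    errors = []
    for trial in xrange(200):
        s = sample(population, size)
        errors.append(abs(sum(s) / float(size) - truemean))
    print size, sum(errors) / len(errors)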

Now, you might wonder why random sampling is useful in a Hadoop context. After all, the point of Hadoop is to allow you to easily process very large datasets, so you usually shouldn’t have to resort to random sampling to compute the numbers you’re interested in. Nevertheless, random sampling does have some advantages, the main one being that it can give you a representative dataset that fits entirely into your desktop’s memory, which creates the possibility of using a convenient software package like Pylab, R, Matlab, or even Excel for your analysis. Furthermore, random sampling is often also a faster way of computing the statistics you’re after, especially when your Hadoop cluster is very busy, since the sampling itself doesn’t require any reduce task slots (which can be hard to get a hold of on a busy cluster).
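
To illustrate that last point, here’s what such a local analysis could look like, assuming you pulled the sampled counts down to a local file sample.txt with dumbo cat and that the comment author and count are the last two tab-separated fields on each line (the file name and format are assumptions again):

from numpy import array

counts = {}
for line in open("sample.txt"):
    fields = line.rstrip("\n").split("\t")
    # take the last two fields so that any leading key field gets ignored
    commenter, count = fields[-2], int(fields[-1])
    counts.setdefault(commenter, []).append(count)

for commenter, values in counts.iteritems():
    values = array(values)
    print commenter, len(values), values.mean(), values.std(), values.min(), values.max()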