Consuming Dumbo output with Pig

February 5, 2010

Although it abstracts and simplifies things quite a bit, Dumbo still forces you to think in MapReduce terms, which might not be ideal if you want to implement complex data flows in a limited amount of time. Personally, I think that Dumbo still occupies a useful space within the Hadoop ecosystem, but in some cases it makes sense to work at an even higher level and use something like Pig or Hive. In fact, sometimes it makes sense to combine the two and do some parts of your data flow in Dumbo and others in Pig. To make this possible, I recently wrote a Pig loader function for sequence files that contain TypedBytesWritables, the file format Dumbo uses by default to store all its output on Hadoop. Here’s an example of a Pig script that reads Dumbo output:

register pigtail.jar;  -- http://github.com/klbostee/pigtail

a = load '/hdfs/path/to/dumbo/output'
    using fm.last.pigtail.storage.TypedBytesSequenceFileLoader()
    as (artist:int, val:(listeners:int, listens:int));
b = foreach a generate artist, val.listeners as listeners;
c = order b by listeners;
d = limit c 100;

dump d;

You basically just have to specify names and types for the components of the key/value pairs and you’re good to go.

A possibly useful side effect of writing this loader is that it makes all sorts of file formats readable from Pig. Everything that Dumbo can read can now also be consumed by Pig scripts: all you have to do is write a simple Dumbo script that converts it to typed bytes sequence files:

from dumbo import run
from dumbo.lib import identitymapper

if __name__ == "__main__":
    run(identitymapper)
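Assuming you saved this script as convert.py, running the conversion could look something like this (the paths and the -inputformat value are just placeholders for your own setup):

```shell
$ dumbo start convert.py \
    -hadoop /path/to/hadoop \
    -input /hdfs/path/to/original/input \
    -output /hdfs/path/to/converted/output \
    -inputformat text
```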

The proper solution is of course to write custom Pig loaders, but this gets the job done too and doesn’t slow things down that much.


Fast Python module for typed bytes

April 13, 2009

Over the past few days, I spent some time implementing a typed bytes Python module in C. It’s probably not quite ready for production use yet, and it still falls back to the pure Python module for floats, but it seems to work fine and already leads to substantial speedups.

For example, the Python program

from typedbytes import Output
Output(open("test.tb", "wb")).writes(xrange(10**7))

needs 18.8 secs to finish on this laptop, whereas it requires only 0.9 secs after replacing typedbytes with ctypedbytes. Similarly, the running time for

from typedbytes import Input
for item in Input(open("test.tb", "rb")).reads(): pass

can be reduced from 22.9 to merely 1.7 secs by using ctypedbytes instead of typedbytes.

Obviously, Dumbo programs can benefit from this faster typed bytes module as well, but the gains probably won’t be as spectacular as for the simple test programs above. To give it a go, make sure you’re using the latest version of Dumbo, build an egg for the ctypedbytes module, and add the following option to your start command:

-libegg <path to ctypedbytes egg>
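For instance, a complete start command could look something like this (the script name and paths are hypothetical):

```shell
$ dumbo start wordcount.py \
    -hadoop /path/to/hadoop \
    -input wc_input -output wc_output \
    -libegg /path/to/ctypedbytes.egg
```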

From what I’ve seen so far, this can speed up Dumbo programs by 30%, which definitely makes it worth the effort if you ask me. In fact, the Dumbo program would now probably beat the Java program in the benchmark discussed here, but, unfortunately, that wouldn’t be a very fair comparison. Johan recently made me aware of the fact that it’s better to avoid Java’s split() method for strings when you don’t need regular expression support, and using a combination of substring() and indexOf() instead seems to make the Java program about 40% faster. So we’re not quite as fast as Java yet, but at least the gap has narrowed some more.


Indexing typed bytes

March 3, 2009

In order to be able to do something useful with the output of a Dumbo program, you often need to index it first. Suppose, for instance, that you ran a program that computes JIRA issue recommendations on lots and lots of issues, saving the output to jira/recs on the DFS. If the HADOOP-1722 patch has been applied to your Hadoop build, you can then dump this output to a fairly compact typed bytes file recs.tb as follows:

$ /path/to/hadoop/bin/hadoop jar \
/path/to/hadoop/build/contrib/streaming/*.jar dumptb jira/recs > recs.tb

Such a typed bytes file isn’t very convenient for doing lookups though, since you basically have to go through the whole thing to find the recommendations corresponding to a particular issue. Keeping it completely in memory as a hash table might be an option if you have tons of RAM, but indexing it and putting only the index in memory can do the trick as well.

Here’s a very simple, but nevertheless quite effective, way of generating an index for a typed bytes file:

import sys
import typedbytes

infile = open(sys.argv[1], "rb")
outfile = open(sys.argv[2], "wb")

input = typedbytes.PairedInput(infile)
output = typedbytes.PairedOutput(outfile)
pos = 0
for key, value in input:
    # pos is the byte offset at which the pair just read starts
    output.write((key, pos))
    pos = infile.tell()

infile.close()
outfile.close()

This Python program relies on the typedbytes module to read a given typed bytes file and generate a second one containing (key, position) pairs. More concretely, running it with the arguments recs.tb recs.tb.idx leads to an index file recs.tb.idx for recs.tb. If you put

file = open("recs.tb", "rb")
input = typedbytes.PairedInput(file)
index = dict(typedbytes.PairedInput(open("recs.tb.idx", "rb")))

in the initialization part of a Python program, you can then look up the recommendations for a particular issuenr as follows:

file.seek(index[issuenr])
readnr, recs = input.read()
if readnr != issuenr:
    raise ValueError("corrupt index")

When dealing with huge output files, you might have to use a second-level index on (a sorted version of) the index obtained in this way, but in many cases this simple approach gets the job done just fine.
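Such a second-level index could be as simple as keeping every Nth (key, position) pair of the sorted index in memory and binary-searching for the block that may contain a given key. Here’s a minimal sketch with made-up keys and offsets (the names and numbers are purely illustrative):

```python
import bisect

# Every Nth key from the sorted on-disk index, plus the byte offset at
# which each of those keys starts in the index file (made-up values).
keys = [0, 100, 200, 300]
positions = [0, 4096, 8192, 12288]

def index_block(key):
    """Return the offset of the index block that may contain `key`."""
    i = bisect.bisect_right(keys, key) - 1
    if i < 0:
        raise KeyError(key)
    return positions[i]
```

You would then seek to the returned offset in the index file, scan that block for the key, and finally seek into the data file as before.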


HADOOP-1722 and typed bytes

February 24, 2009

The Hadoop guys recently accepted a patch that adds support for binary communication formats to Streaming, the Hadoop component on which Dumbo is based. As described in the comments for the corresponding JIRA issue (HADOOP-1722), this patch basically abstracts away the communication format used by introducing the classes InputWriter and OutputReader. It provides extensions of these classes that implement three different communication formats, including the so-called typed bytes format, which happens to be a great alternative to the functionality provided by the Java code that is currently part of Dumbo.

A while ago, Johan and I ran four versions of the IP count program on about 300 gigs of weblogs, which resulted in the following timings:

  1. Java: 8 minutes
  2. Dumbo with typed bytes: 10 minutes
  3. Hive: 13 minutes
  4. Dumbo without typed bytes: 16 minutes

Hence, thanks to typed bytes, Java is now only 20% faster than Dumbo, and there might still be some room for narrowing this gap even further. For instance, I wouldn’t be surprised if another minute could be chopped off by implementing the typed bytes serialization at the Python side in C instead of pure Python, just like the Thrift guys did with their fastbinary Python module. Since typed bytes are also a cleaner and more maintainable solution, the Java code will probably be removed in Dumbo 0.21, making typed bytes the only supported approach.
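The typed bytes format itself is simple: each value is serialized as a one-byte type code followed by its payload. As a sketch, a 32-bit integer (type code 3) can be written and read in pure Python like this (write_int and read_int are just illustrative helpers, not part of the typedbytes module’s API):

```python
import struct

def write_int(n):
    # One-byte type code 3, then the integer as a big-endian 32-bit value.
    return struct.pack(">Bi", 3, n)

def read_int(data):
    code, n = struct.unpack(">Bi", data)
    if code != 3:
        raise ValueError("not a typed bytes integer")
    return n
```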

Unfortunately, no Hadoop release before 0.21.0 will include typed bytes support out of the box, but I do plan to attach backported patches to HADOOP-1722. The patch for 0.18 is already there, and backports for more recent versions of Hadoop will follow as soon as we start using those versions at Last.fm. In order to use typed bytes with Dumbo 0.20, you just have to make sure the typedbytes module is installed on the machine from which you run your Dumbo programs, add typedbytes: all and type: typedbytes to the sections [streaming] and [cat], respectively, of your /etc/dumbo.conf or ~/.dumborc file, and apply the HADOOP-1722 patch to your Hadoop build:

$ cd /path/to/hadoop-0.18
$ wget 'https://issues.apache.org/jira/secure/attachment/'\
'12400166/HADOOP-1722-branch-0.18.patch'
$ patch -p0 < HADOOP-1722-branch-0.18.patch
$ ant package
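The configuration additions mentioned above would make the relevant sections of your /etc/dumbo.conf or ~/.dumborc look like this:

```
[streaming]
typedbytes: all

[cat]
type: typedbytes
```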

For now, Dumbo will still look for its jar though, so you cannot get rid of src/contrib/dumbo in your Hadoop directory just yet.