Consuming Dumbo output with Pig

February 5, 2010

Although Dumbo abstracts and simplifies things quite a bit, it still forces you to think in MapReduce, which might not be ideal if you want to implement complex data flows in a limited amount of time. Personally, I think Dumbo still occupies a useful space within the Hadoop ecosystem, but in some cases it makes sense to work at an even higher level and use something like Pig or Hive. In fact, sometimes it makes sense to combine the two and do some parts of your data flow in Dumbo and others in Pig. To make this possible, I recently wrote a Pig loader function for sequence files that contain TypedBytesWritables, which is the file format Dumbo uses by default to store all its output on Hadoop. Here’s an example of a Pig script that reads Dumbo output:

register pigtail.jar;  -- http://github.com/klbostee/pigtail

a = load '/hdfs/path/to/dumbo/output'
    using fm.last.pigtail.storage.TypedBytesSequenceFileLoader()
    as (artist:int, val:(listeners:int, listens:int));
b = foreach a generate artist, val.listeners as listeners;
c = order b by listeners;
d = limit c 100;

dump d;

You basically just have to specify names and types for the components of the key/value pairs and you’re good to go.
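
For reference, the Dumbo job producing output that fits the schema above could be as simple as the following sketch. The input format and field names are made up for illustration; all that matters is that the job emits integer keys and (int, int) tuple values:

from dumbo import run

def mapper(key, value):
    # hypothetical input: tab-separated "artist<TAB>user<TAB>listens" lines,
    # one line per user per artist
    artist, user, listens = value.split("\t")
    yield int(artist), (1, int(listens))

def reducer(key, values):
    # sum the per-user (1, listens) pairs into (listeners, listens)
    listeners, listens = map(sum, zip(*values))
    yield key, (listeners, listens)

if __name__ == "__main__":
    run(mapper, reducer)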

A possibly useful side-effect of writing this loader is that it makes all sorts of file formats readable from Pig. Everything that Dumbo can read can now also be consumed by Pig scripts; all you have to do is write a simple Dumbo script that converts the input to typed bytes sequence files:

from dumbo import run
from dumbo.lib import identitymapper

if __name__ == "__main__":
    # no reducer needed: the identity mapper passes all key/value pairs
    # through unchanged, and Dumbo writes them out in its default format,
    # i.e. as typed bytes sequence files
    run(identitymapper)
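
Assuming that script is saved as tobytes.py, converting some plain-text input (paths hypothetical) then boils down to:

$ dumbo start tobytes.py -hadoop /usr/lib/hadoop \
-input /hdfs/path/to/original/input -output /hdfs/path/to/dumbo/output

The resulting output can be loaded in Pig exactly as shown above.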

The proper solution is of course to write custom Pig loaders, but this gets the job done too and doesn’t slow things down that much.


Analysing Apache logs

June 18, 2009

The Cloudera guys blogged about using Pig for examining Apache logs yesterday. Although it nicely illustrates several lesser-known Pig features, I’m not overly impressed with the described program, to be honest. Having to resort to three different scripting languages to do some GeoIP lookups just complicates things too much if you ask me. Personally, I’d much prefer writing something like:

class Mapper:
    def __init__(self):
        from re import compile
        # matches Apache's combined log format
        self.regex = compile(r'(?P<ip>[\d\.\-]+) (?P<id>[\w\-]+) ' \
                             r'(?P<user>[\w\-]+) \[(?P<time>[^\]]+)\] ' \
                             r'"(?P<request>[^"]+)" (?P<status>[\d\-]+) ' \
                             r'(?P<bytes>[\d\-]+) "(?P<referer>[^"]+)" ' \
                             r'"(?P<agent>[^"]+)"')
        from pygeoip import GeoIP, MEMORY_CACHE
        # the path to the GeoIP data file comes in via Dumbo's -param option
        self.geoip = GeoIP(self.params["geodata"], flags=MEMORY_CACHE)
    def __call__(self, key, value):
        mo = self.regex.match(value)
        if mo:
            request, bytes = mo.group("request"), mo.group("bytes")
            if request.startswith("GET") and bytes != "-":
                rec = self.geoip.record_by_addr(mo.group("ip"))
                country = rec["country_code"] if rec else "-"
                # emit (country, (hit count, bytes served))
                yield country, (1, int(bytes))

if __name__ == "__main__":
    from dumbo import run, sumsreducer
    run(Mapper, sumsreducer, combiner=sumsreducer)

After installing Python 2.6, I tested this hits_by_country.py program on my chrooted Cloudera-flavored Hadoop server as follows:

$ wget http://pygeoip.googlecode.com/files/pygeoip-0.1.1-py2.6.egg
$ wget http://bit.ly/geolitecity
$ wget http://bit.ly/randomapachelog  # found via Google
$ dumbo put access.log access.log -hadoop /usr/lib/hadoop
$ dumbo start hits_by_country.py -hadoop /usr/lib/hadoop \
-input access.log -output hits_by_country \
-python python2.6 -libegg pygeoip-0.1.1-py2.6.egg \
-file GeoLiteCity.dat -param geodata=GeoLiteCity.dat
$ dumbo cat hits_by_country/part-00000 -hadoop /usr/lib/hadoop/ | \
sort -k2,2nr | head -n 5
US      9400    388083137
KR      6714    2655270
DE      1859    32131992
RU      1838    44073038
CA      1055    23035208

At Last.fm, we use the GeoIP Python bindings instead of the pure-Python pygeoip module, which offers a nearly identical API but might be a bit slower. Also, we abstract away the format of our Apache logs by using a parser class, and we have some library code for identifying hits from robots as well, much like the IsBotUA() method in the Pig example.
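
That library code isn’t public, but a minimal sketch of the idea could look like this, with all class and function names being hypothetical:

class ApacheLogParser(object):
    """Hypothetical wrapper that hides the details of the log format."""
    def __init__(self, regex):
        from re import compile
        self.regex = compile(regex)
    def parse(self, line):
        # returns a dict of named fields, or None for lines that don't match
        mo = self.regex.match(line)
        return mo.groupdict() if mo else None

def is_robot(agent):
    # crude stand-in for proper user-agent-based robot detection
    return any(s in agent.lower() for s in ("bot", "crawler", "spider"))

A mapper like the one above would then use these building blocks instead of hardcoding the regex and processing every hit indiscriminately.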