Today, we welcome Arthur Lessuise, a final-year Master's student in Computer Science at the Université Libre de Bruxelles (Belgium). He spent six weeks at Euranova R&D for his internship, studying the possibility of replacing HDFS with a NoSQL store as the storage layer for Pig Latin. This post is a summary of his amazing work. Enjoy!
What’s Pig Latin?
Pig Latin[1] is a data processing language that offers a tuple-based data model and relational-like operators. The language focuses on data-parallel problems of massive scale, which is why it is implemented on top of Hadoop[2], Apache’s MapReduce-like clustering platform.
MapReduce[3] (and Hadoop) provide a deliberately restricted programming model, which enables the runtime to automatically parallelize the program across very large clusters while taking full care of issues like networking, failure handling, and so on. In this model, a program consists of a map function and a reduce function, written in an ordinary programming language (Java in Hadoop's case). The map function is called on each record in the input and produces a set of intermediate key-value pairs. The reduce function is then called once for each distinct key, with the set of intermediate values corresponding to that key. Data is fetched from and stored to a distributed filesystem (DFS), usually hosted by the same cluster nodes as those doing the computation.
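To make the model concrete, here is a rough, hand-written Hadoop 0.20 job of our own (the class names and the comma-separated input layout are purely illustrative) doing what the short Pig Latin program further down expresses in five lines: keep only recent visits, group them by page and average the time spent.

// Illustrative sketch of a plain MapReduce job; not part of the project's code.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvgTimeSpent {

  // map: keep only recent visits, emit (webpage, timespent)
  public static class VisitMapper
      extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text line, Context ctx)
        throws IOException, InterruptedException {
      String[] f = line.toString().split(",");   // userid,webpage,visitdate,timespent
      if (Long.parseLong(f[2]) >= 20100220L) {
        ctx.write(new Text(f[1]), new LongWritable(Long.parseLong(f[3])));
      }
    }
  }

  // reduce: called once per webpage, averages the collected timespent values
  public static class AvgReducer
      extends Reducer<Text, LongWritable, Text, DoubleWritable> {
    @Override
    protected void reduce(Text page, Iterable<LongWritable> values, Context ctx)
        throws IOException, InterruptedException {
      long sum = 0, count = 0;
      for (LongWritable v : values) { sum += v.get(); count++; }
      ctx.write(page, new DoubleWritable((double) sum / count));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "avg-time-spent");
    job.setJarByClass(AvgTimeSpent.class);
    job.setMapperClass(VisitMapper.class);
    job.setReducerClass(AvgReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);
    FileInputFormat.addInputPath(job, new Path("page_visits"));
    FileOutputFormat.setOutputPath(job, new Path("output"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Even for such a simple query, the amount of boilerplate is considerable, which is precisely the pain point Pig Latin addresses.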
Pig Latin was developed at Yahoo[4], one of Hadoop's largest contributors and earliest users, as an easier way to write MapReduce programs that perform basic data manipulations, while still allowing custom processing if needed. Here is a quick Pig Latin program for illustration purposes:
visits = LOAD 'page_visits' USING PigStorage(',') AS (userid, webpage, visitdate, timespent);
recentvisits = FILTER visits BY visitdate >= 20100220;
pages = GROUP recentvisits BY webpage;
avgtimespent = FOREACH pages GENERATE group, AVG(recentvisits.timespent);
STORE avgtimespent INTO 'output' USING PigStorage(',');
These SQL-like semantics make it easier for programmers to write queries; some may even find it easier than SQL altogether, thanks to its step-by-step, more procedural-looking style and syntax.
What happens when data is not on HDFS?
One big usability drawback is that the data has to come from the big, unwieldy distributed filesystem; however, such data is usually generated through more dynamic means, for example by a website. In that case, data has to be incrementally collected from the website's DB and warehoused into the DFS.
Furthermore, the output data resulting from Pig Latin jobs might need to be reinserted somewhere, e.g. back into the website's DB for presentation. This calls for an intermediary layer of glue that has to be maintained along with the website-related and data-processing code, making the whole process somewhat clumsy (see fig. 1 on the right).
For my internship at Euranova, I've built a proof-of-concept implementation of a bridge that allows Pig Latin programs to use MongoDB[5] as a data source and/or sink. Since MongoDB is well suited as a website-backing database, such a bridge helps reduce the gap between data generation and data processing outlined in the previous paragraph (see fig. 2 on the right).
The resulting source code for this project is released under the 3-clause BSD licence (see below for the source code). So far it has the following features and issues:
Features:
- Exploits MongoDB's sharding for parallel execution of Pig Latin queries (sketched below)
- Maps [a subset of] MongoDB’s data model to Pig Latin’s and back
Issues:
- Some parts of either data model do not map cleanly onto the other
- Proper failure handling is missing
- MongoDB-related problems arise with large shards
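To give an idea of how the sharding support can work, here is a minimal, hypothetical sketch (the names ChunkLister and Chunk are ours; the project's actual code lives in eu.euranova.pigmongo.hadoop and may differ) that enumerates the chunks of a sharded collection from MongoDB's config database, so that each chunk can back one map task:

// Illustrative sketch: list the chunks of a sharded collection so each chunk
// can be handed to a separate Hadoop task. Not the bridge's actual code.
import java.util.ArrayList;
import java.util.List;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class ChunkLister {

  /** One chunk = one unit of parallel work: shard name plus key range. */
  public static class Chunk {
    public final String shard;
    public final DBObject min, max;
    public Chunk(String shard, DBObject min, DBObject max) {
      this.shard = shard; this.min = min; this.max = max;
    }
  }

  /** Read chunk metadata for the namespace "db.collection" from the config database. */
  public static List<Chunk> chunksFor(String host, int port, String ns) throws Exception {
    Mongo mongo = new Mongo(host, port);   // connect to a mongos router
    DBCollection chunks = mongo.getDB("config").getCollection("chunks");
    List<Chunk> result = new ArrayList<Chunk>();
    DBCursor cur = chunks.find(new BasicDBObject("ns", ns));
    while (cur.hasNext()) {
      DBObject c = cur.next();
      result.add(new Chunk((String) c.get("shard"),
                           (DBObject) c.get("min"),
                           (DBObject) c.get("max")));
    }
    return result;
  }
}

Each chunk's min/max key range can then be turned into a query filter for the corresponding task, so that the tasks read disjoint slices of the collection in parallel.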
As a usage example, here is the same query as before, adapted to use our MongoDB bridge, with MongoDB collections as data source and sink:
visits = LOAD 'mongodb://mydbhost:27017/mydb/page_visits' USING MongoLoader();
recentvisits = FILTER visits BY visitdate >= 20100220;
pages = GROUP recentvisits BY webpage;
avgtimespent = FOREACH pages GENERATE group, AVG(recentvisits.timespent);
STORE avgtimespent INTO 'mongodb://mydbhost:27017/mydb/output' USING MongoStorer();
Besides using our MongoLoader and MongoStorer in place of the PigStorage loader and storer, this code differs from the previous one in another way: the first line no longer specifies the schema (field names and datatypes) of the data. That schema can instead be inferred from the incoming MongoDB objects, which is exactly what our loader does.
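As a rough illustration of the idea, here is a hypothetical sketch of ours (not the project's actual loader code) that samples a single document and maps the Java types of its values onto Pig types; a loader implementing Pig's LoadMetadata.getSchema() hook could return a schema built this way:

// Illustrative sketch: derive a Pig schema from a sample MongoDB document.
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;

public class SchemaInference {

  /** Inspect one document and map each field's Java type to a Pig type. */
  public static Schema inferSchema(DBCollection collection) {
    DBObject sample = collection.findOne();   // assumes a reasonably homogeneous collection
    Schema schema = new Schema();
    for (String field : sample.keySet()) {
      schema.add(new Schema.FieldSchema(field, pigTypeOf(sample.get(field))));
    }
    return schema;   // getSchema() would wrap this in a ResourceSchema
  }

  private static byte pigTypeOf(Object value) {
    if (value instanceof Integer)  return DataType.INTEGER;
    if (value instanceof Long)     return DataType.LONG;
    if (value instanceof Double)   return DataType.DOUBLE;
    if (value instanceof String)   return DataType.CHARARRAY;
    if (value instanceof DBObject) return DataType.MAP;        // nested documents as Pig maps
    return DataType.BYTEARRAY;                                 // fallback for anything else
  }
}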
Additional optimization opportunities offered by Pig have been implemented; most notably, projection operations in Pig queries can be pushed down to MongoDB, and MongoDB indexes can be used to provide data in sorted order, enabling Pig to use efficient algorithms for joins and multi-table grouping. However, the current implementation of Pig makes the latter optimization awkward to implement and use, limiting its usability at the moment. This calls for some research, on Pig's side, on how to make better use of sorted input data.
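For the projection push-down, the general idea can be sketched as follows; this is a stripped-down, hypothetical example of ours built on Pig 0.8's LoadPushDown interface, not the bridge's actual code. The field list Pig asks for is turned into a MongoDB field selector, so that only the requested fields travel over the network:

// Illustrative sketch: translate Pig's projection push-down into a MongoDB
// field selector; a real loader would combine this with a LoadFunc.
import java.util.Arrays;
import java.util.List;
import org.apache.pig.LoadPushDown;
import org.apache.pig.impl.logicalLayer.FrontendException;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;

public class ProjectionPushDown implements LoadPushDown {

  private DBObject keys;   // would later be passed to DBCollection.find(query, keys)

  @Override
  public List<OperatorSet> getFeatures() {
    return Arrays.asList(OperatorSet.PROJECTION);   // only projections are supported here
  }

  @Override
  public RequiredFieldResponse pushProjection(RequiredFieldList requiredFields)
      throws FrontendException {
    BasicDBObject selector = new BasicDBObject();
    for (RequiredField f : requiredFields.getFields()) {
      selector.put(f.getAlias(), 1);                // 1 = include this field
    }
    this.keys = selector;
    return new RequiredFieldResponse(true);         // true: the projection will be honoured
  }
}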
Source code
The source code of the project is available read-only at Euranova’s Subversion repository (svn://dev.euranova.eu/pigmongo).
Building from the command line
A Makefile for compiling the code and a README file with usage and dependency information are provided with the source code. The Makefile can be used to generate a JAR package for deployment to any compatible Pig installation.
Using Eclipse IDE
These instructions will help you set up the Eclipse IDE for developing the project.
- Since one of the source files in the project is written in the Scala programming language, start by installing the Scala IDE for Eclipse using the update site provided on that page.
- Create a new Java project using the “new project” wizard
- On the first page, select “Create project from existing sources” and browse to the directory where you checked out the project
- In the project’s build configuration, ensure that the JARs in the lib/ subdirectory are all on the build path; also add the JARs of external dependencies:
- Hadoop 0.20.*
- Pig 0.8 (unreleased; check here for build instructions)
- The MongoDB Java driver
- Eclipse will create the project, but you will see compile errors due to Scala not being set up. To fix that, right click on the project in the Eclipse package explorer, and choose Configure -> Add Scala nature. Finally, refresh the project (e.g. right click, refresh); it will rebuild itself and all should be well.
Structure overview
The project is divided between two packages:
- the package implementing Hadoop-related classes (eu.euranova.pigmongo.hadoop), and
- the one implementing Pig-related classes (eu.euranova.pigmongo.pig).
Here is a simplified class diagram of the project:
References
[1] Apache Pig. http://pig.apache.org/
[2] Apache Hadoop. http://hadoop.apache.org/
[3] J. Dean and S. Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. Communications of the ACM, 2008.
[4] Yahoo Research. http://research.yahoo.com/
[5] MongoDB. http://www.mongodb.org/