Thursday, August 29, 2013

MongoDB-Hadoop integration and data processing with Apache Pig. Running a Pig script with Node.js


I have weather data in a MongoDB database. As an experiment, the data is ported into a Hadoop environment and processed with Pig.

Using the mongodb-hadoop connector, Pig can read from and write to MongoDB. Once the output is written back into MongoDB, the result data can be viewed from a browser.

I've created a simple web application with MongoDB and Node.js using the Express framework, which also executes the Pig script from within Node.



trades collection data

>  use week6                                      
> db.trades.findOne()
{
 "_id" : ObjectId("51b05fbce3600f7b48448eda"),
 "ticker" : "abcd",
 "time" : ISODate("2012-03-03T04:24:13.003Z"),
 "price" : 110,
 "shares" : 200,
 "ticket" : "z135" // this key used for group in pig script
}
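
If you want to reproduce this without the original dataset, a couple of hypothetical documents with the same shape can be inserted from the mongo shell (the field values below are made up; only the ticket key matters for the grouping):

> use week6
> db.trades.insert({ "ticker" : "abcd", "time" : ISODate("2012-03-03T04:24:13.003Z"), "price" : 110, "shares" : 200, "ticket" : "z135" })
> db.trades.insert({ "ticker" : "efgh", "time" : ISODate("2012-03-04T10:12:45.120Z"), "price" : 95, "shares" : 150, "ticket" : "z447" })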



Pig script


root@boss[bin]#vi cntWthr.pig

-- MongoDB Java driver
REGISTER  /opt/hadoop-1.1.0/lib/mongo-2.10.1.jar;
-- Core Mongo-Hadoop Library
REGISTER /opt/bigdata/mongo-hadoop/core/target/mongo-hadoop-core_1.1.2-1.1.0.jar;
-- mongo-hadoop pig support
REGISTER /opt/bigdata/mongo-hadoop/pig/target/mongo-hadoop-pig_1.1.2-1.1.0.jar;

trades = LOAD 'mongodb://localhost:27017/week6.trades' using com.mongodb.hadoop.pig.MongoLoader; 
grp = GROUP trades BY $0#'ticket';
cnt = FOREACH grp GENERATE group,COUNT(trades);
--dump cnt;
STORE cnt INTO 'mongodb://localhost:27017/mongo_hadoop.yield_historical.outt' USING com.mongodb.hadoop.pig.MongoInsertStorage('group:float,cnt:int', 'group');


In the above script:

  • LOAD the MongoDB data from the week6 database, trades collection.
  • GROUP the loaded data by the key ticket.
  • GENERATE the COUNT for each ticket (an equivalent mongo shell check is sketched after this list).
  • Instead of dumping the result to the console or storing it in HDFS, STORE it back into MongoDB, into the mongo_hadoop database and the yield_historical.outt collection.
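
For a quick cross-check outside of Hadoop, the same group-and-count can be expressed with MongoDB's aggregation framework in the mongo shell. This is only a sketch against the week6.trades collection shown earlier; the Pig job is still what produces the stored result.

> use week6
> db.trades.aggregate([ { $group : { _id : "$ticket", cnt : { $sum : 1 } } } ])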
Node.js 

root@boss[bin]#vi mongoHadoopNode.js


// This script must be run from PIG_HOME/bin, and the file cntWthr.pig must exist in the same path.
var express = require('express'),     
    app = express(),
    cons = require('consolidate'), 
    mongoClient = require('mongodb').MongoClient,
    Server  = require('mongodb').Server;
// Configuring view template
app.engine('html',cons.swig);
app.set('view engine','html');
app.set('views', __dirname + "/views");
// Running the Pig script file
var spawn = require('child_process').spawn,
    runPig = spawn('pig',['cntWthr.pig']);
// Handling the Pig output
runPig.stdout.on('data',function(data){
    console.log('stdout : '+data);
});
runPig.stderr.on('data',function(data){
    console.log('stderr : '+data);
});
// Render the result stored by the Pig job
app.get('/',function(req,res){
    mongoClient.connect('mongodb://localhost:27017/mongo_hadoop',function(err,db){
        if(err) throw err;
        db.collection('yield_historical.outt').findOne({},function(err,doc){
            res.render('template',{'Group':doc.group,'Value':doc.val_0});
            db.close();
        });
    });
});
app.get('*',function(req,res){
    res.send("Page Not Found !!!! ");
});
app.listen(8000);
console.log("Express server started successfully localhost:8000");

simple HTML page ( views/template.html )

MongoDB-Hadoop integration and Job processing with PIG
Group : {{Group}}  Value : {{Value}}
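
The post does not show the template markup itself; a minimal sketch of views/template.html using swig placeholders could look like this:

<h3>MongoDB-Hadoop integration and Job processing with PIG</h3>
<p>Group : {{Group}}  Value : {{Value}}</p>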

Run the Node.js application
root@boss[bin]#node mongoHadoopNode.js
Express server started successfully localhost:8000

A new collection is created under the mongo_hadoop database
>  use mongo_hadoop
> db.yield_historical.outt.findOne()
{ "_id" : ObjectId("521f0ebf908dfe1853af7c01"), "group" : "z135", "val_0" : NumberLong(1667) }

Open http://localhost:8000 in a browser, which will show output like:

MongoDB-Hadoop integration and Job processing with PIG
Group : z447 
Value : 834

