I have weather data in a MongoDB database. As an experiment, I'm porting the data into a Hadoop environment and processing it with Pig.
Using the mongodb-hadoop connector, Pig can read from and write to MongoDB. Once the output is written back into MongoDB, a user can view the result data from a browser.
I've created a simple web application with MongoDB and Node.js using the Express framework, and the Pig script is also executed from within Node.
trades collection data
> use week6
> db.trades.findOne()
{
"_id" : ObjectId("51b05fbce3600f7b48448eda"),
"ticker" : "abcd",
"time" : ISODate("2012-03-03T04:24:13.003Z"),
"price" : 110,
"shares" : 200,
"ticket" : "z135" // this key used for group in pig script
}
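For testing, a collection of documents shaped like the one above can be generated with a small script. The tickers, ticket ids, and value ranges below are invented for illustration, not taken from the real data set:

```javascript
// Hypothetical generator for trade documents shaped like the sample above.
// Field names match the collection; the values are made up.
function makeTrades(n) {
  var tickers = ['abcd', 'efgh'],
      tickets = ['z135', 'z447'],
      docs = [];
  for (var i = 0; i < n; i++) {
    docs.push({
      ticker: tickers[i % tickers.length],
      time: new Date(2012, 2, 3, 4, 24, i), // March 3, 2012
      price: 100 + (i % 20),
      shares: 100 * (1 + i % 3),
      ticket: tickets[i % tickets.length]
    });
  }
  return docs;
}

console.log(makeTrades(4).length); // 4
```

The resulting array could then be bulk-inserted into the trades collection with the driver's insert call.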
Pig script
root@boss[bin]#vi cntWthr.pig
-- MongoDB Java driver
REGISTER /opt/hadoop-1.1.0/lib/mongo-2.10.1.jar;
-- Core Mongo-Hadoop Library
REGISTER /opt/bigdata/mongo-hadoop/core/target/mongo-hadoop-core_1.1.2-1.1.0.jar;
-- mongo-hadoop pig support
REGISTER /opt/bigdata/mongo-hadoop/pig/target/mongo-hadoop-pig_1.1.2-1.1.0.jar;
trades = LOAD 'mongodb://localhost:27017/week6.trades' USING com.mongodb.hadoop.pig.MongoLoader;
grp = GROUP trades by $0#'ticket';
cnt = FOREACH grp GENERATE group,COUNT(trades);
--dump cnt;
STORE cnt INTO 'mongodb://localhost:27017/mongo_hadoop.yield_historical.outt' USING com.mongodb.hadoop.pig.MongoInsertStorage('group:float,cnt:int', 'group');
In the above script:
- LOAD the MongoDB data from the week6 database, trades collection.
- GROUP the loaded data by the key ticket.
- GENERATE the COUNT for each ticket.
- Instead of dumping the result to the console or storing it in HDFS, here I STORE it back into MongoDB, into the mongo_hadoop database, collection yield_historical.outt.
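Conceptually, the GROUP/COUNT step is just a group-and-count over the ticket key. A minimal sketch of the same logic in plain JavaScript over an in-memory array of trade documents (field names match the collection above; the sample rows are made up):

```javascript
// What the Pig GROUP ... / COUNT(...) pair computes, expressed directly:
// a map from each distinct ticket value to the number of trades carrying it.
function countByTicket(trades) {
  var counts = {};
  trades.forEach(function (t) {
    counts[t.ticket] = (counts[t.ticket] || 0) + 1;
  });
  return counts;
}

var sample = [
  { ticker: 'abcd', ticket: 'z135', price: 110, shares: 200 },
  { ticker: 'abcd', ticket: 'z135', price: 112, shares: 100 },
  { ticker: 'efgh', ticket: 'z447', price: 95,  shares: 50 }
];
console.log(countByTicket(sample)); // { z135: 2, z447: 1 }
```

The difference, of course, is that Pig runs this as a distributed MapReduce job over the whole collection rather than in memory.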
Node.js
root@boss[bin]#vi mongoHadoopNode.js
// This script must be run from PIG_HOME/bin, and the file cntWthr.pig must exist in the same path.
var express = require('express'),
    app = express(),
    cons = require('consolidate'),
    mongoClient = require('mongodb').MongoClient;

// Configuring the view template
app.engine('html', cons.swig);
app.set('view engine', 'html');
app.set('views', __dirname + "/views");

// Running the Pig script file
var spawn = require('child_process').spawn,
    runPig = spawn('pig', ['cntWthr.pig']);

// Handling the Pig output
runPig.stdout.on('data', function (data) {
    console.log('stdout : ' + data);
});
runPig.stderr.on('data', function (data) {
    console.log('stderr : ' + data);
});

app.get('/', function (req, res) {
    mongoClient.connect('mongodb://localhost:27017/mongo_hadoop', function (err, db) {
        if (err) throw err;
        db.collection('yield_historical.outt').findOne({}, function (err, doc) {
            res.render('template', { 'Group': doc.group, 'Value': doc.val_0 });
            db.close();
        });
    });
});

app.get('*', function (req, res) {
    res.send("Page Not Found !!!!");
});

app.listen(8000);
console.log("Express server started successfully localhost:8000");
Simple HTML page ( views/template.html )
MongoDB-Hadoop integration and Job processing with PIG
Group : {{Group}} Value : {{Value}}
Run Node.js
root@boss[bin]#node mongoHadoopNode.js
Express server started successfully localhost:8000
A new collection is created in MongoDB under the mongo_hadoop database:
> use mongo_hadoop
> db.yield_historical.outt.findOne()
{
"_id" : ObjectId("521f0ebf908dfe1853af7c01"),
"group" : "z135",
"val_0" : NumberLong(1667)
}
Enter localhost:8000 in a browser, which will show output like:
MongoDB-Hadoop integration and Job processing with PIG
Group : z447
Value : 834