From 588e66e5aeac10442d9d7f9b4624bf7d866dbd1c Mon Sep 17 00:00:00 2001
From: Mircea Danila Dumitrescu
Date: Fri, 16 May 2014 14:23:36 +0100
Subject: [PATCH] Fixed issue with wrongly ignored folder

---
 .gitignore                     |   1 -
 lib/analytics/aggregation.js   |   8 +
 lib/analytics/aggregator.js    | 476 +++++++++++++++++++++++++++++++++
 lib/analytics/collector.js     |  44 +++
 lib/analytics/database.js      |  43 +++
 lib/analytics/disperser.js     |  86 ++++++
 lib/analytics/endpoint.js      |  25 ++
 lib/analytics/event.js         |  96 +++++++
 lib/analytics/genericGetter.js | 196 ++++++++++++++
 lib/analytics/index.js         |   5 +
 lib/analytics/myutils.js       |  18 ++
 lib/analytics/server.js        | 166 ++++++++++++
 12 files changed, 1163 insertions(+), 1 deletion(-)
 create mode 100644 lib/analytics/aggregation.js
 create mode 100644 lib/analytics/aggregator.js
 create mode 100644 lib/analytics/collector.js
 create mode 100644 lib/analytics/database.js
 create mode 100644 lib/analytics/disperser.js
 create mode 100644 lib/analytics/endpoint.js
 create mode 100644 lib/analytics/event.js
 create mode 100644 lib/analytics/genericGetter.js
 create mode 100644 lib/analytics/index.js
 create mode 100644 lib/analytics/myutils.js
 create mode 100644 lib/analytics/server.js

diff --git a/.gitignore b/.gitignore
index fc87a58..18ae288 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,7 +3,6 @@
 .jshintrc
 atlassian-ide-plugin.xml
 bower_components
-lib
 node_modules
 tmp
 public/lib
diff --git a/lib/analytics/aggregation.js b/lib/analytics/aggregation.js
new file mode 100644
index 0000000..6805440
--- /dev/null
+++ b/lib/analytics/aggregation.js
@@ -0,0 +1,8 @@
+"use strict";
+var mongodb = require("mongodb"),
+    genericGetter = require("./genericGetter.js");
+
+exports.getterInit = function (aggregationDB) {
+    return genericGetter(aggregationDB);
+};
+
diff --git a/lib/analytics/aggregator.js b/lib/analytics/aggregator.js
new file mode 100644
index 0000000..ecf01a7
--- /dev/null
+++ b/lib/analytics/aggregator.js
@@ -0,0 +1,476 @@
+"use strict";
+/*
+ * Aggregations have fixed times of execution: 1m, 5m and 1h
+ */
+var util = require('util'),
+    myutils = require("./myutils.js"),
+    mongodb = require("mongodb"),
+    mongoConfigs = require("../../bin/databases-config"),
+    lock = {},
+    myLog = function (param1, param2) {
+        console.log(new Date() + " Agg: " + param1 + " :: " + param2);
+    };
+//noinspection JSLint
+exports.register = function (dbs, endpoints, options) {
+    var eventsDB = dbs.events,
+        aggregationsDB = dbs.aggregations,
+        save = function (aggregationName, documentsArray, interval) {
+            var i;
+            for (i = 0; i < documentsArray.length; i++) {
+                if (documentsArray[i] === undefined || documentsArray[i].t === undefined) {
+                    myLog(aggregationName, util.inspect(documentsArray));
+                    throw "Failed to get t";
+                }
+                documentsArray[i]._id = new mongodb.ObjectID(documentsArray[i].t / 1000);
+            }
+            //noinspection JSLint
+            aggregationsDB.collection(aggregationName + "_" + interval).insert(documentsArray, {w: 1}, function (error, result) {
+                if (error) {
+                    throw error;
+                }
+                myLog(aggregationName, "Inserted " + documentsArray.length + " documents for the " + interval + " interval");
+            });
+        },
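+        // The cores below form a rollup cascade: events -> 1m -> 5m -> 1h.
+        // Each core resumes from the newest _id in its output collection.
+        // An aggregation is configured as one pipeline per name, e.g.
+        // (illustrative names only):
+        //     {"pageViewsPerPath": [{"$match": {"type": "pageView"}},
+        //         {"$group": {"_id": "$d.path", "views": {"$sum": 1}}}],
+        //      "reaggragatable": true}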
+        executeAggregationCore1h = function (aggregation, aggregationName, lastPossibleAggregatableElementTimestamp) {
+            var reaggregation,
+                aggregationGroup,
+                t0_1h,
+                t1_1h,
+                i,
+                determineTimestamps = function (callback) {
+                    //try to continue from where we left off
+                    aggregationsDB.collection(aggregationName + "_1h").find({}, {}, {sort: {_id: -1}, limit: 1}).toArray(function (error, result) {
+                        if (error) {
+                            throw error;
+                        }
+
+                        if (result === null || result.length === 0) {
+                            //no previous aggregations were found: look for the first 5m record and start from there
+                            aggregationsDB.collection(aggregationName + "_5m").find({}, {}, {sort: {_id: 1}, limit: 1}).toArray(function (error, result) {
+                                if (error) {
+                                    throw error;
+                                }
+
+                                if (!Array.isArray(result) || result.length === 0) {
+                                    myLog(aggregationName, "There are no 5m records, so we cannot aggregate for 1 hour.");
+                                    t0_1h = undefined;
+                                } else {
+                                    t0_1h = result[0].t;
+                                    t0_1h = t0_1h - t0_1h % 3600000;
+                                }
+
+                                if (t0_1h === undefined) {
+                                    lock[aggregationName].inProgress = false;
+                                } else {
+                                    t1_1h = t0_1h + 3600000;
+                                    callback(aggregation, t0_1h, t1_1h);
+                                }
+                            });
+                        } else {
+                            // the t field represents the beginning of the time slot;
+                            // start aggregating from the next slot
+                            t0_1h = result[0].t + 3600000;
+                            t1_1h = t0_1h + 3600000;
+                            callback(aggregation, t0_1h, t1_1h);
+                        }
+                    });
+                },
+                prepareReaggregation = function (aggregation) {
+                    var aggGrKey;
+                    for (i = 0; i < aggregation.length; i++) {
+                        if (aggregation[i].$group !== undefined) {
+                            aggregationGroup = JSON.parse(JSON.stringify(aggregation[i].$group));
+                            break;
+                        }
+                    }
+                    if (aggregationGroup === undefined) {
+                        throw "Could not find $group statement for aggregation " + aggregationName;
+                    }
+                    aggregationGroup._id = "$key";
+
+                    //we can handle $sum,$avg,$min,$max:x - where x is not the same field or is a constant
+                    for (aggGrKey in aggregationGroup) {
+                        if (aggregationGroup.hasOwnProperty(aggGrKey) && aggGrKey !== "_id") {
+                            if (aggregationGroup[aggGrKey].$sum !== undefined) {
+                                aggregationGroup[aggGrKey].$sum = "$" + aggGrKey;
+                            } else if (aggregationGroup[aggGrKey].$avg !== undefined) {
+                                delete aggregationGroup[aggGrKey].$avg;
+                                aggregationGroup[aggGrKey].$sum = "$" + aggGrKey + "/12";
+                            } else if (aggregationGroup[aggGrKey].$min !== undefined) {
+                                aggregationGroup[aggGrKey].$min = "$" + aggGrKey;
+                            } else if (aggregationGroup[aggGrKey].$max !== undefined) {
+                                aggregationGroup[aggGrKey].$max = "$" + aggGrKey;
+                            } else {
+                                throw "Unrecognised keyword. We only accept $min, $max, $sum and $avg";
+                            }
+                        }
+                    }
+
+                    reaggregation = [
+                        {"$match": {_id: {$gte: myutils.objectIdFromDate(t0_1h), $lt: myutils.objectIdFromDate(t0_1h + 3600000)}}},
+                        {"$group": aggregationGroup}
+                    ];
+                    //myLog(aggregationName, "Here is how the new aggregation for 1 hour looks: " + util.inspect(reaggregation, {color: true, depth: null}));
+                },
+                core = function (aggregation, t0_1h, t1_1h) {
+                    if (t1_1h <= lastPossibleAggregatableElementTimestamp) {
+                        myLog(aggregationName, "Starting 1 hour aggregation from time: " + new Date(t0_1h));
+                        prepareReaggregation(aggregation);
+                        aggregationsDB.collection(aggregationName + "_5m").aggregate(reaggregation, {}, function (error, result) {
+                            if (error) {
+                                throw error;
+                            }
+                            if (result.length > 0) {
+                                myLog(aggregationName, "Finished aggregating documents from the 5 minute aggregation into the 1 hour aggregation ... now inserting.");
+
+                                //put _id in key
+                                for (i = 0; i < result.length; i++) {
+                                    if (result[i]._id !== undefined && result[i]._id !== null) {
+                                        result[i].key = result[i]._id;
+                                        result[i].t = t0_1h;
+                                        delete result[i]._id;
+                                    } else {
+                                        result.splice(i, 1);
+                                        i--;
+                                    }
+                                }
+                                if (result.length > 0) {
+                                    save(aggregationName, result, "1h");
+                                } else {
+                                    save(aggregationName, [{t: t0_1h}], "1h");
+                                }
+                            } else {
+                                myLog(aggregationName, "Still inserting an empty entry with t: " + t0_1h + " so we can keep track of what ran when");
+                                save(aggregationName, [{t: t0_1h}], "1h");
+                            }
+                            lock[aggregationName].inProgress = false;
+                        });
+                    } else {
+                        lock[aggregationName].inProgress = false;
+                    }
+                };
+            determineTimestamps(core);
+        },
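+        // Reaggregation reuses the pipeline's own $group, rewritten to run over
+        // already-aggregated documents: $sum/$min/$max operate on the stored
+        // field, and $avg becomes a $sum of field/N (N = 12 five-minute slots
+        // per hour, N = 5 one-minute slots per five minutes). Note this is only
+        // exact when every sub-slot holds the same number of samples, which is
+        // what the reaggragatable flag asserts.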
+        executeAggregationCore5m = function (aggregation, aggregationName, lastPossibleAggregatableElementTimestamp) {
+            var reaggregation,
+                aggregationGroup,
+                t0_5m,
+                t1_5m,
+                i,
+                determineTimestamps = function (callback) {
+                    //try to continue from where we left off
+                    aggregationsDB.collection(aggregationName + "_5m").find({}, {}, {sort: {_id: -1}, limit: 1}).toArray(function (error, result) {
+                        if (error) {
+                            throw error;
+                        }
+
+                        if (result === null || result.length === 0) {
+                            //no previous aggregations were found: look for the first 1m record and start from there
+                            aggregationsDB.collection(aggregationName + "_1m").find({}, {}, {sort: {_id: 1}, limit: 1}).toArray(function (error, result) {
+                                if (error) {
+                                    throw error;
+                                }
+
+                                if (!Array.isArray(result) || result.length === 0) {
+                                    myLog(aggregationName, "There are no 1m records, so we cannot aggregate for 5 minutes.");
+                                    t0_5m = undefined;
+                                } else {
+                                    t0_5m = result[0].t;
+                                    t0_5m = t0_5m - t0_5m % 300000;
+                                }
+
+                                if (t0_5m === undefined) {
+                                    lock[aggregationName].inProgress = false;
+                                } else {
+                                    t1_5m = t0_5m + 300000;
+                                    callback(aggregation, t0_5m, t1_5m);
+                                }
+                            });
+                        } else {
+                            // the t field represents the beginning of the time slot;
+                            // start aggregating from the next slot
+                            t0_5m = result[0].t + 300000;
+                            t1_5m = t0_5m + 300000;
+                            callback(aggregation, t0_5m, t1_5m);
+                        }
+                    });
+                },
+                prepareReaggregation = function (aggregation) {
+                    var aggGrKey;
+                    for (i = 0; i < aggregation.length; i++) {
+                        if (aggregation[i].$group !== undefined) {
+                            aggregationGroup = JSON.parse(JSON.stringify(aggregation[i].$group));
+                            break;
+                        }
+                    }
+                    if (aggregationGroup === undefined) {
+                        throw "Could not find $group statement for aggregation " + aggregationName;
+                    }
+                    aggregationGroup._id = "$key";
+
+                    //we can handle $sum,$avg,$min,$max:x - where x is not the same field or is a constant
+                    for (aggGrKey in aggregationGroup) {
+                        if (aggregationGroup.hasOwnProperty(aggGrKey) && aggGrKey !== "_id") {
+                            if (aggregationGroup[aggGrKey].$sum !== undefined) {
+                                aggregationGroup[aggGrKey].$sum = "$" + aggGrKey;
+                            } else if (aggregationGroup[aggGrKey].$avg !== undefined) {
+                                delete aggregationGroup[aggGrKey].$avg;
+                                aggregationGroup[aggGrKey].$sum = "$" + aggGrKey + "/5";
+                            } else if (aggregationGroup[aggGrKey].$min !== undefined) {
+                                aggregationGroup[aggGrKey].$min = "$" + aggGrKey;
+                            } else if (aggregationGroup[aggGrKey].$max !== undefined) {
+                                aggregationGroup[aggGrKey].$max = "$" + aggGrKey;
+                            } else {
+                                throw "Unrecognised keyword. We only accept $min, $max, $sum and $avg";
+                            }
+                        }
+                    }
+
+                    reaggregation = [
+                        {"$match": {_id: {$gte: myutils.objectIdFromDate(t0_5m), $lt: myutils.objectIdFromDate(t0_5m + 300000)}}},
+                        {"$group": aggregationGroup}
+                    ];
+                    //myLog(aggregationName, "Here is how the new aggregation for 5 minutes looks: " + util.inspect(reaggregation, {color: true, depth: null}));
+                },
+                core = function (aggregation, t0_5m, t1_5m) {
+                    if (t1_5m <= lastPossibleAggregatableElementTimestamp) {
+                        myLog(aggregationName, "Starting 5 minute aggregation from time: " + new Date(t0_5m));
+                        prepareReaggregation(aggregation);
+                        aggregationsDB.collection(aggregationName + "_1m").aggregate(reaggregation, {}, function (error, result) {
+                            if (error) {
+                                throw error;
+                            }
+                            if (result.length > 0) {
+                                myLog(aggregationName, "Finished aggregating documents from the 1 minute aggregation into the 5 minute aggregation ... now inserting.");
+
+                                //put _id in key
+                                for (i = 0; i < result.length; i++) {
+                                    if (result[i]._id !== undefined && result[i]._id !== null) {
+                                        result[i].key = result[i]._id;
+                                        result[i].t = t0_5m;
+                                        delete result[i]._id;
+                                    } else {
+                                        result.splice(i, 1);
+                                        i--;
+                                    }
+                                }
+                                if (result.length > 0) {
+                                    save(aggregationName, result, "5m");
+                                } else {
+                                    save(aggregationName, [{t: t0_5m}], "5m");
+                                }
+                            } else {
+                                myLog(aggregationName, "Still inserting an empty entry with t: " + t0_5m + " so we can keep track of what ran when");
+                                save(aggregationName, [{t: t0_5m}], "5m");
+                            }
+                            executeAggregationCore1h(aggregation, aggregationName, t0_5m);
+                        });
+                    } else {
+                        lock[aggregationName].inProgress = false;
+                    }
+                };
+            determineTimestamps(core);
+        },
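+        // Only the 1m core reads the raw events collection; it narrows
+        // $match._id to one minute-slot at a time. Completed slots chain
+        // forward: the 1m core triggers the 5m core, which triggers the 1h core.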
+        executeAggregationCore1m = function (aggregation, aggregationName, lastPossibleAggregatableElementTimestamp) {
+            var t0_1m,
+                i,
+                t1_1m,
+                determineTimestamps = function (callback) {
+                    //try to continue from where we left off
+                    aggregationsDB.collection(aggregationName + "_1m").find({}, {}, {sort: {_id: -1}, limit: 1}).toArray(function (error, result) {
+                        if (error) {
+                            throw error;
+                        }
+
+                        if (result === null || result.length === 0) {
+                            //no previous aggregations were found: look for the first event of the aggregation type and start from there
+                            eventsDB.collection("events").find({type: aggregation[0].$match.type}, {}, {sort: {_id: 1}, limit: 1}).toArray(function (error, result) {
+                                if (error) {
+                                    throw error;
+                                }
+
+                                if (!Array.isArray(result) || result.length === 0) {
+                                    myLog(aggregationName, "There are no events of type " + aggregation[0].$match.type + ", so we cannot aggregate.");
+                                    t0_1m = undefined;
+                                } else {
+                                    t0_1m = result[0].t;
+                                    t0_1m = t0_1m - t0_1m % 60000;
+                                }
+
+                                if (t0_1m === undefined) {
+                                    lock[aggregationName].inProgress = false;
+                                } else {
+                                    t1_1m = t0_1m + 60000;
+                                    callback(aggregation, t0_1m, t1_1m);
+                                }
+                            });
+                        } else {
+                            // the t field represents the beginning of the time slot;
+                            // start aggregating from the next slot
+                            t0_1m = result[0].t + 60000;
+                            t1_1m = t0_1m + 60000;
+                            callback(aggregation, t0_1m, t1_1m);
+                        }
+                    });
+                },
+                core = function (aggregation, t0_1m, t1_1m) {
+                    if (t1_1m <= lastPossibleAggregatableElementTimestamp) {
+                        myLog(aggregationName, "Starting 1 minute aggregation from time: " + new Date(t0_1m));
+                        aggregation[0].$match._id = {"$gte": myutils.objectIdFromDate(t0_1m), "$lt": myutils.objectIdFromDate(t1_1m)};
+                        eventsDB.collection("events").aggregate(aggregation, {}, function (error, result) {
+                            if (error) {
+                                throw error;
+                            }
+                            if (result.length > 0) {
+                                myLog(aggregationName, "Finished aggregating documents from the events collection into the 1 minute aggregation ... now inserting.");
+
+                                //put _id in key
+                                for (i = 0; i < result.length; i++) {
+                                    if (result[i]._id !== undefined && result[i]._id !== null) {
+                                        result[i].key = result[i]._id;
+                                        result[i].t = t0_1m;
+                                        delete result[i]._id;
+                                    } else {
+                                        result.splice(i, 1);
+                                        i--;
+                                    }
+                                }
+                                if (result.length > 0) {
+                                    save(aggregationName, result, "1m");
+                                } else {
+                                    save(aggregationName, [{t: t0_1m}], "1m");
+                                }
+                            } else {
+                                myLog(aggregationName, "Still inserting an empty entry with t: " + t0_1m + " so we can keep track of what ran when");
+                                //noinspection JSLint
+                                save(aggregationName, [{t: t0_1m}], "1m");
+                            }
+                            executeAggregationCore5m(aggregation, aggregationName, t0_1m);
+                        });
+                    } else {
+                        lock[aggregationName].inProgress = false;
+                    }
+                };
+
+            determineTimestamps(core);
+        },
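+        // Output collections are capped, so old slots age out automatically and
+        // $natural order matches insertion (and therefore time) order. The size
+        // comes from mongoConfigs.aggregations.collectionSize in
+        // bin/databases-config.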
+        createCappedCollectionsForAggregation = function (aggregationName) {
+            var intervals = ["1m", "5m", "1h"],
+                createCollection = function (callback, i) {
+                    if (intervals[i] === undefined) {
+                        lock[aggregationName].collectionsCreated = 1;
+                    } else {
+                        aggregationsDB.collectionNames(aggregationName + "_" + intervals[i], {}, function (error, results) {
+                            if (error) {
+                                myLog(aggregationName, "Error: " + util.inspect(error));
+                            } else {
+                                if (results.length === 0) {
+                                    //noinspection JSLint,JSUnusedLocalSymbols
+                                    aggregationsDB.createCollection(aggregationName + "_" + intervals[i], {capped: true, autoIndexId: true, size: mongoConfigs.aggregations.collectionSize}, function (error, result) {
+                                        if (error) {
+                                            myLog(aggregationName, "Error: " + util.inspect(error));
+                                        } else {
+                                            myLog(aggregationName, "Created capped collection " + aggregationName + "_" + intervals[i] + " with a size of " + mongoConfigs.aggregations.collectionSize + " ...");
+                                        }
+                                        callback(callback, i + 1);
+                                    });
+                                } else {
+                                    myLog(aggregationName, "Collection " + aggregationName + "_" + intervals[i] + " already exists.");
+                                    callback(callback, i + 1);
+                                }
+                            }
+                        });
+                    }
+                };
+
+            if (lock[aggregationName].collectionsCreated === 0) {
+                lock[aggregationName].collectionsCreated = 0.5;
+                myLog(aggregationName, "Preparing to create collections");
+                createCollection(createCollection, 0);
+            }
+        },
+
+        // lock[aggregationName] is a small state machine: collectionsCreated
+        // moves 0 -> 0.5 (creation in flight) -> 1 (all three collections exist),
+        // and inProgress prevents overlapping runs of the cascade.
+        executeAggregation = function (aggregation, aggregationName) {
+            myLog(aggregationName, "---***---***---***--- Starting new iteration... ---***---***---***---");
+            createCappedCollectionsForAggregation(aggregationName);
+            if (lock[aggregationName].inProgress !== true && lock[aggregationName].collectionsCreated === 1) {
+                lock[aggregationName].inProgress = true;
+                executeAggregationCore1m(aggregation, aggregationName, new Date().getTime());
+            } else {
+                myLog(aggregationName, "Will not run as another aggregation is in progress or the capped collections are not yet in place");
+            }
+        },
+
+        validateAndRunAggregation = function (aggregationObject, interval) {
+            var keys = 0,
+                aggregation,
+                keyName,
+                aggregationName,
+                reagregatable = false;
+
+            if (typeof aggregationObject === "object") {
+                for (keyName in aggregationObject) {
+                    if (aggregationObject.hasOwnProperty(keyName)) {
+                        if (keyName !== "reaggragatable") {
+                            aggregationName = keyName;
+                            keys++;
+                            aggregation = aggregationObject[keyName];
+                        } else if (aggregationObject[keyName] === true) {
+                            reagregatable = true;
+                        }
+                    }
+                }
+            }
+            if (!reagregatable) {
+                throw "Non-reaggregatable aggregations are not yet implemented; mark the aggregation with reaggragatable: true";
+            }
+
+            if (keys !== 1) {
+                throw "Expected exactly one aggregation name per aggregation object, got " + keys;
+            }
+
+            if (!(aggregation && aggregation[0].$match && aggregation[0].$match.type)) {
+                throw "You are missing the $match.type for aggregation " + aggregationName;
+            }
+
+            lock[aggregationName] = {};
+            lock[aggregationName].collectionsCreated = 0;
+
+            setInterval(function () {
+                executeAggregation(aggregation, aggregationName);
+            }, interval);
+        };
+
+    options.aggregations.forEach(function (aggregationObject) {
+        validateAndRunAggregation(aggregationObject, 1000);
+    });
+};
+
diff --git a/lib/analytics/collector.js b/lib/analytics/collector.js
new file mode 100644
index 0000000..9c726ba
--- /dev/null
+++ b/lib/analytics/collector.js
@@ -0,0 +1,44 @@
+"use strict";
+var endpoint = require("./endpoint.js");
+
+var headers = {
+    "Content-Type": "application/json",
+    "Access-Control-Allow-Origin": "*"
+};
+
+function post(putter) {
+    return function (request, response) {
+        var content = "";
+        request.on("data", function (chunk) {
+            content += chunk;
+        });
+        request.on("end", function () {
+            try {
+                JSON.parse(content).forEach(putter);
+            } catch (e) {
+                response.writeHead(400, headers);
+                response.end(JSON.stringify({error: e.toString()}));
+                return;
+            }
+            response.writeHead(200, headers);
+            response.end("{}");
+        });
+    };
+}
+
+//noinspection JSLint
+exports.register = function (dbs, endpoints, options) {
+    var db = dbs.events,
+        putter = require("./event.js").putterInit(db, options),
+        poster = post(putter);
+
+    endpoints.ws.push(
+        endpoint("/1.0/event/put", putter)
+    );
+
+    endpoints.http.push(
+        endpoint("POST", "/1.0/event/put", poster)
+    );
+
+    endpoints.udp = putter;
+};
diff --git a/lib/analytics/database.js b/lib/analytics/database.js
new file mode 100644
index 0000000..48c322b
--- /dev/null
+++ b/lib/analytics/database.js
@@ -0,0 +1,43 @@
+"use strict";
+var mongodb = require("mongodb"),
+    mongoConfigs = require("../../bin/databases-config"),
+    database = module.exports = {},
+    myConnect = function (url, options, name, callback) {
+        mongodb.Db.connect(url, options, function (error, db) {
+            callback(error, db, name);
+        });
+    };
+
+database.openConnections = function (callback) {
+    var mongoDBType,
+        url,
+        options,
+        mongoConfig;
+
+    for (mongoDBType in mongoConfigs) {
+        if (mongoConfigs.hasOwnProperty(mongoDBType)) {
+            mongoConfig = mongoConfigs[mongoDBType];
+            url = database.configurl(mongoConfig);
+            options = mongoConfig["mongo-options"] || database.configOptions(mongoConfig);
+            myConnect(url, options, mongoDBType, callback);
+        }
+    }
+};
+
+database.configurl = function (config) {
+    var user = config["mongo-username"],
+        pass = config["mongo-password"],
+        host = config["mongo-host"] || "localhost",
+        port = config["mongo-port"] || 27017,
+        name = config["mongo-database"] || "analytics",
+        auth = user ? user + ":" + pass + "@" : "";
+    return "mongodb://" + auth + host + ":" + port + "/" + name;
+};
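+// For example, a hypothetical config {"mongo-username": "bob",
+// "mongo-password": "s3cret", "mongo-host": "db1"} yields
+// "mongodb://bob:s3cret@db1:27017/analytics".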
+
+database.configOptions = function (config) {
+    return {
+        db: config["mongo-database-options"] || { safe: false },
+        server: config["mongo-server-options"] || { auto_reconnect: true },
+        replSet: { read_secondary: true }
+    };
+};
\ No newline at end of file
diff --git a/lib/analytics/disperser.js b/lib/analytics/disperser.js
new file mode 100644
index 0000000..f6c2e5a
--- /dev/null
+++ b/lib/analytics/disperser.js
@@ -0,0 +1,86 @@
+"use strict";
+var endpoint = require("./endpoint.js"),
+    url = require("url"),
+    LIMIT_MAX = 1e4,
+    headers = {
+        "Content-Type": "application/json",
+        "Access-Control-Allow-Origin": "*"
+    };
+
+//noinspection JSLint
+exports.register = function (dbs, endpoints, options) {
+    var eventsDB = dbs.events,
+        aggregationsDB = dbs.aggregations,
+        eventGetter = require("./event.js").getterInit(eventsDB),
+        aggregationGetter = require("./aggregation.js").getterInit(aggregationsDB);
+
+    endpoints.ws.push(
+        endpoint("/1.0/event/get", eventGetter),
+        endpoint("/1.0/aggregation/get", aggregationGetter)
+    );
+
+    // The event and aggregation HTTP handlers were identical except for the
+    // getter they called, so the common body lives in one factory.
+    function makeGetHandler(getter) {
+        return function (request, response) {
+            request = url.parse(request.url, true).query;
+            var data = [];
+
+            if (!request.hasOwnProperty('start')) {
+                request.start = 0;
+            }
+            if (request.hasOwnProperty('limit')) {
+                // query-string values arrive as strings; coerce before comparing
+                request.limit = +request.limit;
+                if (request.limit >= LIMIT_MAX) {
+                    request.limit = LIMIT_MAX;
+                }
+            }
+
+            function documentHandler(dataElem) {
+                if (dataElem === null) {
+                    response.end(JSON.stringify(data.reverse()));
+                } else {
+                    data.push(dataElem);
+                }
+            }
+
+            if (getter(request, documentHandler) < 0) {
+                response.writeHead(400, headers);
+                response.end(JSON.stringify(data[0]));
+            } else {
+                response.writeHead(200, headers);
+            }
+        };
+    }
+
+    endpoints.http.push(
+        endpoint("GET", "/1.0/event/get", makeGetHandler(eventGetter)),
+        endpoint("GET", "/1.0/aggregation/get", makeGetHandler(aggregationGetter))
+    );
+};
+
diff --git a/lib/analytics/endpoint.js b/lib/analytics/endpoint.js
new file mode 100644
index 0000000..062a387
--- /dev/null
+++ b/lib/analytics/endpoint.js
@@ -0,0 +1,25 @@
+"use strict";
+// Creates an endpoint with a given HTTP method, URL path and dispatch function
+// (the method argument is optional). Endpoints are evaluated in server.js, and
+// dispatch(request, response) is called if the path/method matches.
+module.exports = function (method, path, dispatch) {
+    var match;
+
+    if (arguments.length < 3) {
+        dispatch = path;
+        path = method;
+        match = function (p) {
+            return p === path;
+        };
+    } else {
+        match = function (p, m) {
+            return m === method && p === path;
+        };
+    }
+
+    return {
+        match: match,
+        dispatch: dispatch
+    };
+};
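+// Usage sketch (illustrative): endpoint("/1.0/event/put", handler) matches on
+// path alone (used for WebSocket endpoints), while
+// endpoint("POST", "/1.0/event/put", handler) also requires the method to match.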
diff --git a/lib/analytics/event.js b/lib/analytics/event.js
new file mode 100644
index 0000000..a1f2ca7
--- /dev/null
+++ b/lib/analytics/event.js
@@ -0,0 +1,96 @@
+"use strict";
+var mongodb = require("mongodb"),
+    type_re = /^[a-z][a-zA-Z0-9_]+$/,
+    genericGetter = require("./genericGetter.js"),
+    myutils = require("./myutils.js");
+
+exports.putterInit = function (db, options) {
+    var eventsCollectionCreated = 0,
+        eventsToSave = [],
+        collectionSize = options.collectionSize;
+
+    if (!myutils.isInt(collectionSize)) {
+        throw "Invalid collection size: " + collectionSize;
+    }
+
+    function handle(error) {
+        if (error) {
+            throw error;
+        }
+    }
+
+    function save(event) {
+        db.collection("events").insert(event, {w: 0});
+    }
+
+    function putter(request, messageSenderCallback) {
+        var time = new Date().getTime(),
+            event;
+
+        function saveEvents() {
+            eventsToSave.forEach(function (event) {
+                save(event);
+            });
+            eventsToSave = [];
+        }
+
+        // validations
+        if (!type_re.test(request.type)) {
+            messageSenderCallback({error: "invalid type"});
+            return -1;
+        }
+        if (isNaN(time)) {
+            messageSenderCallback({error: "invalid time"});
+            return -1;
+        }
+
+        // If an id is specified, promote it to Mongo's primary key.
+        event = {t: time, d: request.data, type: request.type};
+        if (request.hasOwnProperty("id")) {
+            event._id = request.id;
+        }
+        // If the events collection already exists, save immediately.
+        if (eventsCollectionCreated === 1) {
+            return save(event);
+        }
+
+        // If someone is already creating the events collection,
+        // append this event to the queue for a later save.
+        if (eventsCollectionCreated === 0.5) {
+            return eventsToSave.push(event);
+        }
+        eventsCollectionCreated = 0.5;
+
+        // Otherwise, it's up to us to see if the collection exists, verify the
+        // associated indexes, and save any events that have queued up in the interim.
+
+        // First add the new event to the queue.
+        eventsToSave.push(event);
+
+        // If the events collection exists, then we assume the indexes do
+        // too. Otherwise, we must create the required collections and indexes.
+        db.collectionNames("events", {}, function (error, names) {
+            if (error) {
+                throw error;
+            }
+            if (names.length) {
+                eventsCollectionCreated = 1;
+                return saveEvents();
+            }
+
+            // Events are indexed by time, which is _id, which is natural order.
+            db.createCollection("events", {capped: true, autoIndexId: true, size: collectionSize}, function (error, result) {
+                handle(error);
+                eventsCollectionCreated = 1;
+                saveEvents();
+            });
+        });
+    }
+
+    return putter;
+};
+
+exports.getterInit = function (eventsDB) {
+    return genericGetter(eventsDB);
+};
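+// Shape of a putter request (illustrative values only):
+//     putter({type: "pageView", data: {path: "/"}, id: "optional-client-id"},
+//            function (msg) { console.log(msg); });
+// "type" must match type_re above; the event timestamp is always server-assigned.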
diff --git a/lib/analytics/genericGetter.js b/lib/analytics/genericGetter.js
new file mode 100644
index 0000000..d9bf122
--- /dev/null
+++ b/lib/analytics/genericGetter.js
@@ -0,0 +1,196 @@
+"use strict";
+var myutils = require("./myutils.js"),
+    mongodb = require("mongodb"),
+    type_re = /^[a-z][a-zA-Z0-9_]+$/,
+    MAX_RETURNED_RECORDS = 10000;
+
+function customQuery(collectionObj, filter, sort, limit, batchSize, streamified, documentHandler) {
+    // set MongoDB cursor options
+    var cursorOptions = {},
+        cursor,
+        stream;
+
+    if (filter === undefined) {
+        filter = {};
+    }
+    if (sort === undefined) {
+        sort = {$natural: -1};
+    }
+
+    if (streamified) {
+        cursorOptions = {
+            tailable: true,
+            awaitdata: true,
+            numberOfRetries: -1
+        };
+    }
+
+    cursor = collectionObj.find(filter, cursorOptions);
+    if (streamified) {
+        stream = cursor.sort({$natural: -1}).stream();
+        stream.on('data', function (document) {
+            documentHandler({id: document._id, time: document.t, data: (document.d === undefined ? document.key : document.d)});
+        });
+    } else {
+        cursor = cursor.sort(sort).limit(limit).batchSize(batchSize);
+        cursor.each(function (error, document) {
+            if (documentHandler.closed) {
+                //noinspection JSLint
+                return cursor.close(function (error, result) {
+                    if (error) {
+                        throw error;
+                    }
+                    //do nothing
+                });
+            }
+
+            if (error) {
+                throw error;
+            }
+
+            // A null document indicates that there are no more results.
+            if (document) {
+                documentHandler({id: document._id, time: document.t, data: (document.d === undefined ? document.key : document.d)});
+            } else {
+                documentHandler(null);
+            }
+        });
+    }
+}
+
+module.exports = function (db) {
+    var collection,
+        streamsByName = {};
+
+    function open(documentHandler) {
+        return !documentHandler.closed;
+    }
+
+    function getter(request, documentHandler) {
+        var stream = !request.hasOwnProperty("stop"),
+            start = new Date(request.start).getTime(),
+            stop = stream ? undefined : (request.stop ? (new Date(request.stop)).getTime() : undefined),
+            name = request.hasOwnProperty("name") ? request.name : undefined,
+            type = request.hasOwnProperty("type") ? request.type : undefined,
+            limit,
+            sort,
+            batchSize,
+            filter,
+            streams,
+            collectionName;
+
+        // Validate the type; only events queries (no collection name) filter by type.
+        if (name === undefined && !type_re.test(type)) {
+            documentHandler({error: "invalid type"});
+            return -1;
+        }
+
+        // Validate the dates.
+        if (isNaN(start)) {
+            documentHandler({error: "invalid start"});
+            return -1;
+        }
+        if (isNaN(stop)) {
+            stop = undefined;
+        }
+        if (!stream && stop === undefined) {
+            documentHandler({error: "invalid stop"});
+            return -1;
+        }
+
+        // Set an optional limit on the number of documents to return.
+        sort = {_id: -1};
+        batchSize = 10000;
+        if (request.hasOwnProperty("limit")) {
+            limit = request.limit;
+        }
+        if (limit === undefined || limit > MAX_RETURNED_RECORDS) {
+            limit = MAX_RETURNED_RECORDS;
+        }
+
+        // Copy any expression filters into the match object.
+        filter = {_id: {$gte: myutils.objectIdFromDate(start)}};
+        if (stop !== undefined) {
+            filter._id.$lt = myutils.objectIdFromDate(stop);
+        }
+
+        if (name === undefined) {
+            collectionName = "events";
+            if (type !== undefined) {
+                filter.type = type;
+            } else {
+                throw "We cannot query events without a type";
+            }
+        } else {
+            collectionName = name;
+        }
+
+        db.collectionNames(collectionName, {}, function (err, result) {
+            if (err) {
+                throw err;
+            }
+            if (result.length !== 1) {
+                throw "The number of results for collection: " + collectionName + " is different than 1";
+            }
+            collection = db.collection(collectionName);
+
+            // For streaming queries, share streams for efficient polling.
+            if (stream && collectionName && streamsByName) {
+                streams = streamsByName[collectionName];
+
+                if (streams && streams.waiting && Array.isArray(streams.waiting)) {
+                    streams.waiting.push(documentHandler);
+                    customQuery(collection, filter, sort, limit, batchSize, true, documentHandler);
+                } else if (streamsByName && collectionName) {
+                    streams = streamsByName[collectionName] = {waiting: [], active: [documentHandler]};
+                    customQuery(collection, filter, sort, limit, batchSize, true, function (document) {
+                        // If there's a document, send it to all active, open clients.
+                        if (document) {
+                            streams.active.forEach(function (documentHandler) {
+                                if (!documentHandler.closed) {
+                                    documentHandler(document);
+                                }
+                            });
+                        } else {
+                            streams.active = streams.active.concat(streams.waiting).filter(open);
+                            streams.waiting = [];
+
+                            // If no clients remain, then it's safe to delete the shared
+                            // stream, and we'll no longer be responsible for polling.
+                            if (!streams.active.length) {
+                                delete streamsByName[collectionName];
+                            }
+                        }
+                    });
+                }
+            } else {
+                customQuery(collection, filter, sort, limit, batchSize, false, documentHandler);
+            }
+        });
+    }
+
+    getter.close = function (documentHandler) {
+        documentHandler.closed = true;
+    };
+
+    return getter;
+};
+
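+// Request shapes the getter accepts (illustrative): a bounded events query
+//     {type: "pageView", start: "2014-05-16", stop: "2014-05-17", limit: 100}
+// returns at most limit documents followed by a null terminator, while a
+// request with no "stop" property opens a shared tailable stream, e.g.
+//     {name: "pageViewsPerPath_1m", start: 0}
+// for an aggregation collection.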
diff --git a/lib/analytics/index.js b/lib/analytics/index.js
new file mode 100644
index 0000000..8ad10a4
--- /dev/null
+++ b/lib/analytics/index.js
@@ -0,0 +1,5 @@
+exports.server = require("./server.js");
+exports.collector = require("./collector.js");
+exports.disperser = require("./disperser.js");
+exports.aggregator = require("./aggregator.js");
+exports.endpoint = require("./endpoint.js");
diff --git a/lib/analytics/myutils.js b/lib/analytics/myutils.js
new file mode 100644
index 0000000..4377a07
--- /dev/null
+++ b/lib/analytics/myutils.js
@@ -0,0 +1,18 @@
+"use strict";
+
+var mongodb = require("mongodb");
+module.exports.objectIdFromDate = function (ts) {
+    // e.g. ts = 1400000000000 -> seconds 1400000000 -> "53724e00" + 16 zero digits
+    return new mongodb.ObjectID(Math.floor(ts / 1000).toString(16) + "0000000000000000");
+};
+module.exports.dateFromObjectId = function (obj) {
+    return obj.getTimestamp();
+};
+module.exports.epochFromObjectId = function (obj) {
+    return parseInt(obj.valueOf().slice(0, 8), 16);
+};
+module.exports.epochMSFromObjectId = function (obj) {
+    // the ObjectID stores seconds; scale to milliseconds
+    return parseInt(obj.valueOf().slice(0, 8), 16) * 1000;
+};
+module.exports.isInt = function (n) {
+    return typeof n === 'number' && parseFloat(n) === parseInt(n, 10) && !isNaN(n);
+};
\ No newline at end of file
diff --git a/lib/analytics/server.js b/lib/analytics/server.js
new file mode 100644
index 0000000..95931b0
--- /dev/null
+++ b/lib/analytics/server.js
@@ -0,0 +1,166 @@
+"use strict";
+var util = require("util"),
+    url = require("url"),
+    http = require("http"),
+    dgram = require("dgram"),
+    websocket = require("ws"),
+    staticModule = require("node-static"),
+    database = require('./database.js');
+
+// Configuration for WebSocket requests.
+//var wsOptions = {
+//    maxReceivedFrameSize: 0x10000,
+//    maxReceivedMessageSize: 0x100000,
+//    fragmentOutgoingMessages: true,
+//    fragmentationThreshold: 0x4000,
+//    keepalive: true,
+//    keepaliveInterval: 20000,
+//    assembleFragments: true,
+//    disableNagleAlgorithm: true,
+//    closeTimeout: 5000
+//};
+
+function ignore() {
+    console.log("Ignoring...");
+    // Responses for UDP are ignored; there's nowhere for them to go!
+}
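+// Three endpoint tables are populated by the register() hooks of collector.js
+// and disperser.js: endpoints.ws (matched by URL on WebSocket connections),
+// endpoints.http (matched by method and path), and endpoints.udp (a single
+// putter fed by the UDP socket below).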
+module.exports = function (options) {
+
+    // Don't crash on errors.
+    process.on("uncaughtException", function (error) {
+        util.log("uncaught exception: " + error);
+        util.log(error.stack);
+    });
+
+    var server = {},
+        httpServer = http.createServer(),
+        wsOptions = {server: httpServer},
+        WebsocketServer = websocket.Server,
+        file = new staticModule.Server("static"),
+        endpoints = {ws: [], http: []},
+        id = 0,
+        wss = new WebsocketServer(wsOptions);
+
+    // Register httpServer WebSocket listener with fallback.
+    // httpServer.on("upgrade", function (request, socket, head) {
+    //     if ("sec-websocket-version" in request.headers) {
+    //         request = new websocket.request(socket, request, wsOptions);
+    //         request.readHandshake();
+    //         connect(request.accept(request.requestedProtocols[0], request.origin), request.httpRequest);
+    //     } else if (request.method === "GET" && /^websocket$/i.test(request.headers.upgrade) && /^upgrade$/i.test(request.headers.connection)) {
+    //         wss.Connection(wss.manager, wss.options, request, socket, head);
+    //     }
+    // });
+
+    wss.on("upgrade", function () {
+        console.log("-----upgrade-----");
+    });
+
+    // Register wss WebSocket listener.
+    wss.on("connection", function (socket) {
+        var url = socket.upgradeReq.url,
+            foundMatch = false,
+            i,
+            n,
+            endpoint,
+            messageSender;
+
+        // Forward messages to the appropriate endpoint, or close the connection.
+        n = endpoints.ws.length;
+        for (i = 0; i < n; i++) {
+            if ((endpoint = endpoints.ws[i]).match(url)) {
+                foundMatch = true;
+                break;
+            }
+        }
+        if (foundMatch) {
+            messageSender = function (response) {
+                socket.send(JSON.stringify(response));
+            };
+
+            messageSender.id = ++id;
+
+            // Listen for socket disconnect ("close" is the event the ws module emits).
+            if (endpoint.dispatch.close) {
+                socket.on("close", function () {
+                    endpoint.dispatch.close(messageSender);
+                });
+            }
+
+            socket.on("message", function (message) {
+                console.log("Got message: " + message);
+                console.log("Current WS status is " + socket.readyState);
+                endpoint.dispatch(JSON.parse(message), messageSender);
+            });
+            return;
+        }
+        socket.close();
+    });
+
+    // Register HTTP listener.
+    httpServer.on("request", function (request, response) {
+        var u = url.parse(request.url),
+            i,
+            n,
+            endpoint;
+
+        // Forward messages to the appropriate endpoint, or 404.
+        n = endpoints.http.length;
+        for (i = 0; i < n; i++) {
+            if ((endpoint = endpoints.http[i]).match(u.pathname, request.method)) {
+                endpoint.dispatch(request, response);
+                return;
+            }
+        }
+
+        // If this request wasn't matched, see if there's a static file to serve.
+        request.on("end", function () {
+            file.serve(request, response, function (error) {
+                if (error) {
+                    response.writeHead(error.status, {"Content-Type": "text/plain"});
+                    //noinspection JSLint
+                    response.end(error.status + "");
+                }
+            });
+        });
+
+        // as of node v0.10, 'end' is not emitted unless read() is called
+        if (request.read !== undefined) {
+            request.read();
+        }
+    });
+
+    server.dbLength = 0;
+    server.start = function () {
+        // Connect to mongodb.
+        util.log("starting mongodb client");
+        var dbs = {};
+
+        database.openConnections(function (error, db, name) {
+            if (error) {
+                throw error;
+            }
+            dbs[name] = db;
+            server.dbLength++;
+            // databases-config defines two databases ("events" and
+            // "aggregations"); only register endpoints once both are open.
+            if (server.dbLength === 2) {
+                server.register(dbs, endpoints, options);
+                if (options["http-port"] !== undefined) {
+                    util.log("starting http server on port " + options["http-port"]);
+                    httpServer.listen(options["http-port"]);
+                    if (endpoints.udp && options["udp-port"] !== undefined) {
+                        util.log("starting udp server on port " + options["udp-port"]);
+                        var udp = dgram.createSocket("udp4");
+                        udp.on("message", function (message) {
+                            endpoints.udp(JSON.parse(message.toString("utf8")), ignore);
+                        });
+                        udp.bind(options["udp-port"]);
+                    }
+                }
+            }
+        });
+    };
+
+    return server;
+};
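
A note on wiring: server.js expects the caller to supply server.register, and bin/databases-config (not part of this patch) must define the two databases, "events" and "aggregations", that server.start waits for. A minimal bootstrap could look like the sketch below; every file name, port and option value in it is assumed for illustration, not taken from this patch.

"use strict";
// bin/start.js (hypothetical): wires the analytics modules together.
var analytics = require("../lib/analytics"),
    options = {
        "http-port": 8080,                  // HTTP + WebSocket port
        "udp-port": 8081,                   // optional UDP ingest port
        collectionSize: 100 * 1024 * 1024,  // capped "events" collection size, in bytes
        aggregations: [
            {
                "pageViewsPerPath": [
                    {"$match": {"type": "pageView"}},
                    {"$group": {"_id": "$d.path", "views": {"$sum": 1}}}
                ],
                "reaggragatable": true
            }
        ]
    },
    server = analytics.server(options);

// server.js invokes server.register(dbs, endpoints, options) once both
// database connections are open; forward it to the modules that install
// endpoints (collector, disperser) and timers (aggregator).
server.register = function (dbs, endpoints, opts) {
    analytics.collector.register(dbs, endpoints, opts);
    analytics.disperser.register(dbs, endpoints, opts);
    analytics.aggregator.register(dbs, endpoints, opts);
};

server.start();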