diff --git a/bin/aggregator-config.js b/bin/aggregator-config.js index 09f2001..d7bfb2f 100644 --- a/bin/aggregator-config.js +++ b/bin/aggregator-config.js @@ -1,28 +1,11 @@ module.exports = { - mongo: [ - { - "mongo-host": "192.168.56.101", - "mongo-port": 27017, - "mongo-database": "shopcade_cube_events", - "mongo-username": null, - "mongo-password": null - }, - { - "mongo-host": "192.168.56.101", - "mongo-port": 27017, - "mongo-database": "shopcade_cube_aggregations", - "mongo-username": null, - "mongo-password": null - } - ], "aggregations": [ { - "reagragatable": true, + "reaggragatable": true, "agg1": [ { "$match": { - "type": "t1", - "t": "This gets overwritten with the correct expressions for the 1 or 5 minute intervals" + "type": "t1" //mandatory }}, { "$project": { @@ -83,73 +66,73 @@ module.exports = { } ] } -// , -// { -// "reagragatable": true, -// "t2": [ -// { -// "$match": { -// "type": "t2", -// "t": "This gets overwritten with the correct expressions for the 1 or 5 minute intervals" -// }}, -// { -// "$project": { -// _id: 0, -// type: "$type", -// v1: "$d.v1", -// v2: "$d.v2", -// v3: { -// $cond: [ -// { -// $eq: ["$d.v3", "US"] -// }, -// "US", -// { -// $cond: [ -// { -// $eq: ["$d.v3", "GB"] -// }, -// "GB", -// { -// $cond: [ -// { -// $eq: ["$d.v3", "JP"] -// }, -// "JP", -// { -// $cond: [ -// { -// $eq: ["$d.v3", "IN"] -// }, -// "IN", -// "OTHER" -// ] -// } -// ] -// } -// ] -// } -// ] -// } -// } -// }, -// { -// "$group": { -// "_id": { -// "type": "$type", -// "v1": "$v1", -// "v2": "$v2", -// "v3": "$v3" -// }, -// "count": { -// $sum: 1 -// }, -// "index": { -// $avg: "$index" -// } -// } -// } -// ] -// } + // , + // { + // "reaggragatable": true, + // "t2": [ + // { + // "$match": { + // "type": "t2", + // "t": "This gets overwritten with the correct expressions for the 1 or 5 minute intervals" + // }}, + // { + // "$project": { + // _id: 0, + // type: "$type", + // v1: "$d.v1", + // v2: "$d.v2", + // v3: { + // $cond: [ + // { + // $eq: ["$d.v3", "US"] + // }, + // "US", + // { + // $cond: [ + // { + // $eq: ["$d.v3", "GB"] + // }, + // "GB", + // { + // $cond: [ + // { + // $eq: ["$d.v3", "JP"] + // }, + // "JP", + // { + // $cond: [ + // { + // $eq: ["$d.v3", "IN"] + // }, + // "IN", + // "OTHER" + // ] + // } + // ] + // } + // ] + // } + // ] + // } + // } + // }, + // { + // "$group": { + // "_id": { + // "type": "$type", + // "v1": "$v1", + // "v2": "$v2", + // "v3": "$v3" + // }, + // "count": { + // $sum: 1 + // }, + // "index": { + // $avg: "$index" + // } + // } + // } + // ] + // } ] }; diff --git a/bin/collector-config.js b/bin/collector-config.js index 36f5958..6b9738d 100644 --- a/bin/collector-config.js +++ b/bin/collector-config.js @@ -1,13 +1,4 @@ module.exports = { - mongo: [ - { - "mongo-host": "192.168.56.101", - "mongo-port": 27017, - "mongo-database": "shopcade_cube_events", - "mongo-username": null, - "mongo-password": null - } - ], "http-port": 1080, "udp-port": 1180 }; diff --git a/bin/databases-config.js b/bin/databases-config.js new file mode 100644 index 0000000..062edde --- /dev/null +++ b/bin/databases-config.js @@ -0,0 +1,17 @@ +module.exports = { + events: { + "mongo-host": "192.168.56.101", + "mongo-port": 27017, + "mongo-database": "shopcade_cube_events", + "mongo-username": null, + "mongo-password": null + }, + aggregations: { + "mongo-host": "192.168.56.101", + "mongo-port": 27017, + "mongo-database": "shopcade_cube_aggregations", + "mongo-username": null, + "mongo-password": null, + "collectionSize": 256 * 1024 * 1024 + } +}; \ No newline at end of file diff --git a/bin/disperser-config.js b/bin/disperser-config.js index da81848..a12f29c 100644 --- a/bin/disperser-config.js +++ b/bin/disperser-config.js @@ -1,19 +1,3 @@ module.exports = { - mongo: [ - { - "mongo-host": "192.168.56.101", - "mongo-port": 27017, - "mongo-database": "shopcade_cube_events", - "mongo-username": null, - "mongo-password": null - }, - { - "mongo-host": "192.168.56.101", - "mongo-port": 27017, - "mongo-database": "shopcade_cube_aggregations", - "mongo-username": null, - "mongo-password": null - } - ], "http-port": 1081 }; \ No newline at end of file diff --git a/examples/event-stream/event-get.html b/examples/event-stream/event-get.html index 742ca7a..8d0bcb4 100644 --- a/examples/event-stream/event-get.html +++ b/examples/event-stream/event-get.html @@ -30,7 +30,8 @@

Streaming Events

console.log("connected!"); socket.send(JSON.stringify({ type: type, - start: new Date() + stop: new Date(), + start: new Date().getTime()-5*60*1000 })); }; @@ -38,12 +39,12 @@

Streaming Events

var event; if (message) { console.log(index++); - // console.log(message); + //console.log(message); if (message.data) { event = JSON.parse(message.data); - // console.log("received", message.data); + //console.log("received", message.data); if (event && event.data) { - // console.log("received", event.data); + //console.log("received", event.data); } } } diff --git a/examples/random-emitter/random-config.js b/examples/random-emitter/random-config.js deleted file mode 100644 index 7860f93..0000000 --- a/examples/random-emitter/random-config.js +++ /dev/null @@ -1,15 +0,0 @@ -module.exports = { - - // The collector to send events to. - "collector": "ws://127.0.0.1:1080", - - // The offset and duration to backfill, in milliseconds. - // For example, if the offset is minus four hours, then the first event that - // the random emitter sends will be four hours old. It will then generate more - // recent events based on the step interval, all the way up to the duration. - "offset": -4 * 60 * 60 * 1000, - "duration": 8 * 60 * 60 * 1000, - - // The time between random events. - "step": 1000 * 10 -}; diff --git a/examples/random-emitter/random-emitter.js b/examples/random-emitter/random-emitter.js deleted file mode 100644 index a9ce70f..0000000 --- a/examples/random-emitter/random-emitter.js +++ /dev/null @@ -1,30 +0,0 @@ -process.env.TZ = 'UTC'; - -var util = require("util"), - cube = require("../../"), // replace with require("cube") - options = require("./random-config"); - -util.log("starting emitter"); -var emitter = cube.emitter(options["collector"]); - -var start = Date.now() + options["offset"], - stop = start + options["duration"], - step = options["step"], - value = 0, - count = 0; - -while (start < stop) { - emitter.send({ - type: "random", - time: new Date(start), - data: { - value: value += Math.random() - .5 - } - }); - start += step; - ++count; -} - -util.log("sent " + count + " events"); -util.log("stopping emitter"); -emitter.close(); diff --git a/lib/cube/aggregation.js b/lib/cube/aggregation.js index c7a6af8..6805440 100644 --- a/lib/cube/aggregation.js +++ b/lib/cube/aggregation.js @@ -2,88 +2,6 @@ var mongodb = require("mongodb"), genericGetter = require("./genericGetter.js"); -//exports.putterInit = function (db) { -// var eventsCollectionCreated = 0, -// eventsToSave = [], -// event; -// -// function handle(error) { -// if (error) { -// throw error; -// } -// } -// -// function save(event) { -// db.collection("events").insert(event, {w: 0}); -// } -// -// function putterInit(request, callback) { -// var time = request.hasOwnProperty("time") ? new Date(request.time) : new Date(); -// -// function saveEvents() { -// eventsToSave.forEach(function (event) { -// save(event); -// }); -// eventsToSave = []; -// } -// -// // Validate the date and type. -// if (!type_re.test(request.type)) { -// callback({error: "invalid type"}); -// return -1; -// } -// if (isNaN(time)) { -// callback({error: "invalid time"}); -// return -1; -// } -// -// // If an id is specified, promote it to Mongo's primary key. -// event = {t: time, d: request.data, type: request.type}; -// if (request.hasOwnProperty("id")) { -// event._id = request.id; -// } -// // If eventsCollectionCreated, save immediately. -// if (eventsCollectionCreated === 1) { -// return save(event); -// } -// -// // If someone is already creating the event collection -// // then append this event to the queue for later save. -// if (eventsCollectionCreated === 0.5) { -// return eventsToSave.push(event); -// } -// eventsCollectionCreated = 0.5; -// -// // Otherwise, it's up to us to see if the collection exists, verify the -// // associated indexes and save -// // any events that have queued up in the interim! -// -// // First add the new event to the queue. -// eventsToSave.push(event); -// -// // If the events collection exists, then we assume the indexes do -// // too. Otherwise, we must create the required collections and indexes. -// -// db.collectionNames("events", {}, function (error, names) { -// if (error) { -// throw error; -// } -// if (names.length) { -// eventsCollectionCreated = 1; -// return saveEvents(); -// } -// var events = db.collection("events"); -// -// // Events are indexed by time. -// events.ensureIndex({"t": 1}, handle); -// eventsCollectionCreated = 1; -// saveEvents(); -// }); -// } -// -// return putterInit; -//}; - exports.getterInit = function (aggregationDB) { return genericGetter(aggregationDB); }; diff --git a/lib/cube/aggregator.js b/lib/cube/aggregator.js index 05153d7..6ca13fa 100644 --- a/lib/cube/aggregator.js +++ b/lib/cube/aggregator.js @@ -1,216 +1,443 @@ "use strict"; /* - * Aggregation have fixed times of execution: 1m and 5m + * Aggregation have fixed times of execution: 1m, 5m and 1h */ var util = require('util'), - mongoutils = require("./mongoutils.js"), - lock = false; + myutils = require("./myutils.js"), + mongodb = require("mongodb"), + mongoConfigs = require("../../bin/databases-config"), + lock = {}, + myLog = function (param1, param2) { + console.log(new Date() + " Agg: " + param1 + " :: " + param2); + }; //noinspection JSLint exports.register = function (dbs, endpoints, options) { - if (dbs.length !== 2) { - throw "We need to receive exactly 2(two) databases"; - } - var eventsDB = dbs[0], - aggregationsDB = dbs[1]; - - function executeAggregationCore5m(aggregation, aggregationName, t0_5m, t1_1m, reagregatable) { - var aggGrKey, - fiveMinAggregation, - aggregationGroup, - t1_5m, - i; - - t1_5m = t0_5m + 300000; - t1_5m = new Date(t1_5m); - - if (t1_5m <= t1_1m) { - console.log("Starting 5 minute aggregation from time: " + new Date(t0_5m)); - - //this way we can insure the 1 minute aggregation happens before the 5 minute one, so the 5 minute one can use the result of the first - if (reagregatable === true) { - for (i = 0; i < aggregation.length; i++) { - if (aggregation[i].$group !== undefined) { - aggregationGroup = JSON.parse(JSON.stringify(aggregation[i].$group)); - break; - } + var eventsDB = dbs.events, + aggregationsDB = dbs.aggregations, + save = function (aggregationName, documentsArray, interval) { + var i; + for (i = 0; i < documentsArray.length; i++) { + if (documentsArray[i] === undefined || documentsArray[i].t === undefined) { + myLog(aggregationName, util.inspect(documentsArray)); + throw "Failed to get t"; } - if (aggregationGroup === undefined) { - throw "Could not find $group statement for aggregation " + aggregationName; + documentsArray[i]._id = new mongodb.ObjectID(documentsArray[i].t / 1000); + } + //noinspection JSLint + aggregationsDB.collection(aggregationName + "_" + interval).insert(documentsArray, {w: 1}, function (error, result) { + if (error) { + throw error; + } + myLog(aggregationName, "Inserted " + documentsArray.length + " documents for the " + interval + " interval"); } - aggregationGroup._id = "$key"; - - console.log("Replacing $group params with new ones for reaggregating"); - //we can handle $sum,$avg,$min,$max:x - where x is not the same field or is a constant - for (aggGrKey in aggregationGroup) { - if (aggregationGroup.hasOwnProperty(aggGrKey) && aggGrKey !== "_id") { - //console.log(util.inspect([aggregationGroup, aggGrKey], {color: true, depth: null})); - - if (aggregationGroup[aggGrKey].$sum !== undefined) { - aggregationGroup[aggGrKey].$sum = "$" + aggGrKey; - } else if (aggregationGroup[aggGrKey].$avg !== undefined) { - delete aggregationGroup[aggGrKey].$avg; - aggregationGroup[aggGrKey].$sum = "$" + aggGrKey + "/5"; - } else if (aggregationGroup[aggGrKey].$min !== undefined) { - aggregationGroup[aggGrKey].$min = "$" + aggGrKey; - } else if (aggregationGroup[aggGrKey].$max !== undefined) { - aggregationGroup[aggGrKey].$max = "$" + aggGrKey; + ); + }, + executeAggregationCore1h = function (aggregation, aggregationName, lastPossibleAggregatableElementTimestamp) { + var reaggregation, + aggregationGroup, + t0_1h, + t1_1h, + i, + determineTimestamps = function (callback) { + //try to continue from where we left off + aggregationsDB.collection(aggregationName + "_1h").find({}, {}, {sort: {_id: -1}, limit: 1}).toArray(function (error, result) { + if (error) { + throw error; + } + + if (result === null || result.length === 0) { + //no previous aggregations were found. look for the first event of the aggregation type and start from there + aggregationsDB.collection(aggregationName + "_5m").find({}, {}, {sort: {_id: 1}, limit: 1}).toArray(function (error, result) { + if (error) { + throw error; + } + + if (!Array.isArray(result) || result.length === 0) { + myLog(aggregationName, "There are no 5m records, so we cannot aggregate for 1 hour."); + t0_1h = undefined; + } else { + t0_1h = result[0].t; + t0_1h = t0_1h - t0_1h % 3600000; + } + + if (t0_1h === undefined) { + lock[aggregationName].inProgress = false; + } else { + t1_1h = t0_1h + 3600000; + callback(aggregation, t0_1h, t1_1h); + } + }); } else { - throw "Unrecognised keyword. We only accept $min, $max, $sum and $avg"; + // the t field represents the beginning of the time slot + // start aggregating from the next slot + t0_1h = result[0].t + 3600000; + t1_1h = t0_1h + 3600000; + callback(aggregation, t0_1h, t1_1h); + } + }); + }, + prepareReaggregation = function (aggregation) { + var aggGrKey; + for (i = 0; i < aggregation.length; i++) { + if (aggregation[i].$group !== undefined) { + aggregationGroup = JSON.parse(JSON.stringify(aggregation[i].$group)); + break; } } - } + if (aggregationGroup === undefined) { + throw "Could not find $group statement for aggregation " + aggregationName; + } + aggregationGroup._id = "$key"; - fiveMinAggregation = [ - {"$match": {_id: {$gte: mongoutils.objectIdFromDate(t0_5m), $lt: mongoutils.objectIdFromDate(t0_5m + 300000)}}}, - {"$group": aggregationGroup} - ]; - - console.log("Here is how the new aggregation for 5 minutes looks like: " + util.inspect(fiveMinAggregation, {color: true, depth: null})); - aggregationsDB.collection(aggregationName + "_1m").aggregate( - fiveMinAggregation, {}, function (err, result) { - var insertEmptyRecord; - if (err) { - throw err; + //myLog(aggregationName,"Replacing $group params with new ones for reaggregating ..."); + //we can handle $sum,$avg,$min,$max:x - where x is not the same field or is a constant + for (aggGrKey in aggregationGroup) { + if (aggregationGroup.hasOwnProperty(aggGrKey) && aggGrKey !== "_id") { + if (aggregationGroup[aggGrKey].$sum !== undefined) { + aggregationGroup[aggGrKey].$sum = "$" + aggGrKey; + } else if (aggregationGroup[aggGrKey].$avg !== undefined) { + delete aggregationGroup[aggGrKey].$avg; + aggregationGroup[aggGrKey].$sum = "$" + aggGrKey + "/12"; + } else if (aggregationGroup[aggGrKey].$min !== undefined) { + aggregationGroup[aggGrKey].$min = "$" + aggGrKey; + } else if (aggregationGroup[aggGrKey].$max !== undefined) { + aggregationGroup[aggGrKey].$max = "$" + aggGrKey; + } else { + throw "Unrecognised keyword. We only accept $min, $max, $sum and $avg"; + } } - if (result.length > 0) { - for (i = 0; i < result.length; i++) { - if (result[i]._id === undefined || result[i]._id === null) { - insertEmptyRecord = true; - } - result[i].key = result[i]._id; - result[i].t = new Date(t0_5m); - delete result[i]._id; + } + + reaggregation = [ + {"$match": {_id: {$gte: myutils.objectIdFromDate(t0_1h), $lt: myutils.objectIdFromDate(t0_1h + 3600000)}}}, + {"$group": aggregationGroup} + ]; + //myLog(aggregationName,"Here is how the new aggregation for 1 hour looks like: " + util.inspect(reaggregation, {color: true, depth: null})); + }, + core = function (aggregation, t0_1h, t1_1h) { + if (t1_1h <= lastPossibleAggregatableElementTimestamp) { + myLog(aggregationName, "Starting 1 hour aggregation from time: " + new Date(t0_1h)); + prepareReaggregation(aggregation); + aggregationsDB.collection(aggregationName + "_5m").aggregate(reaggregation, {}, function (error, result) { + if (error) { + throw error; } + if (result.length > 0) { + myLog(aggregationName, "Finished aggregating documents from the 5 minute aggregation into the 1 hour aggregation ... now inserting."); - //console.log(util.inspect(result, {color: true, depth: null})); - //noinspection JSLint - aggregationsDB.collection(aggregationName + "_5m").insert(result, {w: 1}, function (err, result) { - if (err) { - throw err; + //put _id in key + for (i = 0; i < result.length; i++) { + if (result[i]._id !== undefined && result[i]._id !== null) { + result[i].key = result[i]._id; + result[i].t = t0_1h; + delete result[i]._id; + } + else { + result.splice(i, 1); + i--; + } + } + if (result.length > 0) { + save(aggregationName, result, "1h"); } + else { + save(aggregationName, new Array({t: t0_1h}), "1h"); + } + } + else { + myLog(aggregationName, "Still inserting an empty entry with t :" + t0_1h + " so we can keep track of what ran when"); + save(aggregationName, new Array({t: t0_1h}), "1h"); + } + lock[aggregationName].inProgress = false; - lock = false; - }); - } - else { - insertEmptyRecord = true; + }); + + } else { + lock[aggregationName].inProgress = false; + } + + }; + determineTimestamps(core); + }, + + executeAggregationCore5m = function (aggregation, aggregationName, lastPossibleAggregatableElementTimestamp) { + var reaggregation, + aggregationGroup, + t0_5m, + t1_5m, + i, + determineTimestamps = function (callback) { + //try to continue from where we left off + aggregationsDB.collection(aggregationName + "_5m").find({}, {}, {sort: {_id: -1}, limit: 1}).toArray(function (error, result) { + if (error) { + throw error; } - if (insertEmptyRecord) { - //noinspection JSLint - aggregationsDB.collection(aggregationName + "_5m").insert({t: new Date(t0_5m)}, {w: 1}, function (err, result) { - if (err) { - throw err; + + if (result === null || result.length === 0) { + //no previous aggregations were found. look for the first event of the aggregation type and start from there + aggregationsDB.collection(aggregationName + "_1m").find({}, {}, {sort: {_id: 1}, limit: 1}).toArray(function (error, result) { + if (error) { + throw error; + } + + if (!Array.isArray(result) || result.length === 0) { + myLog(aggregationName, "There are no 1m records, so we cannot aggregate for 5 minutes."); + t0_5m = undefined; + } else { + t0_5m = result[0].t; + t0_5m = t0_5m - t0_5m % 300000; } - lock = false; + if (t0_5m === undefined) { + lock[aggregationName].inProgress = false; + } else { + t1_5m = t0_5m + 300000; + callback(aggregation, t0_5m, t1_5m); + } }); + } else { + // the t field represents the beginning of the time slot + // start aggregating from the next slot + t0_5m = result[0].t + 300000; + t1_5m = t0_5m + 300000; + callback(aggregation, t0_5m, t1_5m); } }); - } - else { - lock = false; - throw "5 minute aggregations that are not reaggregatable are not yet implemented"; - } - } else { - lock = false; - } - } - - function executeAggregationCore1m(aggregation, aggregationName, t0_1m, reagregatable) { - var i, - t1_1m = t0_1m + 60000, - t0_5m; - - console.log("Starting 1 minute aggregation from time: " + new Date(t0_1m)); - - t0_1m = new Date(t0_1m); - t1_1m = new Date(t1_1m); - if (t1_1m < new Date()) { - //start aggregating - aggregation[0].$match._id = {"$gte": mongoutils.objectIdFromDate(t0_1m), "$lt": mongoutils.objectIdFromDate(t1_1m)}; - //console.log(util.inspect(aggregation, { depth: null, colors: true})); - eventsDB.collection("events").aggregate(aggregation, {}, function (err, result) { - if (err) { - throw err; - } - //console.log(util.inspect(result, { depth: null, colors: true})); - if (result.length > 0) { - console.log("Finished aggregating events for 1 minute for aggregation " + aggregationName); - - for (i = 0; i < result.length; i++) { - result[i].key = result[i]._id; - result[i].t = t0_1m; - delete result[i]._id; + }, + prepareReaggregation = function (aggregation) { + var aggGrKey; + for (i = 0; i < aggregation.length; i++) { + if (aggregation[i].$group !== undefined) { + aggregationGroup = JSON.parse(JSON.stringify(aggregation[i].$group)); + break; + } + } + if (aggregationGroup === undefined) { + throw "Could not find $group statement for aggregation " + aggregationName; } - //noinspection JSLint - aggregationsDB.collection(aggregationName + "_1m").insert(result, {w: 1}, function (err, result) { - if (err) { - throw err; + aggregationGroup._id = "$key"; + + //myLog(aggregationName, "Replacing $group params with new ones for reaggregating"); + //we can handle $sum,$avg,$min,$max:x - where x is not the same field or is a constant + for (aggGrKey in aggregationGroup) { + if (aggregationGroup.hasOwnProperty(aggGrKey) && aggGrKey !== "_id") { + if (aggregationGroup[aggGrKey].$sum !== undefined) { + aggregationGroup[aggGrKey].$sum = "$" + aggGrKey; + } else if (aggregationGroup[aggGrKey].$avg !== undefined) { + delete aggregationGroup[aggGrKey].$avg; + aggregationGroup[aggGrKey].$sum = "$" + aggGrKey + "/5"; + } else if (aggregationGroup[aggGrKey].$min !== undefined) { + aggregationGroup[aggGrKey].$min = "$" + aggGrKey; + } else if (aggregationGroup[aggGrKey].$max !== undefined) { + aggregationGroup[aggGrKey].$max = "$" + aggGrKey; + } else { + throw "Unrecognised keyword. We only accept $min, $max, $sum and $avg"; + } + } + } + + reaggregation = [ + {"$match": {_id: {$gte: myutils.objectIdFromDate(t0_5m), $lt: myutils.objectIdFromDate(t0_5m + 300000)}}}, + {"$group": aggregationGroup} + ]; + //myLog(aggregationName,"Here is how the new aggregation for 5 minutes looks like: " + util.inspect(reaggregation, {color: true, depth: null})); + }, + core = function (aggregation, t0_5m, t1_5m) { + if (t1_5m <= lastPossibleAggregatableElementTimestamp) { + myLog(aggregationName, "Starting 5 minute aggregation from time: " + new Date(t0_5m)); + prepareReaggregation(aggregation); + aggregationsDB.collection(aggregationName + "_1m").aggregate(reaggregation, {}, function (error, result) { + if (error) { + throw error; } - console.log("Inserted events for 1 minute for aggregation " + aggregationName); + if (result.length > 0) { + myLog(aggregationName, "Finished aggregating documents from the 1 minute aggregation into the 5 minutes aggregation ... now inserting."); - //now run the 5 minute aggregation - //First find at what point to start - say last 5 minute aggregation - aggregationsDB.collection(aggregationName + "_5m").find({}, {}, {sort: {_id: -1}, limit: 1}).toArray(function (err, result) { - if (err) { - throw err; + //put _id in key + for (i = 0; i < result.length; i++) { + if (result[i]._id !== undefined && result[i]._id !== null) { + result[i].key = result[i]._id; + result[i].t = t0_5m; + delete result[i]._id; + } + else { + result.splice(i, 1); + i--; + } } - //noinspection JSLint - if (result !== null && result.length > 0) { - t0_5m = result[0].t.getTime() + 300000; - executeAggregationCore5m(aggregation, aggregationName, t0_5m, t1_1m, reagregatable); + if (result.length > 0) { + save(aggregationName, result, "5m"); } else { - //no 5 minute aggregation yet - //check the first 1 minute aggregation available - aggregationsDB.collection(aggregationName + "_1m").find({}, {}, {sort: {_id: 1}, limit: 1}).toArray(function (err, result) { - if (err) { - throw err; - } - if (result === null || result.length === 0) { - throw "Cannot find a 1 minute aggregation. This should never happen as this is only triggered after inserting a 1 minute aggregation"; - } + save(aggregationName, new Array({t: t0_5m}), "5m"); + } + } + else { + myLog(aggregationName, "Still inserting an empty entry with t :" + t0_5m + " so we can keep track of what ran when"); + save(aggregationName, new Array({t: t0_5m}), "5m"); + } + executeAggregationCore1h(aggregation, aggregationName, t0_5m); + }); + } else { + lock[aggregationName].inProgress = false; + } - t0_5m = result[0].t.getTime() - result[0].t.getTime() % 300000; // the t field represents the beginning of the time slot - executeAggregationCore5m(aggregation, aggregationName, t0_5m, t1_1m, reagregatable); - }); + }; + determineTimestamps(core); + }, + + executeAggregationCore1m = function (aggregation, aggregationName, lastPossibleAggregatableElementTimestamp) { + var t0_1m, + i, + t1_1m, + determineTimestamps = function (callback) { + //try to continue from where we left off + aggregationsDB.collection(aggregationName + "_1m").find({}, {}, {sort: {_id: -1}, limit: 1}).toArray(function (error, result) { + if (error) { + throw error; + } + + if (result === null || result.length === 0) { + //no previous aggregations were found. look for the first event of the aggregation type and start from there + eventsDB.collection("events").find({type: aggregation[0].$match.type}, {}, {sort: {_id: 1}, limit: 1}).toArray(function (error, result) { + if (error) { + throw error; + } + + if (!Array.isArray(result) || result.length === 0) { + myLog(aggregationName, "There are no events of type " + aggregation[0].$match.type + ", so we cannot aggregate."); + t0_1m = undefined; + } else { + t0_1m = result[0].t; + t0_1m = t0_1m - t0_1m % 60000; + } + + if (t0_1m === undefined) { + lock[aggregationName].inProgress = false; + } else { + t1_1m = t0_1m + 60000; + callback(aggregation, t0_1m, t1_1m); } }); + } else { + // the t field represents the beginning of the time slot + // start aggregating from the next slot + t0_1m = result[0].t + 60000; + t1_1m = t0_1m + 60000; + callback(aggregation, t0_1m, t1_1m); } - ); - } - else { - console.log("Still inserting an empty entry with t :" + t0_1m + " so we can keep track of what ran"); - //noinspection JSLint - aggregationsDB.collection(aggregationName + "_1m").insert({t: t0_1m}, {w: 1}, function (err, result) { - if (err) { - throw err; - } - lock = false; }); + }, + core = function (aggregation, t0_1m, t1_1m) { + if (t1_1m <= lastPossibleAggregatableElementTimestamp) { + myLog(aggregationName, "Starting 1 minute aggregation from time: " + new Date(t0_1m)); + aggregation[0].$match._id = {"$gte": myutils.objectIdFromDate(t0_1m), "$lt": myutils.objectIdFromDate(t1_1m)}; + eventsDB.collection("events").aggregate(aggregation, {}, function (error, result) { + if (error) { + throw error; + } + if (result.length > 0) { + myLog(aggregationName, "Finished aggregating documents from the events collection into the 1 minute aggregation ... now inserting."); + + //put _id in key + for (i = 0; i < result.length; i++) { + if (result[i]._id !== undefined && result[i]._id !== null) { + result[i].key = result[i]._id; + result[i].t = t0_1m; + delete result[i]._id; + } + else { + result.splice(i, 1); + i--; + } + } + if (result.length > 0) { + save(aggregationName, result, "1m"); + } + else { + save(aggregationName, new Array({t: t0_1m}), "1m"); + } + } + else { + myLog(aggregationName, "Still inserting an empty entry with t :" + t0_1m + " so we can keep track of what ran when"); + //noinspection JSLint + save(aggregationName, new Array({t: t0_1m}), "1m"); + } + executeAggregationCore5m(aggregation, aggregationName, t0_1m); + }); + } + else { + lock[aggregationName].inProgress = false; + } + }; + + determineTimestamps(core); + }, + + createCappedCollectionsForAggregation = function (aggregationName) { + var intervals = ["1m", "5m", "1h"], + createCollection = function (callback, i) { + if (intervals[i] === undefined) { + lock[aggregationName].collectionsCreated = 1; + } + else { + aggregationsDB.collectionNames(aggregationName + "_" + intervals[i], {}, function (error, results) { + if (error) { + myLog(aggregationName, "Error: " + util.inspect(error)); + } else { + if (results.length === 0) { + //noinspection JSLint,JSUnusedLocalSymbols + aggregationsDB.createCollection(aggregationName + "_" + intervals[i], {capped: true, autoIndexId: true, size: mongoConfigs.aggregations.collectionSize}, function (error, result) { + if (error) { + myLog(aggregationName, "Error: " + util.inspect(error)); + } else { + myLog(aggregationName, "Created capped collection " + aggregationName + "_" + intervals[i] + " with a size of " + mongoConfigs.aggregations.collectionSize + " ..."); + } + callback(callback, i + 1); + }); + } + else { + myLog(aggregationName, "Collection " + aggregationName + "_" + intervals[i] + " already exists."); + callback(callback, i + 1); + } + } + }); + } + }; + if (lock[aggregationName].collectionsCreated === 0) { + lock[aggregationName].collectionsCreated = 0.5; + myLog(aggregationName, "Preparing to create collections"); + createCollection(createCollection, 0); + } + }, + + executeAggregation = function (aggregation, aggregationName) { + myLog(aggregationName, "---***---***---***--- Starting new iteration... ---***---***---***---"); + createCappedCollectionsForAggregation(aggregationName); + if (lock[aggregationName].inProgress !== true && lock[aggregationName].collectionsCreated === 1) { + lock[aggregationName].inProgress = true; + + if (lock[aggregationName].collectionsCreated === 1) { + executeAggregationCore1m(aggregation, aggregationName, new Date().getTime()); } - }); - } - else { - lock = false; - } - } - - function executeAggregation(aggregationObject) { - var keys = 0, - aggregation, - keyName, - t0, - aggregationName, - reagregatable = false; - - if (lock !== true) { - lock = true; + } + else { + myLog(aggregationName, "Will not run as another aggregation is in progress or the capped collections are not yet in place"); + } + }, + + validateAndRunAggregation = function (aggregationObject, interval) { + var keys = 0, + aggregation, + keyName, + aggregationName, + reagregatable = false; + if (typeof aggregationObject === "object") { for (keyName in aggregationObject) { if (aggregationObject.hasOwnProperty(keyName)) { - if (keyName !== "reagragatable") { + if (keyName !== "reaggragatable") { aggregationName = keyName; keys++; aggregation = aggregationObject[keyName]; @@ -222,42 +449,29 @@ exports.register = function (dbs, endpoints, options) { } } } + if (!reagregatable) { + throw "Reaggregatable aggregations are not yet implemented"; + } + if (keys !== 1) { throw "Too many keys"; } - //get the last appearance of this aggregation and start from there - aggregationsDB.collection(aggregationName + "_1m").find({}, {}, {sort: {_id: -1}, limit: 1}).toArray(function (err, result) { - if (err) { - throw err; - } - if (result === null || result.length === 0) { - //no aggregations were found - eventsDB.collection("events").find({}, {}, {sort: {_id: 1}, limit: 1}).toArray(function (err, result) { - if (err) { - throw err; - } + if (!(aggregation && aggregation[0].$match && aggregation[0].$match.type)) { + throw "You are missing the $match.type for aggregation " + aggregationName; + } - if (result === null || result.length === 0) { - throw "There are no events, so we cannot aggregate"; - } - t0 = result[0].t.getTime(); - t0 = t0 - t0 % 60000; - executeAggregationCore1m(aggregation, aggregationName, t0, reagregatable); - }); - } else { - t0 = result[0].t.getTime() + 60000; // the t field represents the beginning of the time slot - executeAggregationCore1m(aggregation, aggregationName, t0, reagregatable); - } - }); - } else { - console.log("Will not run as another aggregation is in progress"); - } - } - - options.aggregations.forEach(function (aggregation) { - setInterval(function () { - executeAggregation(aggregation); - }, 1000); + lock[aggregationName] = {}; + lock[aggregationName].collectionsCreated = 0; + + setInterval(function () { + executeAggregation(aggregation, aggregationName); + }, interval); + }; + + options.aggregations.forEach(function (aggregationObject) { + validateAndRunAggregation(aggregationObject, 20); }); -}; \ No newline at end of file +} +; + diff --git a/lib/cube/collector.js b/lib/cube/collector.js index c2754a8..9c726ba 100644 --- a/lib/cube/collector.js +++ b/lib/cube/collector.js @@ -28,11 +28,8 @@ function post(putter) { //noinspection JSLint exports.register = function (dbs, endpoints, options) { - if (dbs.length !== 1) { - throw "dbs param should be an array with one element"; - } - var db = dbs[0], - putter = require("./event.js").putterInit(db), + var db = dbs.events, + putter = require("./event.js").putterInit(db, options), poster = post(putter); endpoints.ws.push( diff --git a/lib/cube/database.js b/lib/cube/database.js index 1d7cb9b..2b7c80d 100644 --- a/lib/cube/database.js +++ b/lib/cube/database.js @@ -1,18 +1,27 @@ "use strict"; -var mongodb = require("mongodb"); +var mongodb = require("mongodb"), + mongoConfigs = require("../../bin/databases-config"), + database = module.exports = {}, + myConnect = function (url, options, name, callback) { + mongodb.Db.connect(url, options, function (error, db) { + callback(error, db, name); + }); + }; -var database = module.exports = {}; +database.openConnections = function (callback) { + var mongoDBType, + url, + options, + mongoConfig; -database.openConnections = function (config, callback) { - var mongoConfigs = config.mongo; - if (!Array.isArray(mongoConfigs)) { - throw "Bad configuration of mongo"; - } - mongoConfigs.forEach(function (mongoConfig) { - var url = database.configurl(mongoConfig), + for (mongoDBType in mongoConfigs) { + if (mongoConfigs.hasOwnProperty(mongoDBType)) { + mongoConfig = mongoConfigs[mongoDBType]; + url = database.configurl(mongoConfig); options = mongoConfig["mongo-options"] || database.configOptions(mongoConfig); - mongodb.Db.connect(url, options, callback); - }); + myConnect(url, options, mongoDBType, callback); + } + } }; database.configurl = function (config) { diff --git a/lib/cube/disperser.js b/lib/cube/disperser.js index f48811b..f6c2e5a 100644 --- a/lib/cube/disperser.js +++ b/lib/cube/disperser.js @@ -9,11 +9,8 @@ var endpoint = require("./endpoint.js"), //noinspection JSLint exports.register = function (dbs, endpoints, options) { - if (dbs.length !== 2) { - throw "We need to receive exactly 2(two) databases"; - } - var eventsDB = dbs[0], - aggregationsDB = dbs[1], + var eventsDB = dbs.events, + aggregationsDB = dbs.aggregations, eventGetter = require("./event.js").getterInit(eventsDB), aggregationGetter = require("./aggregation.js").getterInit(aggregationsDB); diff --git a/lib/cube/event.js b/lib/cube/event.js index 66cc452..a1f2ca7 100644 --- a/lib/cube/event.js +++ b/lib/cube/event.js @@ -2,12 +2,17 @@ var mongodb = require("mongodb"), type_re = /^[a-z][a-zA-Z0-9_]+$/, genericGetter = require("./genericGetter.js"), - eventsSize = 1024 * 1024 * 1024 * 20; -exports.putterInit = function (db) { + myutils = require("./myutils.js"); + +exports.putterInit = function (db, options) { var eventsCollectionCreated = 0, eventsToSave = [], - event; + event, + collectionSize = options.collectionSize; + if (myutils.isInt(collectionSize)) { + throw "Invalid collection size: " + collectionSize; + } function handle(error) { if (error) { throw error; @@ -18,8 +23,8 @@ exports.putterInit = function (db) { db.collection("events").insert(event, {w: 0}); } - function putter(request, callback) { - var time = request.hasOwnProperty("time") ? new Date(request.time) : new Date(); + function putter(request, messageSenderCallback) { + var time = new Date().getTime(); function saveEvents() { eventsToSave.forEach(function (event) { @@ -28,13 +33,13 @@ exports.putterInit = function (db) { eventsToSave = []; } - // Validate the date and type. + // validations if (!type_re.test(request.type)) { - callback({error: "invalid type"}); + messageSenderCallback({error: "invalid type"}); return -1; } if (isNaN(time)) { - callback({error: "invalid time"}); + messageSenderCallback({error: "invalid time"}); return -1; } @@ -73,16 +78,12 @@ exports.putterInit = function (db) { eventsCollectionCreated = 1; return saveEvents(); } - var events = db.collection("events"); - // Events are indexed by time. - db.createCollection("events", {capped: true, autoIndexId: true, size: eventsSize}, function (err, result) { - handle(err); - db.collection("events").ensureIndex({"t": 1}, function (err, result) { - handle(err); - eventsCollectionCreated = 1; - saveEvents(); - }); + // Events are indexed by time which is _id, which is natural order. + db.createCollection("events", {capped: true, autoIndexId: true, size: collectionSize}, function (error, result) { + handle(error); + eventsCollectionCreated = 1; + saveEvents(); }); }); } diff --git a/lib/cube/genericGetter.js b/lib/cube/genericGetter.js index 2f99b44..8b15b3b 100644 --- a/lib/cube/genericGetter.js +++ b/lib/cube/genericGetter.js @@ -1,12 +1,11 @@ "use strict"; -var util = require("util"), - mongoutils = require("./mongoutils.js"), +var myutils = require("./myutils.js"), mongodb = require("mongodb"), type_re = /^[a-z][a-zA-Z0-9_]+$/, MAX_RETURNED_RECORDS = 10000; function customQuery(collectionObj, filter, sort, limit, batchSize, streamified, documentHandler) { // set MongoDB cursor options - var cursorOptions, + var cursorOptions={}, cursor, stream; @@ -51,7 +50,6 @@ function customQuery(collectionObj, filter, sort, limit, batchSize, streamified, // A null name indicates that there are no more results. if (document) { - console.log(util.inspect(document)); documentHandler({id: document._id instanceof mongodb.ObjectID ? undefined : document._id, time: document.t, data: document.d}); } else { @@ -103,9 +101,7 @@ module.exports = function (db) { documentHandler({error: "invalid stop"}); return -1; } - stop = new Date(stop); } - start = new Date(start); // Set an optional limit on the number of documents to return. sort = {_id: -1}; @@ -119,9 +115,9 @@ module.exports = function (db) { } // Copy any expression filters into the match object. - filter = {_id: {$gte: mongoutils.objectIdFromDate(start)}}; + filter = {_id: {$gte: myutils.objectIdFromDate(start)}}; if (stop !== undefined) { - filter._id.$lt = mongoutils.objectIdFromDate(stop); + filter._id.$lt = myutils.objectIdFromDate(stop); } if (name === undefined) { diff --git a/lib/cube/mongoutils.js b/lib/cube/myutils.js similarity index 79% rename from lib/cube/mongoutils.js rename to lib/cube/myutils.js index 4d66c12..4377a07 100644 --- a/lib/cube/mongoutils.js +++ b/lib/cube/myutils.js @@ -12,4 +12,7 @@ module.exports.epochFromObjectId = function (obj) { }; module.exports.epochMSFromObjectId = function (obj) { return parseInt(obj.valueOf().slice(0, 8), 16); +}; +module.exports.isInt = function (n) { + return typeof n === 'number' && parseFloat(n) === parseInt(n, 10) && !isNaN(n); }; \ No newline at end of file diff --git a/lib/cube/server.js b/lib/cube/server.js index 68edb66..95931b0 100644 --- a/lib/cube/server.js +++ b/lib/cube/server.js @@ -22,6 +22,7 @@ var util = require("util"), //}; function ignore() { + console.log("Ignoring..."); // Responses for UDP are ignored; there's nowhere for them to go! } @@ -64,7 +65,7 @@ module.exports = function (options) { i, n, endpoint, - callback; + messageSender; // Forward messages to the appropriate endpoint, or close the connection. n = endpoints.ws.length; @@ -75,23 +76,23 @@ module.exports = function (options) { } } if (foundMatch) { - callback = function (response) { + messageSender = function (response) { socket.send(JSON.stringify(response)); }; - callback.id = ++id; + messageSender.id = ++id; // Listen for socket disconnect. if (endpoint.dispatch.close) { socket.on("end", function () { - endpoint.dispatch.close(callback); + endpoint.dispatch.close(messageSender); }); } socket.on("message", function (message) { console.log("Got message: " + message); console.log("Current WS status is " + socket.readyState); - endpoint.dispatch(JSON.parse(message), callback); + endpoint.dispatch(JSON.parse(message), messageSender); }); return; } @@ -131,17 +132,19 @@ module.exports = function (options) { } }); + server.dbLength = 0; server.start = function () { // Connect to mongodb. util.log("starting mongodb client"); - var dbs = []; + var dbs = {}; - database.openConnections(options, function (error, db) { + database.openConnections(function (error, db, name) { if (error) { throw error; } - dbs.push(db); - if (dbs.length === options.mongo.length) { + dbs[name] = db; + server.dbLength++; + if (server.dbLength === 2) { server.register(dbs, endpoints, options); if (options["http-port"] !== undefined) { util.log("starting http server on port " + options["http-port"]);