diff --git a/bin/aggregator.js b/bin/aggregator.js
index 5678d9f..765a0d0 100644
--- a/bin/aggregator.js
+++ b/bin/aggregator.js
@@ -1,10 +1,10 @@
 "use strict";
 var options = require("./aggregator-config.js"),
-    cube = require("../"),
-    server = cube.server(options);
+    analytics = require("../"),
+    server = analytics.server(options);
 
 server.register = function (dbs, endpoints, options) {
-    cube.aggregator.register(dbs, endpoints, options);
+    analytics.aggregator.register(dbs, endpoints, options);
 };
 
 server.start();
diff --git a/bin/collector.js b/bin/collector.js
index b3083e9..c6af6b7 100644
--- a/bin/collector.js
+++ b/bin/collector.js
@@ -1,10 +1,10 @@
 "use strict";
 var options = require("./collector-config.js"),
-    cube = require("../"),
-    server = cube.server(options);
+    analytics = require("../"),
+    server = analytics.server(options);
 
 server.register = function (dbs, endpoints, options) {
-    cube.collector.register(dbs, endpoints, options);
+    analytics.collector.register(dbs, endpoints, options);
 };
 
 server.start();
diff --git a/bin/databases-config.js b/bin/databases-config.js
index 062edde..b031dad 100644
--- a/bin/databases-config.js
+++ b/bin/databases-config.js
@@ -1,15 +1,15 @@
 module.exports = {
     events: {
-        "mongo-host": "192.168.56.101",
+        "mongo-host": "localhost",
         "mongo-port": 27017,
-        "mongo-database": "shopcade_cube_events",
+        "mongo-database": "events",
         "mongo-username": null,
         "mongo-password": null
     },
     aggregations: {
-        "mongo-host": "192.168.56.101",
+        "mongo-host": "localhost",
         "mongo-port": 27017,
-        "mongo-database": "shopcade_cube_aggregations",
+        "mongo-database": "aggregations",
         "mongo-username": null,
         "mongo-password": null,
         "collectionSize": 256 * 1024 * 1024
diff --git a/bin/disperser.js b/bin/disperser.js
index 6820737..9c5d9a3 100644
--- a/bin/disperser.js
+++ b/bin/disperser.js
@@ -1,10 +1,10 @@
 "use strict";
 var options = require("./disperser-config.js"),
-    cube = require("../"),
-    server = cube.server(options);
+    analytics = require("../"),
+    server = analytics.server(options);
 
 server.register = function (dbs, endpoints, options) {
-    cube.disperser.register(dbs, endpoints, options);
+    analytics.disperser.register(dbs, endpoints, options);
 };
 
 server.start();
diff --git a/lib/cube/aggregation.js b/lib/cube/aggregation.js
deleted file mode 100644
index 6805440..0000000
--- a/lib/cube/aggregation.js
+++ /dev/null
@@ -1,8 +0,0 @@
-"use strict";
-var mongodb = require("mongodb"),
-    genericGetter = require("./genericGetter.js");
-
-exports.getterInit = function (aggregationDB) {
-    return genericGetter(aggregationDB);
-};
-
diff --git a/lib/cube/aggregator.js b/lib/cube/aggregator.js
deleted file mode 100644
index a941f37..0000000
--- a/lib/cube/aggregator.js
+++ /dev/null
@@ -1,476 +0,0 @@
-"use strict";
-/*
- * Aggregations have fixed times of execution: 1m, 5m and 1h
- */
-var util = require('util'),
-    myutils = require("./myutils.js"),
-    mongodb = require("mongodb"),
-    mongoConfigs = require("../../bin/databases-config"),
-    lock = {},
-    myLog = function (param1, param2) {
-        console.log(new Date() + " Agg: " + param1 + " :: " + param2);
-    };
-//noinspection JSLint
-exports.register = function (dbs, endpoints, options) {
-    var eventsDB = dbs.events,
-        aggregationsDB = dbs.aggregations,
-        save = function (aggregationName, documentsArray, interval) {
-            var i;
-            for (i = 0; i < documentsArray.length; i++) {
-                if (documentsArray[i] === undefined || documentsArray[i].t === undefined) {
-                    myLog(aggregationName, util.inspect(documentsArray));
-                    throw "Failed to get t";
-                }
-                documentsArray[i]._id = new mongodb.ObjectID(documentsArray[i].t / 1000);
-            }
-            //noinspection JSLint
-            aggregationsDB.collection(aggregationName + "_" + interval).insert(documentsArray, {w: 1}, function (error, result) {
-                    if (error) {
-                        throw error;
-                    }
-                    myLog(aggregationName, "Inserted " + documentsArray.length + " documents for the " + interval + " interval");
-                }
-            );
-        },
-        executeAggregationCore1h = function (aggregation, aggregationName, lastPossibleAggregatableElementTimestamp) {
-            var reaggregation,
-                aggregationGroup,
-                t0_1h,
-                t1_1h,
-                i,
-                determineTimestamps = function (callback) {
-                    //try to continue from where we left off
-                    aggregationsDB.collection(aggregationName + "_1h").find({}, {}, {sort: {_id: -1}, limit: 1}).toArray(function (error, result) {
-                        if (error) {
-                            throw error;
-                        }
-
-                        if (result === null || result.length === 0) {
-                            //no previous aggregations were found. look for the first event of the aggregation type and start from there
-                            aggregationsDB.collection(aggregationName + "_5m").find({}, {}, {sort: {_id: 1}, limit: 1}).toArray(function (error, result) {
-                                if (error) {
-                                    throw error;
-                                }
-
-                                if (!Array.isArray(result) || result.length === 0) {
-                                    myLog(aggregationName, "There are no 5m records, so we cannot aggregate for 1 hour.");
-                                    t0_1h = undefined;
-                                } else {
-                                    t0_1h = result[0].t;
-                                    t0_1h = t0_1h - t0_1h % 3600000;
-                                }
-
-                                if (t0_1h === undefined) {
-                                    lock[aggregationName].inProgress = false;
-                                } else {
-                                    t1_1h = t0_1h + 3600000;
-                                    callback(aggregation, t0_1h, t1_1h);
-                                }
-                            });
-                        } else {
-                            // the t field represents the beginning of the time slot
-                            // start aggregating from the next slot
-                            t0_1h = result[0].t + 3600000;
-                            t1_1h = t0_1h + 3600000;
-                            callback(aggregation, t0_1h, t1_1h);
-                        }
-                    });
-                },
-                prepareReaggregation = function (aggregation) {
-                    var aggGrKey;
-                    for (i = 0; i < aggregation.length; i++) {
-                        if (aggregation[i].$group !== undefined) {
-                            aggregationGroup = JSON.parse(JSON.stringify(aggregation[i].$group));
-                            break;
-                        }
-                    }
-                    if (aggregationGroup === undefined) {
-                        throw "Could not find $group statement for aggregation " + aggregationName;
-                    }
-                    aggregationGroup._id = "$key";
-
-                    //myLog(aggregationName,"Replacing $group params with new ones for reaggregating ...");
-                    //we can handle $sum,$avg,$min,$max:x - where x is not the same field or is a constant
-                    for (aggGrKey in aggregationGroup) {
-                        if (aggregationGroup.hasOwnProperty(aggGrKey) && aggGrKey !== "_id") {
-                            if (aggregationGroup[aggGrKey].$sum !== undefined) {
-                                aggregationGroup[aggGrKey].$sum = "$" + aggGrKey;
-                            } else if (aggregationGroup[aggGrKey].$avg !== undefined) {
-                                delete aggregationGroup[aggGrKey].$avg;
-                                aggregationGroup[aggGrKey].$sum = "$" + aggGrKey + "/12";
-                            } else if (aggregationGroup[aggGrKey].$min !== undefined) {
-                                aggregationGroup[aggGrKey].$min = "$" + aggGrKey;
-                            } else if (aggregationGroup[aggGrKey].$max !== undefined) {
-                                aggregationGroup[aggGrKey].$max = "$" + aggGrKey;
-                            } else {
-                                throw "Unrecognised keyword. We only accept $min, $max, $sum and $avg";
-                            }
-                        }
-                    }
-
-                    reaggregation = [
-                        {"$match": {_id: {$gte: myutils.objectIdFromDate(t0_1h), $lt: myutils.objectIdFromDate(t0_1h + 3600000)}}},
-                        {"$group": aggregationGroup}
-                    ];
-                    //myLog(aggregationName,"Here is how the new aggregation for 1 hour looks like: " + util.inspect(reaggregation, {color: true, depth: null}));
-                },
-                core = function (aggregation, t0_1h, t1_1h) {
-                    if (t1_1h <= lastPossibleAggregatableElementTimestamp) {
-                        myLog(aggregationName, "Starting 1 hour aggregation from time: " + new Date(t0_1h));
-                        prepareReaggregation(aggregation);
-                        aggregationsDB.collection(aggregationName + "_5m").aggregate(reaggregation, {}, function (error, result) {
-                            if (error) {
-                                throw error;
-                            }
-                            if (result.length > 0) {
-                                myLog(aggregationName, "Finished aggregating documents from the 5 minute aggregation into the 1 hour aggregation ... now inserting.");
-
-                                //put _id in key
-                                for (i = 0; i < result.length; i++) {
-                                    if (result[i]._id !== undefined && result[i]._id !== null) {
-                                        result[i].key = result[i]._id;
-                                        result[i].t = t0_1h;
-                                        delete result[i]._id;
-                                    }
-                                    else {
-                                        result.splice(i, 1);
-                                        i--;
-                                    }
-                                }
-                                if (result.length > 0) {
-                                    save(aggregationName, result, "1h");
-                                }
-                                else {
-                                    save(aggregationName, new Array({t: t0_1h}), "1h");
-                                }
-                            }
-                            else {
-                                myLog(aggregationName, "Still inserting an empty entry with t :" + t0_1h + " so we can keep track of what ran when");
-                                save(aggregationName, new Array({t: t0_1h}), "1h");
-                            }
-                            lock[aggregationName].inProgress = false;
-
-                        });
-
-                    } else {
-                        lock[aggregationName].inProgress = false;
-                    }
-
-                };
-            determineTimestamps(core);
-        },
-
-        executeAggregationCore5m = function (aggregation, aggregationName, lastPossibleAggregatableElementTimestamp) {
-            var reaggregation,
-                aggregationGroup,
-                t0_5m,
-                t1_5m,
-                i,
-                determineTimestamps = function (callback) {
-                    //try to continue from where we left off
-                    aggregationsDB.collection(aggregationName + "_5m").find({}, {}, {sort: {_id: -1}, limit: 1}).toArray(function (error, result) {
-                        if (error) {
-                            throw error;
-                        }
-
-                        if (result === null || result.length === 0) {
-                            //no previous aggregations were found. look for the first event of the aggregation type and start from there
-                            aggregationsDB.collection(aggregationName + "_1m").find({}, {}, {sort: {_id: 1}, limit: 1}).toArray(function (error, result) {
-                                if (error) {
-                                    throw error;
-                                }
-
-                                if (!Array.isArray(result) || result.length === 0) {
-                                    myLog(aggregationName, "There are no 1m records, so we cannot aggregate for 5 minutes.");
-                                    t0_5m = undefined;
-                                } else {
-                                    t0_5m = result[0].t;
-                                    t0_5m = t0_5m - t0_5m % 300000;
-                                }
-
-                                if (t0_5m === undefined) {
-                                    lock[aggregationName].inProgress = false;
-                                } else {
-                                    t1_5m = t0_5m + 300000;
-                                    callback(aggregation, t0_5m, t1_5m);
-                                }
-                            });
-                        } else {
-                            // the t field represents the beginning of the time slot
-                            // start aggregating from the next slot
-                            t0_5m = result[0].t + 300000;
-                            t1_5m = t0_5m + 300000;
-                            callback(aggregation, t0_5m, t1_5m);
-                        }
-                    });
-                },
-                prepareReaggregation = function (aggregation) {
-                    var aggGrKey;
-                    for (i = 0; i < aggregation.length; i++) {
-                        if (aggregation[i].$group !== undefined) {
-                            aggregationGroup = JSON.parse(JSON.stringify(aggregation[i].$group));
-                            break;
-                        }
-                    }
-                    if (aggregationGroup === undefined) {
-                        throw "Could not find $group statement for aggregation " + aggregationName;
-                    }
-                    aggregationGroup._id = "$key";
-
-                    //myLog(aggregationName, "Replacing $group params with new ones for reaggregating");
-                    //we can handle $sum,$avg,$min,$max:x - where x is not the same field or is a constant
-                    for (aggGrKey in aggregationGroup) {
-                        if (aggregationGroup.hasOwnProperty(aggGrKey) && aggGrKey !== "_id") {
-                            if (aggregationGroup[aggGrKey].$sum !== undefined) {
-                                aggregationGroup[aggGrKey].$sum = "$" + aggGrKey;
-                            } else if (aggregationGroup[aggGrKey].$avg !== undefined) {
-                                delete aggregationGroup[aggGrKey].$avg;
-                                aggregationGroup[aggGrKey].$sum = "$" + aggGrKey + "/5";
-                            } else if (aggregationGroup[aggGrKey].$min !== undefined) {
-                                aggregationGroup[aggGrKey].$min = "$" + aggGrKey;
-                            } else if (aggregationGroup[aggGrKey].$max !== undefined) {
-                                aggregationGroup[aggGrKey].$max = "$" + aggGrKey;
-                            } else {
-                                throw "Unrecognised keyword. We only accept $min, $max, $sum and $avg";
-                            }
-                        }
-                    }
-
-                    reaggregation = [
-                        {"$match": {_id: {$gte: myutils.objectIdFromDate(t0_5m), $lt: myutils.objectIdFromDate(t0_5m + 300000)}}},
-                        {"$group": aggregationGroup}
-                    ];
-                    //myLog(aggregationName,"Here is how the new aggregation for 5 minutes looks like: " + util.inspect(reaggregation, {color: true, depth: null}));
-                },
-                core = function (aggregation, t0_5m, t1_5m) {
-                    if (t1_5m <= lastPossibleAggregatableElementTimestamp) {
-                        myLog(aggregationName, "Starting 5 minute aggregation from time: " + new Date(t0_5m));
-                        prepareReaggregation(aggregation);
-                        aggregationsDB.collection(aggregationName + "_1m").aggregate(reaggregation, {}, function (error, result) {
-                            if (error) {
-                                throw error;
-                            }
-                            if (result.length > 0) {
-                                myLog(aggregationName, "Finished aggregating documents from the 1 minute aggregation into the 5 minutes aggregation ... now inserting.");
-
-                                //put _id in key
-                                for (i = 0; i < result.length; i++) {
-                                    if (result[i]._id !== undefined && result[i]._id !== null) {
-                                        result[i].key = result[i]._id;
-                                        result[i].t = t0_5m;
-                                        delete result[i]._id;
-                                    }
-                                    else {
-                                        result.splice(i, 1);
-                                        i--;
-                                    }
-                                }
-                                if (result.length > 0) {
-                                    save(aggregationName, result, "5m");
-                                }
-                                else {
-                                    save(aggregationName, new Array({t: t0_5m}), "5m");
-                                }
-                            }
-                            else {
-                                myLog(aggregationName, "Still inserting an empty entry with t :" + t0_5m + " so we can keep track of what ran when");
-                                save(aggregationName, new Array({t: t0_5m}), "5m");
-                            }
-                            executeAggregationCore1h(aggregation, aggregationName, t0_5m);
-                        });
-                    } else {
-                        lock[aggregationName].inProgress = false;
-                    }
-
-                };
-            determineTimestamps(core);
-        },
-
-        executeAggregationCore1m = function (aggregation, aggregationName, lastPossibleAggregatableElementTimestamp) {
-            var t0_1m,
-                i,
-                t1_1m,
-                determineTimestamps = function (callback) {
-                    //try to continue from where we left off
-                    aggregationsDB.collection(aggregationName + "_1m").find({}, {}, {sort: {_id: -1}, limit: 1}).toArray(function (error, result) {
-                        if (error) {
-                            throw error;
-                        }
-
-                        if (result === null || result.length === 0) {
-                            //no previous aggregations were found. look for the first event of the aggregation type and start from there
-                            eventsDB.collection("events").find({type: aggregation[0].$match.type}, {}, {sort: {_id: 1}, limit: 1}).toArray(function (error, result) {
-                                if (error) {
-                                    throw error;
-                                }
-
-                                if (!Array.isArray(result) || result.length === 0) {
-                                    myLog(aggregationName, "There are no events of type " + aggregation[0].$match.type + ", so we cannot aggregate.");
-                                    t0_1m = undefined;
-                                } else {
-                                    t0_1m = result[0].t;
-                                    t0_1m = t0_1m - t0_1m % 60000;
-                                }
-
-                                if (t0_1m === undefined) {
-                                    lock[aggregationName].inProgress = false;
-                                } else {
-                                    t1_1m = t0_1m + 60000;
-                                    callback(aggregation, t0_1m, t1_1m);
-                                }
-                            });
-                        } else {
-                            // the t field represents the beginning of the time slot
-                            // start aggregating from the next slot
-                            t0_1m = result[0].t + 60000;
-                            t1_1m = t0_1m + 60000;
-                            callback(aggregation, t0_1m, t1_1m);
-                        }
-                    });
-                },
-                core = function (aggregation, t0_1m, t1_1m) {
-                    if (t1_1m <= lastPossibleAggregatableElementTimestamp) {
-                        myLog(aggregationName, "Starting 1 minute aggregation from time: " + new Date(t0_1m));
-                        aggregation[0].$match._id = {"$gte": myutils.objectIdFromDate(t0_1m), "$lt": myutils.objectIdFromDate(t1_1m)};
-                        eventsDB.collection("events").aggregate(aggregation, {}, function (error, result) {
-                            if (error) {
-                                throw error;
-                            }
-                            if (result.length > 0) {
-                                myLog(aggregationName, "Finished aggregating documents from the events collection into the 1 minute aggregation ... now inserting.");
-
-                                //put _id in key
-                                for (i = 0; i < result.length; i++) {
-                                    if (result[i]._id !== undefined && result[i]._id !== null) {
-                                        result[i].key = result[i]._id;
-                                        result[i].t = t0_1m;
-                                        delete result[i]._id;
-                                    }
-                                    else {
-                                        result.splice(i, 1);
-                                        i--;
-                                    }
-                                }
-                                if (result.length > 0) {
-                                    save(aggregationName, result, "1m");
-                                }
-                                else {
-                                    save(aggregationName, new Array({t: t0_1m}), "1m");
-                                }
-                            }
-                            else {
-                                myLog(aggregationName, "Still inserting an empty entry with t :" + t0_1m + " so we can keep track of what ran when");
-                                //noinspection JSLint
-                                save(aggregationName, new Array({t: t0_1m}), "1m");
-                            }
-                            executeAggregationCore5m(aggregation, aggregationName, t0_1m);
-                        });
-                    }
-                    else {
-                        lock[aggregationName].inProgress = false;
-                    }
-                };
-
-            determineTimestamps(core);
-        },
-
-        createCappedCollectionsForAggregation = function (aggregationName) {
-            var intervals = ["1m", "5m", "1h"],
-                createCollection = function (callback, i) {
-                    if (intervals[i] === undefined) {
-                        lock[aggregationName].collectionsCreated = 1;
-                    }
-                    else {
-                        aggregationsDB.collectionNames(aggregationName + "_" + intervals[i], {}, function (error, results) {
-                            if (error) {
-                                myLog(aggregationName, "Error: " + util.inspect(error));
-                            } else {
-                                if (results.length === 0) {
-                                    //noinspection JSLint,JSUnusedLocalSymbols
-                                    aggregationsDB.createCollection(aggregationName + "_" + intervals[i], {capped: true, autoIndexId: true, size: mongoConfigs.aggregations.collectionSize}, function (error, result) {
-                                        if (error) {
-                                            myLog(aggregationName, "Error: " + util.inspect(error));
-                                        } else {
-                                            myLog(aggregationName, "Created capped collection " + aggregationName + "_" + intervals[i] + " with a size of " + mongoConfigs.aggregations.collectionSize + " ...");
-                                        }
-                                        callback(callback, i + 1);
-                                    });
-                                }
-                                else {
-                                    myLog(aggregationName, "Collection " + aggregationName + "_" + intervals[i] + " already exists.");
-                                    callback(callback, i + 1);
-                                }
-                            }
-                        });
-                    }
-                };
-
-            if (lock[aggregationName].collectionsCreated === 0) {
-                lock[aggregationName].collectionsCreated = 0.5;
-                myLog(aggregationName, "Preparing to create collections");
-                createCollection(createCollection, 0);
-            }
-        },
-
-        executeAggregation = function (aggregation, aggregationName) {
-            myLog(aggregationName, "---***---***---***--- Starting new iteration... ---***---***---***---");
-            createCappedCollectionsForAggregation(aggregationName);
-            if (lock[aggregationName].inProgress !== true && lock[aggregationName].collectionsCreated === 1) {
-                lock[aggregationName].inProgress = true;
-
-                if (lock[aggregationName].collectionsCreated === 1) {
-                    executeAggregationCore1m(aggregation, aggregationName, new Date().getTime());
-                }
-            }
-            else {
-                myLog(aggregationName, "Will not run as another aggregation is in progress or the capped collections are not yet in place");
-            }
-        },
-
-        validateAndRunAggregation = function (aggregationObject, interval) {
-            var keys = 0,
-                aggregation,
-                keyName,
-                aggregationName,
-                reagregatable = false;
-
-            if (typeof aggregationObject === "object") {
-                for (keyName in aggregationObject) {
-                    if (aggregationObject.hasOwnProperty(keyName)) {
-                        if (keyName !== "reaggragatable") {
-                            aggregationName = keyName;
-                            keys++;
-                            aggregation = aggregationObject[keyName];
-                        } else {
-                            if (aggregationObject[keyName] === true) {
-                                reagregatable = true;
-                            }
-                        }
-                    }
-                }
-            }
-            if (!reagregatable) {
-                throw "Reaggregatable aggregations are not yet implemented";
-            }
-
-            if (keys !== 1) {
-                throw "Too many keys";
-            }
-
-            if (!(aggregation && aggregation[0].$match && aggregation[0].$match.type)) {
-                throw "You are missing the $match.type for aggregation " + aggregationName;
-            }
-
-            lock[aggregationName] = {};
-            lock[aggregationName].collectionsCreated = 0;
-
-            setInterval(function () {
-                executeAggregation(aggregation, aggregationName);
-            }, interval);
-        };
-
-    options.aggregations.forEach(function (aggregationObject) {
-        validateAndRunAggregation(aggregationObject, 50);
-    });
-};
-
diff --git a/lib/cube/collector.js b/lib/cube/collector.js
deleted file mode 100644
index 9c726ba..0000000
--- a/lib/cube/collector.js
+++ /dev/null
@@ -1,44 +0,0 @@
-"use strict";
-var endpoint = require("./endpoint.js");
-
-var headers = {
-    "Content-Type": "application/json",
-    "Access-Control-Allow-Origin": "*"
-};
-
-function post(putter) {
-    return function (request, response) {
-        var content = "";
-        request.on("data", function (chunk) {
-            content += chunk;
-        });
-        request.on("end", function () {
-            try {
-                JSON.parse(content).forEach(putter);
-            } catch (e) {
-                response.writeHead(400, headers);
-                response.end(JSON.stringify({error: e.toString()}));
-                return;
-            }
-            response.writeHead(200, headers);
-            response.end("{}");
-        });
-    };
-}
-
-//noinspection JSLint
-exports.register = function (dbs, endpoints, options) {
-    var db = dbs.events,
-        putter = require("./event.js").putterInit(db, options),
-        poster = post(putter);
-
-    endpoints.ws.push(
-        endpoint("/1.0/event/put", putter)
-    );
-
-    endpoints.http.push(
-        endpoint("POST", "/1.0/event/put", poster)
-    );
-
-    endpoints.udp = putter;
-};
diff --git a/lib/cube/database.js b/lib/cube/database.js
deleted file mode 100644
index 2b7c80d..0000000
--- a/lib/cube/database.js
+++ /dev/null
@@ -1,43 +0,0 @@
-"use strict";
-var mongodb = require("mongodb"),
-    mongoConfigs = require("../../bin/databases-config"),
-    database = module.exports = {},
-    myConnect = function (url, options, name, callback) {
-        mongodb.Db.connect(url, options, function (error, db) {
-            callback(error, db, name);
-        });
-    };
-
-database.openConnections = function (callback) {
-    var mongoDBType,
-        url,
-        options,
-        mongoConfig;
-
-    for (mongoDBType in mongoConfigs) {
-        if (mongoConfigs.hasOwnProperty(mongoDBType)) {
-            mongoConfig = mongoConfigs[mongoDBType];
-            url = database.configurl(mongoConfig);
-            options = mongoConfig["mongo-options"] || database.configOptions(mongoConfig);
-            myConnect(url, options, mongoDBType, callback);
-        }
-    }
-};
-
-database.configurl = function (config) {
-    var user = config["mongo-username"],
-        pass = config["mongo-password"],
-        host = config["mongo-host"] || "localhost",
-        port = config["mongo-port"] || 27017,
-        name = config["mongo-database"] || "cube",
-        auth = user ? user + ":" + pass + "@" : "";
-    return "mongodb://" + auth + host + ":" + port + "/" + name;
-};
-
-database.configOptions = function (config) {
-    return {
-        db: config["mongo-database-options"] || { safe: false },
-        server: config["mongo-server-options"] || { auto_reconnect: true },
-        replSet: { read_secondary: true }
-    };
-};
\ No newline at end of file
diff --git a/lib/cube/disperser.js b/lib/cube/disperser.js
deleted file mode 100644
index f6c2e5a..0000000
--- a/lib/cube/disperser.js
+++ /dev/null
@@ -1,86 +0,0 @@
-"use strict";
-var endpoint = require("./endpoint.js"),
-    url = require("url"),
-    LIMIT_MAX = 1e4,
-    headers = {
-        "Content-Type": "application/json",
-        "Access-Control-Allow-Origin": "*"
-    };
-
-//noinspection JSLint
-exports.register = function (dbs, endpoints, options) {
-    var eventsDB = dbs.events,
-        aggregationsDB = dbs.aggregations,
-        eventGetter = require("./event.js").getterInit(eventsDB),
-        aggregationGetter = require("./aggregation.js").getterInit(aggregationsDB);
-
-    endpoints.ws.push(
-        endpoint("/1.0/event/get", eventGetter),
-        endpoint("/1.0/aggregation/get", aggregationGetter)
-    );
-
-    function eventGet(request, response) {
-        request = url.parse(request.url, true).query;
-        var data = [];
-
-        if (!request.hasOwnProperty('start')) {
-            request.start = 0;
-        }
-        if ((request.hasOwnProperty('limit') && (request.limit >= LIMIT_MAX))) {
-            request.limit = LIMIT_MAX;
-        }
-
-        function documentHandler(dataElem) {
-            if (dataElem === null) {
-                response.end(JSON.stringify(data.reverse()));
-            }
-            else {
-                data.push(dataElem);
-            }
-        }
-
-        if (eventGetter(request, documentHandler) < 0) {
-            response.writeHead(400, headers);
-            response.end(JSON.stringify(data[0]));
-        } else {
-            response.writeHead(200, headers);
-        }
-
-    }
-
-    function aggregationGet(request, response) {
-        request = url.parse(request.url, true).query;
-        var data = [];
-
-        if (!request.hasOwnProperty('start')) {
-            request.start = 0;
-        }
-        if ((request.hasOwnProperty('limit') && (request.limit >= LIMIT_MAX))) {
-            request.limit = LIMIT_MAX;
-        }
-
-        function documentHandler(dataElem) {
-            if (dataElem === null) {
-                response.end(JSON.stringify(data.reverse()));
-            }
-            else {
-                data.push(dataElem);
-            }
-        }
-
-        if (aggregationGetter(request, documentHandler) < 0) {
-            response.writeHead(400, headers);
-            response.end(JSON.stringify(data[0]));
-        } else {
-            response.writeHead(200, headers);
-        }
-
-    }
-
-    endpoints.http.push(
-        endpoint("GET", "/1.0/event/get", eventGet),
-        endpoint("GET", "/1.0/aggregation/get", aggregationGet)
-    );
-
-};
-
diff --git a/lib/cube/endpoint.js b/lib/cube/endpoint.js
deleted file mode 100644
index 062a387..0000000
--- a/lib/cube/endpoint.js
+++ /dev/null
@@ -1,25 +0,0 @@
-"use strict";
-// creates an endpoint with given HTTP method, URL path and dispatch (function)
-// (method argument is optional)
-// endpoints are evaluated in server.js and
-// dispatch(request, response) is called if path/method matches
-module.exports = function (method, path, dispatch) {
-    var match;
-
-    if (arguments.length < 3) {
-        dispatch = path;
-        path = method;
-        match = function (p) {
-            return p === path;
-        };
-    } else {
-        match = function (p, m) {
-            return m === method && p === path;
-        };
-    }
-
-    return {
-        match: match,
-        dispatch: dispatch
-    };
-};
diff --git a/lib/cube/event.js b/lib/cube/event.js
deleted file mode 100644
index a1f2ca7..0000000
--- a/lib/cube/event.js
+++ /dev/null
@@ -1,96 +0,0 @@
-"use strict";
-var mongodb = require("mongodb"),
-    type_re = /^[a-z][a-zA-Z0-9_]+$/,
-    genericGetter = require("./genericGetter.js"),
-    myutils = require("./myutils.js");
-
-exports.putterInit = function (db, options) {
-    var eventsCollectionCreated = 0,
-        eventsToSave = [],
-        event,
-        collectionSize = options.collectionSize;
-
-    if (myutils.isInt(collectionSize)) {
-        throw "Invalid collection size: " + collectionSize;
-    }
-    function handle(error) {
-        if (error) {
-            throw error;
-        }
-    }
-
-    function save(event) {
-        db.collection("events").insert(event, {w: 0});
-    }
-
-    function putter(request, messageSenderCallback) {
-        var time = new Date().getTime();
-
-        function saveEvents() {
-            eventsToSave.forEach(function (event) {
-                save(event);
-            });
-            eventsToSave = [];
-        }
-
-        // validations
-        if (!type_re.test(request.type)) {
-            messageSenderCallback({error: "invalid type"});
-            return -1;
-        }
-        if (isNaN(time)) {
-            messageSenderCallback({error: "invalid time"});
-            return -1;
-        }
-
-        // If an id is specified, promote it to Mongo's primary key.
-        event = {t: time, d: request.data, type: request.type};
-        if (request.hasOwnProperty("id")) {
-            event._id = request.id;
-        }
-        // If eventsCollectionCreated, save immediately.
-        if (eventsCollectionCreated === 1) {
-            return save(event);
-        }
-
-        // If someone is already creating the event collection
-        // then append this event to the queue for later save.
-        if (eventsCollectionCreated === 0.5) {
-            return eventsToSave.push(event);
-        }
-        eventsCollectionCreated = 0.5;
-
-        // Otherwise, it's up to us to see if the collection exists, verify the
-        // associated indexes and save
-        // any events that have queued up in the interim!
-
-        // First add the new event to the queue.
-        eventsToSave.push(event);
-
-        // If the events collection exists, then we assume the indexes do
-        // too. Otherwise, we must create the required collections and indexes.
-
-        db.collectionNames("events", {}, function (error, names) {
-            if (error) {
-                throw error;
-            }
-            if (names.length) {
-                eventsCollectionCreated = 1;
-                return saveEvents();
-            }
-
-            // Events are indexed by time which is _id, which is natural order.
-            db.createCollection("events", {capped: true, autoIndexId: true, size: collectionSize}, function (error, result) {
-                handle(error);
-                eventsCollectionCreated = 1;
-                saveEvents();
-            });
-        });
-    }
-
-    return putter;
-};
-
-exports.getterInit = function (eventsDB) {
-    return genericGetter(eventsDB);
-};
diff --git a/lib/cube/genericGetter.js b/lib/cube/genericGetter.js
deleted file mode 100644
index d9bf122..0000000
--- a/lib/cube/genericGetter.js
+++ /dev/null
@@ -1,196 +0,0 @@
-"use strict";
-var myutils = require("./myutils.js"),
-    mongodb = require("mongodb"),
-    type_re = /^[a-z][a-zA-Z0-9_]+$/,
-    MAX_RETURNED_RECORDS = 10000;
-function customQuery(collectionObj, filter, sort, limit, batchSize, streamified, documentHandler) {
-    // set MongoDB cursor options
-    var cursorOptions = {},
-        cursor,
-        stream;
-
-    if (filter === undefined) {
-        filter = {};
-    }
-    if (sort === undefined) {
-        sort = {$natural: -1};
-    }
-
-    if (streamified) {
-        cursorOptions = {
-            tailable: true,
-            awaitdata: true,
-            numberOfRetries: -1
-        };
-    }
-
-    cursor = collectionObj.find(filter, cursorOptions);
-    if (streamified) {
-        stream = cursor.sort({$natural: -1}).stream();
-        stream.on('data', function (document) {
-            documentHandler({id: document._id, time: document.t, data: (document.d === undefined ? document.key : document.d)});
-        });
-    }
-    else {
-        cursor = cursor.sort(sort).limit(limit).batchSize(batchSize);
-        cursor.each(function (error, document) {
-            if (documentHandler.closed) {
-                //noinspection JSLint
-                return cursor.close(function (error, result) {
-                    if (error) {
-                        throw error;
-                    }
-                    //do nothing
-                });
-            }
-
-            if (error) {
-                throw error;
-            }
-
-            // A null name indicates that there are no more results.
-            if (document) {
-                documentHandler({id: document._id, time: document.t, data: (document.d === undefined ? document.key : document.d)});
-            }
-            else {
-                documentHandler(null);
-            }
-        });
-    }
-}
-
-module.exports = function (db) {
-    var collection,
-        streamsByName = {};
-
-    function open(documentHandler) {
-        return !documentHandler.closed;
-    }
-
-    function getter(request, documentHandler) {
-
-        var stream = !request.hasOwnProperty("stop"),
-            start = new Date(request.start).getTime(),
-            stop = stream ? undefined : (request.stop ? (new Date(request.stop)).getTime() : undefined),
-            name = request.hasOwnProperty("name") ? request.name : undefined,
-            type = request.hasOwnProperty("type") ? request.type : undefined,
-            limit,
-            sort,
-            batchSize,
-            filter,
-            streams,
-            collectionName;
-
-        // Validate the date and type.
-        if (!type_re.test(request.type)) {
-            documentHandler({error: "invalid type"});
-            return -1;
-        }
-
-        // Validate the dates.
-        if (isNaN(start)) {
-            documentHandler({error: "invalid start"});
-            return -1;
-        }
-        if (isNaN(stop)) {
-            stop = undefined;
-        }
-
-        if (!stream) {
-            if (stop === undefined) {
-                documentHandler({error: "invalid stop"});
-                return -1;
-            }
-        }
-
-        // Set an optional limit on the number of documents to return.
-        sort = {_id: -1};
-        batchSize = 10000;
-        if (request.hasOwnProperty("limit")) {
-            limit = request.limit;
-        }
-
-        if (limit !== undefined || limit > MAX_RETURNED_RECORDS) {
-            limit = MAX_RETURNED_RECORDS;
-        }
-
-        // Copy any expression filters into the match object.
-        filter = {_id: {$gte: myutils.objectIdFromDate(start)}};
-        if (stop !== undefined) {
-            filter._id.$lt = myutils.objectIdFromDate(stop);
-        }
-
-        if (name === undefined) {
-            collectionName = "events";
-            if (type !== undefined) {
-                filter.type = type;
-            }
-            else {
-                throw "We cannot query events without a type";
-            }
-        } else {
-            collectionName = name;
-        }
-
-        db.collectionNames(collectionName, {}, function (err, result) {
-            if (err) {
-                throw err;
-            }
-            if (result.length !== 1) {
-                throw "The number of results for collection: " + collectionName + " is different than 1";
-            }
-            collection = db.collection(collectionName);
-
-            // For streaming queries, share streams for efficient polling.
-            if (stream && collectionName && streamsByName) {
-                streams = streamsByName[collectionName];
-
-                if (streams && streams.waiting && Array.isArray(streams.waiting)) {
-                    streams.waiting.push(documentHandler);
-                    customQuery(collection, filter, sort, limit, batchSize, true, documentHandler);
-                }
-                else {
-                    if (streamsByName && collectionName) {
-                        streams = streamsByName[collectionName] = {waiting: [], active: [documentHandler]};
-                        customQuery(collection, filter, sort, limit, batchSize, true, function (document) {
-
-                            // If there's an name, send it to all active, open clients.
-                            if (document) {
-                                streams.active.forEach(function (documentHandler) {
-                                    if (!documentHandler.closed) {
-                                        documentHandler(document);
-                                    }
-                                });
-                            }
-                            else {
-                                streams.active = streams.active.concat(streams.waiting).filter(open);
-                                streams.waiting = [];
-
-                                // If no clients remain, then it's safe to delete the shared
-                                // stream, and we'll no longer be responsible for polling.
-                                if (!streams.active.length) {
-                                    delete streamsByName[collectionName];
-                                }
-                            }
-                        });
-                    }
-                }
-            }
-            else {
-                customQuery(collection, filter, sort, limit, batchSize, false, documentHandler);
-
-            }
-        });
-
-    }
-
-    getter.close = function (documentHandler) {
-        documentHandler.closed = true;
-    };
-
-    return getter;
-};
-
-
-
-
diff --git a/lib/cube/index.js b/lib/cube/index.js
deleted file mode 100644
index 8ad10a4..0000000
--- a/lib/cube/index.js
+++ /dev/null
@@ -1,5 +0,0 @@
-exports.server = require("./server.js");
-exports.collector = require("./collector.js");
-exports.disperser = require("./disperser.js");
-exports.aggregator = require("./aggregator.js");
-exports.endpoint = require("./endpoint.js");
diff --git a/lib/cube/myutils.js b/lib/cube/myutils.js
deleted file mode 100644
index 4377a07..0000000
--- a/lib/cube/myutils.js
+++ /dev/null
@@ -1,18 +0,0 @@
-"use strict";
-
-var mongodb = require("mongodb");
-module.exports.objectIdFromDate = function (ts) {
-    return new mongodb.ObjectID(Math.floor(ts / 1000).toString(16) + "0000000000000000");
-};
-module.exports.dateFromObjectId = function (obj) {
-    return obj.getTimestamp();
-};
-module.exports.epochFromObjectId = function (obj) {
-    return parseInt(obj.valueOf().slice(0, 8), 16);
-};
-module.exports.epochMSFromObjectId = function (obj) {
-    return parseInt(obj.valueOf().slice(0, 8), 16);
-};
-module.exports.isInt = function (n) {
-    return typeof n === 'number' && parseFloat(n) === parseInt(n, 10) && !isNaN(n);
-};
\ No newline at end of file
diff --git a/lib/cube/server.js b/lib/cube/server.js
deleted file mode 100644
index 95931b0..0000000
--- a/lib/cube/server.js
+++ /dev/null
@@ -1,166 +0,0 @@
-"use strict";
-var util = require("util"),
-    url = require("url"),
-    http = require("http"),
-    dgram = require("dgram"),
-    websocket = require("ws"),
-    staticModule = require("node-static"),
-    database = require('./database.js');
-
-// Configuration for WebSocket requests.
-
-//var wsOptions = {
-//    maxReceivedFrameSize: 0x10000,
-//    maxReceivedMessageSize: 0x100000,
-//    fragmentOutgoingMessages: true,
-//    fragmentationThreshold: 0x4000,
-//    keepalive: true,
-//    keepaliveInterval: 20000,
-//    assembleFragments: true,
-//    disableNagleAlgorithm: true,
-//    closeTimeout: 5000
-//};
-
-function ignore() {
-    console.log("Ignoring...");
-    // Responses for UDP are ignored; there's nowhere for them to go!
-}
-
-module.exports = function (options) {
-
-    // Don't crash on errors.
-    process.on("uncaughtException", function (error) {
-        util.log("uncaught exception: " + error);
-        util.log(error.stack);
-    });
-
-    var server = {},
-        httpServer = http.createServer(),
-        wsOptions = {server: httpServer},
-        WebsocketServer = websocket.Server,
-        file = new staticModule.Server("static"),
-        endpoints = {ws: [], http: []},
-        id = 0,
-        wss = new WebsocketServer(wsOptions);
-
-    // Register httpServer WebSocket listener with fallback.
-    // httpServer.on("upgrade", function (request, socket, head) {
-    //     if ("sec-websocket-version" in request.headers) {
-    //         request = new websocket.request(socket, request, wsOptions);
-    //         request.readHandshake();
-    //         connect(request.accept(request.requestedProtocols[0], request.origin), request.httpRequest);
-    //     } else if (request.method === "GET" && /^websocket$/i.test(request.headers.upgrade) && /^upgrade$/i.test(request.headers.connection)) {
-    //         wss.Connection(wss.manager, wss.options, request, socket, head);
-    //     }
-    // });
-
-    wss.on("upgrade", function () {
-        console.log("-----upgrade-----");
-    });
-
-    // Register wss WebSocket listener.
-    wss.on("connection", function (socket) {
-        var url = socket.upgradeReq.url,
-            foundMatch = false,
-            i,
-            n,
-            endpoint,
-            messageSender;
-
-        // Forward messages to the appropriate endpoint, or close the connection.
-        n = endpoints.ws.length;
-        for (i = 0; i < n; i++) {
-            if ((endpoint = endpoints.ws[i]).match(url)) {
-                foundMatch = true;
-                break;
-            }
-        }
-        if (foundMatch) {
-            messageSender = function (response) {
-                socket.send(JSON.stringify(response));
-            };
-
-            messageSender.id = ++id;
-
-            // Listen for socket disconnect.
-            if (endpoint.dispatch.close) {
-                socket.on("end", function () {
-                    endpoint.dispatch.close(messageSender);
-                });
-            }
-
-            socket.on("message", function (message) {
-                console.log("Got message: " + message);
-                console.log("Current WS status is " + socket.readyState);
-                endpoint.dispatch(JSON.parse(message), messageSender);
-            });
-            return;
-        }
-        socket.close();
-    });
-
-    // Register HTTP listener.
-    httpServer.on("request", function (request, response) {
-        var u = url.parse(request.url),
-            i,
-            n,
-            endpoint;
-
-        // Forward messages to the appropriate endpoint, or 404.
-        n = endpoints.http.length;
-        for (i = 0; i < n; i++) {
-            if ((endpoint = endpoints.http[i]).match(u.pathname, request.method)) {
-                endpoint.dispatch(request, response);
-                return;
-            }
-        }
-
-        // If this request wasn't matched, see if there's a static file to serve.
-        request.on("end", function () {
-            file.serve(request, response, function (error) {
-                if (error) {
-                    response.writeHead(error.status, {"Content-Type": "text/plain"});
-                    //noinspection JSLint
-                    response.end(error.status + "");
-                }
-            });
-        });
-
-        // as of node v0.10, 'end' is not emitted unless read() called
-        if (request.read !== undefined) {
-            request.read();
-        }
-    });
-
-    server.dbLength = 0;
-    server.start = function () {
-        // Connect to mongodb.
- util.log("starting mongodb client"); - var dbs = {}; - - database.openConnections(function (error, db, name) { - if (error) { - throw error; - } - dbs[name] = db; - server.dbLength++; - if (server.dbLength === 2) { - server.register(dbs, endpoints, options); - if (options["http-port"] !== undefined) { - util.log("starting http server on port " + options["http-port"]); - httpServer.listen(options["http-port"]); - if (endpoints.udp && options["udp-port"] !== undefined) { - util.log("starting udp server on port " + options["udp-port"]); - var udp = dgram.createSocket("udp4"); - udp.on("message", function (message) { - endpoints.udp(JSON.parse(message.toString("utf8")), ignore); - }); - udp.bind(options["udp-port"]); - } - } - } - }); - }; - - return server; -}; diff --git a/static/collectd/index.html b/static/collectd/index.html deleted file mode 100644 index 8b9e139..0000000 --- a/static/collectd/index.html +++ /dev/null @@ -1,86 +0,0 @@ - - -
-    [... 86 deleted lines of the static collectd dashboard page, consisting of HTML that embedded a minified d3.js bundle; the minified payload is truncated in this extract and omitted here ...]