From 99565162bc28ce7dfaeb9e661d41d4049c0d77bb Mon Sep 17 00:00:00 2001 From: Mircea Danila Dumitrescu Date: Mon, 19 May 2014 00:42:27 +0100 Subject: [PATCH] Almost done with the generator. Event insertion is now buffered. --- bin/databases-config.js | 4 +- bin/generator-config.js | 4 +- bin/generator.js | 14 ++++-- lib/analytics/event.js | 102 ++++++++++++++++++++++------------------ 4 files changed, 70 insertions(+), 54 deletions(-) diff --git a/bin/databases-config.js b/bin/databases-config.js index b031dad..c4684b9 100644 --- a/bin/databases-config.js +++ b/bin/databases-config.js @@ -1,13 +1,13 @@ module.exports = { events: { - "mongo-host": "localhost", + "mongo-host": "192.168.56.101", "mongo-port": 27017, "mongo-database": "events", "mongo-username": null, "mongo-password": null }, aggregations: { - "mongo-host": "localhost", + "mongo-host": "192.168.56.101", "mongo-port": 27017, "mongo-database": "aggregations", "mongo-username": null, diff --git a/bin/generator-config.js b/bin/generator-config.js index fea022a..3fae12c 100644 --- a/bin/generator-config.js +++ b/bin/generator-config.js @@ -1,11 +1,11 @@ module.exports = { - "packetsInInterval": 10000, + "packetsInInterval": 10000000, "intervalInMS": 60000, "sampleRateInMS": 100, "data": { "country": ["US", "US", "US", "US", "US", "US", "US", "US", "US", "US", "US", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "GB", "IN", "IN", "IN", "IN", "IN", "IN", "FR", "FR", "FR", "JP", "JP", "JP", "JP", "JP", "JP", "JP", "JP", "JP", "JP", "JP", "JP", "JP", "AD", "AE", "AF", "AG", "AI", "AL", "AM", "AO", "AQ", "AR", "AS", "AT", "AU", "AW", "AX", "AZ", "BA", "BB", "BD", "BE", "BF", "BG", "BH", "BI", "BJ", "BL", "BM", "BN", "BO", "BQ", "BQ", "BR", "BS", "BT", "BV", "BW", "BY", "BZ", "CA", "CC", "CD", "CF", "CG", "CH", "CI", "CK", "CL", "CM", "CN", "CO", "CR", "CU", "CV", "CW", "CX", "CY", "CZ", "DE", "DJ", "DK", "DM", "DO", "DZ", "EC", "EE", "EG", "EH", "ER", "ES", "ET", "FI", "FJ", "FK", "FM", "FO", "GA", "GD", "GE", "GF", "GG", "GH", "GI", "GL", "GM", "GN", "GP", "GQ", "GR", "GS", "GT", "GU", "GW", "GY", "HK", "HM", "HN", "HR", "HT", "HU", "ID", "IE", "IL", "IM", "IO", "IQ", "IR", "IS", "IT", "JE", "JM", "JO", "KE", "KG", "KH", "KI", "KM", "KN", "KP", "KR", "KW", "KY", "KZ", "LA", "LB", "LC", "LI", "LK", "LR", "LS", "LT", "LU", "LV", "LY", "MA", "MC", "MD", "ME", "MF", "MG", "MH", "MK", "ML", "MM", "MN", "MO", "MP", "MQ", "MR", "MS", "MT", "MU", "MV", "MW", "MX", "MY", "MZ", "NA", "NC", "NE", "NF", "NG", "NI", "NL", "NO", "NP", "NR", "NU", "NZ", "OM", "PA", "PE", "PF", "PG", "PH", "PK", "PL", "PM", "PN", "PR", "PS", "PT", "PW", "PY", "QA", "RE", "RO", "RS", "RU", "RW", "SA", "SB", "SC", "SD", "SE", "SG", "SH", "SI", "SJ", "SK", "SL", "SM", "SN", "SO", "SR", "SS", "ST", "SV", "SX", "SY", "SZ", "TC", "TD", "TF", "TG", "TH", "TJ", "TK", "TL", "TM", "TN", "TO", "TR", "TT", "TV", "TW", "TZ", "UA", "UG", "UM", "UY", "UZ", "VA", "VC", "VE", "VG", "VI", "VN", "VU", "WF", "WS", "YE", "YT", "ZA", "ZM", "ZW"], "device": ["web", "iphone", "android"], "gender": ["male", "male", "male", "female", "female", "female", "female", "female", "female", "female"], - "type": ["t1", "t1", "t1", "t1", "t1", "t2", "t3", "t3", "t3"] + "type": ["t1"] } }; \ No newline at end of file diff --git a/bin/generator.js b/bin/generator.js index 9642c5f..7d6150b 100644 --- a/bin/generator.js +++ b/bin/generator.js @@ -51,20 +51,28 @@ var options = require("./generator-config.js"), v3: country } }); - console.log(packet); + // console.log(packet); ws.send(packet); }, sendPackets: function (n) { var i; + console.log((new Date()).getTime() + ": Sending packets: " + n); for (i = 0; i < n; i++) { this.sendPacket(); } }, sendForPacketsForInterval: function (arrayOfPacketsInInterval) { - var i; + var i, + doSetTimeout = function (i) { + setTimeout(function () { + packetGenerator.sendPackets(arrayOfPacketsInInterval[i]); + }, + i * options.sampleRateInMS); + }; + for (i = 0; i < arrayOfPacketsInInterval.length; i++) { - setTimeout(this.sendPackets(arrayOfPacketsInInterval[i]), i * options.sampleRateInMS); + doSetTimeout(i); } } }; diff --git a/lib/analytics/event.js b/lib/analytics/event.js index 798fbf1..dd1840d 100644 --- a/lib/analytics/event.js +++ b/lib/analytics/event.js @@ -8,8 +8,10 @@ var mongodb = require("mongodb"), exports.putterInit = function (db, options) { var eventsCollectionCreated = 0, eventsToSave = [], - event, - collectionSize = options.collectionSize; + events = [], + collectionSize = options.collectionSize, + bufferSize = 10000, + bufferTimestamp = (new Date()).getTime(); if (myutils.isInt(collectionSize)) { throw "Invalid collection size: " + collectionSize; @@ -20,22 +22,31 @@ exports.putterInit = function (db, options) { } } - function save(event) { - db.collection("events").insert(event, {w: 0}); - console.log("Got event: " + util.inspect(event, {colors: true, depth: null})); + /** + * @param events Array + */ + function save(events) { + var insertedDocs; + //implementing buffering + if (events !== undefined) { + if (!Array.isArray(events)) { + console.log("Ignoring save value: " + util.inspect(events, {colors: true, depth: null})); + return; + } + eventsToSave = eventsToSave.concat(events); + } + if ((eventsToSave.length > bufferSize) || (new Date().getTime() - bufferTimestamp > 100 && eventsToSave.length > 0)) { + insertedDocs = eventsToSave.splice(0, bufferSize); + db.collection("events").insert(insertedDocs, {w: 0, forceServerObjectId: true}); + bufferTimestamp = new Date().getTime(); + } + //console.log("Got event: " + util.inspect(event, {colors: true, depth: null})); } function putter(request, messageSenderCallback) { var time = new Date().getTime(), i; - function saveEvents() { - eventsToSave.forEach(function (event) { - save(event); - }); - eventsToSave = []; - } - // validations if (!type_re.test(request.type)) { messageSenderCallback({error: "invalid type"}); @@ -46,52 +57,49 @@ exports.putterInit = function (db, options) { return -1; } - // If an id is specified, promote it to Mongo's primary key. if (!Array.isArray(request)) { request = [request]; } for (i = 0; i < request.length; i++) { - event = {t: time, d: request[i].data, type: request[i].type}; - - // If eventsCollectionCreated, save immediately. - if (eventsCollectionCreated === 1) { - save(event); - } + events[i] = {t: time, d: request[i].data, type: request[i].type}; + } + // If eventsCollectionCreated, save immediately. + if (eventsCollectionCreated === 1) { + return save(events); + } - // If someone is already creating the event collection - // then append this event to the queue for later save. - if (eventsCollectionCreated === 0.5) { - eventsToSave.push(event); - } - eventsCollectionCreated = 0.5; + // If someone is already creating the events collection + // then append this events to the queue for later save. + if (eventsCollectionCreated === 0.5) { + eventsToSave = eventsToSave.concat(events); + } + eventsCollectionCreated = 0.5; - // Otherwise, it's up to us to see if the collection exists, verify the - // associated indexes and save - // any events that have queued up in the interim! + // Otherwise, it's up to us to see if the collection exists + // and save any events that have queued up in the interim! - // First add the new event to the queue. - eventsToSave.push(event); + // First add the new events to the queue. + eventsToSave = eventsToSave.concat(events); - // If the events collection exists, then we assume the indexes do - // too. Otherwise, we must create the required collections and indexes. + // If the events collection exists, then we assume the indexes do + // too. Otherwise, we must create the required collections and indexes. - db.collectionNames("events", {}, function (error, names) { - if (error) { - throw error; - } - if (names.length) { - eventsCollectionCreated = 1; - return saveEvents(); - } + db.collectionNames("events", {}, function (error, names) { + if (error) { + throw error; + } + if (names.length) { + eventsCollectionCreated = 1; + return save(); + } - // Events are indexed by time which is _id, which is natural order. - db.createCollection("events", {capped: true, autoIndexId: true, size: collectionSize}, function (error, result) { - handle(error); - eventsCollectionCreated = 1; - saveEvents(); - }); + // Events are indexed by time which is _id, which is natural order. + db.createCollection("events", {capped: true, autoIndexId: true, size: collectionSize}, function (error, result) { + handle(error); + eventsCollectionCreated = 1; + save(); }); - } + }); } return putter;