Skip to content

Commit

Permalink
Almost done with the generator. Event insertion is now buffered.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mircea Danila Dumitrescu committed May 18, 2014
1 parent 77aa566 commit 9956516
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 54 deletions.
4 changes: 2 additions & 2 deletions bin/databases-config.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
module.exports = {
events: {
"mongo-host": "localhost",
"mongo-host": "192.168.56.101",
"mongo-port": 27017,
"mongo-database": "events",
"mongo-username": null,
"mongo-password": null
},
aggregations: {
"mongo-host": "localhost",
"mongo-host": "192.168.56.101",
"mongo-port": 27017,
"mongo-database": "aggregations",
"mongo-username": null,
Expand Down
4 changes: 2 additions & 2 deletions bin/generator-config.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions bin/generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,28 @@ var options = require("./generator-config.js"),
v3: country
}
});
console.log(packet);
// console.log(packet);
ws.send(packet);
},
sendPackets: function (n) {
var i;
console.log((new Date()).getTime() + ": Sending packets: " + n);
for (i = 0; i < n; i++) {
this.sendPacket();
}
},

sendForPacketsForInterval: function (arrayOfPacketsInInterval) {
var i;
var i,
doSetTimeout = function (i) {
setTimeout(function () {
packetGenerator.sendPackets(arrayOfPacketsInInterval[i]);
},
i * options.sampleRateInMS);
};

for (i = 0; i < arrayOfPacketsInInterval.length; i++) {
setTimeout(this.sendPackets(arrayOfPacketsInInterval[i]), i * options.sampleRateInMS);
doSetTimeout(i);
}
}
};
Expand Down
102 changes: 55 additions & 47 deletions lib/analytics/event.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ var mongodb = require("mongodb"),
exports.putterInit = function (db, options) {
var eventsCollectionCreated = 0,
eventsToSave = [],
event,
collectionSize = options.collectionSize;
events = [],
collectionSize = options.collectionSize,
bufferSize = 10000,
bufferTimestamp = (new Date()).getTime();

if (myutils.isInt(collectionSize)) {
throw "Invalid collection size: " + collectionSize;
Expand All @@ -20,22 +22,31 @@ exports.putterInit = function (db, options) {
}
}

function save(event) {
db.collection("events").insert(event, {w: 0});
console.log("Got event: " + util.inspect(event, {colors: true, depth: null}));
/**
* @param events Array
*/
function save(events) {
var insertedDocs;
//implementing buffering
if (events !== undefined) {
if (!Array.isArray(events)) {
console.log("Ignoring save value: " + util.inspect(events, {colors: true, depth: null}));
return;
}
eventsToSave = eventsToSave.concat(events);
}
if ((eventsToSave.length > bufferSize) || (new Date().getTime() - bufferTimestamp > 100 && eventsToSave.length > 0)) {
insertedDocs = eventsToSave.splice(0, bufferSize);
db.collection("events").insert(insertedDocs, {w: 0, forceServerObjectId: true});
bufferTimestamp = new Date().getTime();
}
//console.log("Got event: " + util.inspect(event, {colors: true, depth: null}));
}

function putter(request, messageSenderCallback) {
var time = new Date().getTime(),
i;

function saveEvents() {
eventsToSave.forEach(function (event) {
save(event);
});
eventsToSave = [];
}

// validations
if (!type_re.test(request.type)) {
messageSenderCallback({error: "invalid type"});
Expand All @@ -46,52 +57,49 @@ exports.putterInit = function (db, options) {
return -1;
}

// If an id is specified, promote it to Mongo's primary key.
if (!Array.isArray(request)) {
request = [request];
}
for (i = 0; i < request.length; i++) {
event = {t: time, d: request[i].data, type: request[i].type};

// If eventsCollectionCreated, save immediately.
if (eventsCollectionCreated === 1) {
save(event);
}
events[i] = {t: time, d: request[i].data, type: request[i].type};
}
// If eventsCollectionCreated, save immediately.
if (eventsCollectionCreated === 1) {
return save(events);
}

// If someone is already creating the event collection
// then append this event to the queue for later save.
if (eventsCollectionCreated === 0.5) {
eventsToSave.push(event);
}
eventsCollectionCreated = 0.5;
// If someone is already creating the events collection
// then append this events to the queue for later save.
if (eventsCollectionCreated === 0.5) {
eventsToSave = eventsToSave.concat(events);
}
eventsCollectionCreated = 0.5;

// Otherwise, it's up to us to see if the collection exists, verify the
// associated indexes and save
// any events that have queued up in the interim!
// Otherwise, it's up to us to see if the collection exists
// and save any events that have queued up in the interim!

// First add the new event to the queue.
eventsToSave.push(event);
// First add the new events to the queue.
eventsToSave = eventsToSave.concat(events);

// If the events collection exists, then we assume the indexes do
// too. Otherwise, we must create the required collections and indexes.
// If the events collection exists, then we assume the indexes do
// too. Otherwise, we must create the required collections and indexes.

db.collectionNames("events", {}, function (error, names) {
if (error) {
throw error;
}
if (names.length) {
eventsCollectionCreated = 1;
return saveEvents();
}
db.collectionNames("events", {}, function (error, names) {
if (error) {
throw error;
}
if (names.length) {
eventsCollectionCreated = 1;
return save();
}

// Events are indexed by time which is _id, which is natural order.
db.createCollection("events", {capped: true, autoIndexId: true, size: collectionSize}, function (error, result) {
handle(error);
eventsCollectionCreated = 1;
saveEvents();
});
// Events are indexed by time which is _id, which is natural order.
db.createCollection("events", {capped: true, autoIndexId: true, size: collectionSize}, function (error, result) {
handle(error);
eventsCollectionCreated = 1;
save();
});
}
});
}

return putter;
Expand Down

0 comments on commit 9956516

Please sign in to comment.