Skip to content

Commit

Permalink
Major refactor for frontend. Switching to crossfilter and dc js
Browse files Browse the repository at this point in the history
  • Loading branch information
Mircea Danila Dumitrescu committed May 23, 2014
1 parent f3f6feb commit a3daaf2
Show file tree
Hide file tree
Showing 14 changed files with 646 additions and 1,023 deletions.
3 changes: 2 additions & 1 deletion bin/databases-config.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ module.exports = {
"mongo-port": 27017,
"mongo-database": "events",
"mongo-username": null,
"mongo-password": null
"mongo-password": null,
"collectionSize": 1024 * 1024 * 1024
},
aggregations: {
"mongo-host": "192.168.56.101",
Expand Down
2 changes: 1 addition & 1 deletion bin/generator-config.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions bin/generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ var options = require("./generator-config.js"),
}
return a;
},
sendPacket: function () {
sendPacket: function (type) {
var country = options.data.country[randomTools.getRandomInt(0, options.data.country.length - 1)],
gender = options.data.gender[randomTools.getRandomInt(0, options.data.gender.length - 1)],
device = options.data.device[randomTools.getRandomInt(0, options.data.device.length - 1)],
type = options.data.type[randomTools.getRandomInt(0, options.data.type.length - 1)],
packet = JSON.stringify({
type: type,
data: {
Expand All @@ -54,19 +53,20 @@ var options = require("./generator-config.js"),
// console.log(packet);
ws.send(packet);
},
sendPackets: function (n) {
sendPackets: function (n, type) {
var i;
console.log((new Date()).getTime() + ": Sending packets: " + n);
for (i = 0; i < n; i++) {
this.sendPacket();
this.sendPacket(type);
}
},

sendForPacketsForInterval: function (arrayOfPacketsInInterval) {
var i,
doSetTimeout = function (i) {
setTimeout(function () {
packetGenerator.sendPackets(arrayOfPacketsInInterval[i]);
var type = options.data.type[randomTools.getRandomInt(0, options.data.type.length - 1)];
packetGenerator.sendPackets(arrayOfPacketsInInterval[i], type);
},
i * options.sampleRateInMS);
};
Expand Down
2 changes: 1 addition & 1 deletion bower.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"jquery": "2.1.0",
"bootstrap": "3.1.1",
"d3": "3.3.*",
"crossfilter": "1.2.0",
"crossfilter": "1.3.*",
"nvd3": "1.1.*",
"dcjs": "1.6.0"
},
Expand Down
3 changes: 1 addition & 2 deletions examples/event-stream/event-get.html
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ <h1>Streaming Events</h1>
console.log("connected!");
socket.send(JSON.stringify({
type: type,
stop: new Date(),
start: new Date().getTime()-5*60*1000
start: new Date().getTime()-60*1000
}));
};

Expand Down
40 changes: 36 additions & 4 deletions lib/analytics/disperser.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ exports.register = function (dbs, endpoints, options) {
var eventsDB = dbs.events,
aggregationsDB = dbs.aggregations,
eventGetter = require("./event.js").getterInit(eventsDB),
eventCounter = require("./event.js").counterInit(eventsDB),
aggregationGetter = require("./aggregation.js").getterInit(aggregationsDB);

endpoints.ws.push(
endpoint("/1.0/event/get", eventGetter),
endpoint("/1.0/event/getCount", eventCounter),
endpoint("/1.0/aggregation/get", aggregationGetter)
);

Expand All @@ -30,7 +32,7 @@ exports.register = function (dbs, endpoints, options) {
request.limit = LIMIT_MAX;
}

function documentHandler(dataElem) {
function messageSenderHttp(dataElem) {
if (dataElem === null) {
response.end(JSON.stringify(data.reverse()));
}
Expand All @@ -39,7 +41,7 @@ exports.register = function (dbs, endpoints, options) {
}
}

if (eventGetter(request, documentHandler) < 0) {
if (eventGetter(request, messageSenderHttp) < 0) {
response.writeHead(400, headers);
response.end(JSON.stringify(data[0]));
} else {
Expand All @@ -59,7 +61,7 @@ exports.register = function (dbs, endpoints, options) {
request.limit = LIMIT_MAX;
}

function documentHandler(dataElem) {
function messageSenderHttp(dataElem) {
if (dataElem === null) {
response.end(JSON.stringify(data.reverse()));
}
Expand All @@ -68,7 +70,36 @@ exports.register = function (dbs, endpoints, options) {
}
}

if (aggregationGetter(request, documentHandler) < 0) {
if (aggregationGetter(request, messageSenderHttp) < 0) {
response.writeHead(400, headers);
response.end(JSON.stringify(data[0]));
} else {
response.writeHead(200, headers);
}

}

function eventCount(request, response) {
request = url.parse(request.url, true).query;
var data = [];

if (!request.hasOwnProperty('start')) {
request.start = 0;
}
if ((request.hasOwnProperty('limit') && (request.limit >= LIMIT_MAX))) {
request.limit = LIMIT_MAX;
}

function messageSenderHttp(dataElem) {
if (dataElem === null) {
response.end(JSON.stringify(data.reverse()));
}
else {
data.push(dataElem);
}
}

if (eventCounter(request, messageSenderHttp) < 0) {
response.writeHead(400, headers);
response.end(JSON.stringify(data[0]));
} else {
Expand All @@ -79,6 +110,7 @@ exports.register = function (dbs, endpoints, options) {

endpoints.http.push(
endpoint("GET", "/1.0/event/get", eventGet),
endpoint("GET", "/1.0/event/getCount", eventCount),
endpoint("GET", "/1.0/aggregation/get", aggregationGet)
);

Expand Down
16 changes: 11 additions & 5 deletions lib/analytics/event.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@
var mongodb = require("mongodb"),
type_re = /^[a-z][a-zA-Z0-9_]+$/,
genericGetter = require("./genericGetter.js"),
genericCounter = require("./genericCounter.js"),
myutils = require("./myutils.js"),
util = require("util");
util = require("util"),
mongoConfigs = require("../../bin/databases-config");

exports.putterInit = function (db, options) {
var eventsCollectionCreated = 0,
eventsToSave = [],
events = [],
collectionSize = options.collectionSize,
collectionSize = mongoConfigs.events.collectionSize,
bufferSize = 10000,
bufferTimestamp = (new Date()).getTime();

if (myutils.isInt(collectionSize)) {
if (!myutils.isInt(collectionSize)) {
throw "Invalid collection size: " + collectionSize;
}
function handle(error) {
Expand Down Expand Up @@ -61,7 +63,8 @@ exports.putterInit = function (db, options) {
request = [request];
}
for (i = 0; i < request.length; i++) {
events[i] = {t: time, d: request[i].data, type: request[i].type};
//diminishing time granularity, as it is not needed and saving just the second allows us to aggregate on time
events[i] = {t: time - time % 1000, d: request[i].data, type: request[i].type};
}
// If eventsCollectionCreated, save immediately.
if (eventsCollectionCreated === 1) {
Expand Down Expand Up @@ -93,7 +96,7 @@ exports.putterInit = function (db, options) {
return save();
}

// Events are indexed by time which is _id, which is natural order.
// Events are indexed by _id, which is natural order. (which is time)
db.createCollection("events", {capped: true, autoIndexId: true, size: collectionSize}, function (error, result) {
handle(error);
eventsCollectionCreated = 1;
Expand All @@ -108,3 +111,6 @@ exports.putterInit = function (db, options) {
exports.getterInit = function (eventsDB) {
return genericGetter(eventsDB);
};
exports.counterInit = function (eventsDB) {
return genericCounter(eventsDB);
};
Loading

0 comments on commit a3daaf2

Please sign in to comment.