Skip to content

Commit

Permalink
Heavy logging, stable aggregator. Completely refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
Mircea Danila Dumitrescu committed May 10, 2014
1 parent 6e77bb6 commit d150c80
Show file tree
Hide file tree
Showing 16 changed files with 574 additions and 505 deletions.
157 changes: 70 additions & 87 deletions bin/aggregator-config.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,11 @@
module.exports = {
mongo: [
{
"mongo-host": "192.168.56.101",
"mongo-port": 27017,
"mongo-database": "shopcade_cube_events",
"mongo-username": null,
"mongo-password": null
},
{
"mongo-host": "192.168.56.101",
"mongo-port": 27017,
"mongo-database": "shopcade_cube_aggregations",
"mongo-username": null,
"mongo-password": null
}
],
"aggregations": [
{
"reagragatable": true,
"reaggragatable": true,
"agg1": [
{
"$match": {
"type": "t1",
"t": "This gets overwritten with the correct expressions for the 1 or 5 minute intervals"
"type": "t1" //mandatory
}},
{
"$project": {
Expand Down Expand Up @@ -83,73 +66,73 @@ module.exports = {
}
]
}
// ,
// {
// "reagragatable": true,
// "t2": [
// {
// "$match": {
// "type": "t2",
// "t": "This gets overwritten with the correct expressions for the 1 or 5 minute intervals"
// }},
// {
// "$project": {
// _id: 0,
// type: "$type",
// v1: "$d.v1",
// v2: "$d.v2",
// v3: {
// $cond: [
// {
// $eq: ["$d.v3", "US"]
// },
// "US",
// {
// $cond: [
// {
// $eq: ["$d.v3", "GB"]
// },
// "GB",
// {
// $cond: [
// {
// $eq: ["$d.v3", "JP"]
// },
// "JP",
// {
// $cond: [
// {
// $eq: ["$d.v3", "IN"]
// },
// "IN",
// "OTHER"
// ]
// }
// ]
// }
// ]
// }
// ]
// }
// }
// },
// {
// "$group": {
// "_id": {
// "type": "$type",
// "v1": "$v1",
// "v2": "$v2",
// "v3": "$v3"
// },
// "count": {
// $sum: 1
// },
// "index": {
// $avg: "$index"
// }
// }
// }
// ]
// }
// ,
// {
// "reaggragatable": true,
// "t2": [
// {
// "$match": {
// "type": "t2",
// "t": "This gets overwritten with the correct expressions for the 1 or 5 minute intervals"
// }},
// {
// "$project": {
// _id: 0,
// type: "$type",
// v1: "$d.v1",
// v2: "$d.v2",
// v3: {
// $cond: [
// {
// $eq: ["$d.v3", "US"]
// },
// "US",
// {
// $cond: [
// {
// $eq: ["$d.v3", "GB"]
// },
// "GB",
// {
// $cond: [
// {
// $eq: ["$d.v3", "JP"]
// },
// "JP",
// {
// $cond: [
// {
// $eq: ["$d.v3", "IN"]
// },
// "IN",
// "OTHER"
// ]
// }
// ]
// }
// ]
// }
// ]
// }
// }
// },
// {
// "$group": {
// "_id": {
// "type": "$type",
// "v1": "$v1",
// "v2": "$v2",
// "v3": "$v3"
// },
// "count": {
// $sum: 1
// },
// "index": {
// $avg: "$index"
// }
// }
// }
// ]
// }
]
};
9 changes: 0 additions & 9 deletions bin/collector-config.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,4 @@
module.exports = {
mongo: [
{
"mongo-host": "192.168.56.101",
"mongo-port": 27017,
"mongo-database": "shopcade_cube_events",
"mongo-username": null,
"mongo-password": null
}
],
"http-port": 1080,
"udp-port": 1180
};
17 changes: 17 additions & 0 deletions bin/databases-config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module.exports = {
events: {
"mongo-host": "192.168.56.101",
"mongo-port": 27017,
"mongo-database": "shopcade_cube_events",
"mongo-username": null,
"mongo-password": null
},
aggregations: {
"mongo-host": "192.168.56.101",
"mongo-port": 27017,
"mongo-database": "shopcade_cube_aggregations",
"mongo-username": null,
"mongo-password": null,
"collectionSize": 256 * 1024 * 1024
}
};
16 changes: 0 additions & 16 deletions bin/disperser-config.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
module.exports = {
mongo: [
{
"mongo-host": "192.168.56.101",
"mongo-port": 27017,
"mongo-database": "shopcade_cube_events",
"mongo-username": null,
"mongo-password": null
},
{
"mongo-host": "192.168.56.101",
"mongo-port": 27017,
"mongo-database": "shopcade_cube_aggregations",
"mongo-username": null,
"mongo-password": null
}
],
"http-port": 1081
};
9 changes: 5 additions & 4 deletions examples/event-stream/event-get.html
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,21 @@ <h1>Streaming Events</h1>
console.log("connected!");
socket.send(JSON.stringify({
type: type,
start: new Date()
stop: new Date(),
start: new Date().getTime()-5*60*1000
}));
};

socket.onmessage = function (message) {
var event;
if (message) {
console.log(index++);
// console.log(message);
//console.log(message);
if (message.data) {
event = JSON.parse(message.data);
// console.log("received", message.data);
//console.log("received", message.data);
if (event && event.data) {
// console.log("received", event.data);
//console.log("received", event.data);
}
}
}
Expand Down
15 changes: 0 additions & 15 deletions examples/random-emitter/random-config.js

This file was deleted.

30 changes: 0 additions & 30 deletions examples/random-emitter/random-emitter.js

This file was deleted.

82 changes: 0 additions & 82 deletions lib/cube/aggregation.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,88 +2,6 @@
var mongodb = require("mongodb"),
genericGetter = require("./genericGetter.js");

//exports.putterInit = function (db) {
// var eventsCollectionCreated = 0,
// eventsToSave = [],
// event;
//
// function handle(error) {
// if (error) {
// throw error;
// }
// }
//
// function save(event) {
// db.collection("events").insert(event, {w: 0});
// }
//
// function putterInit(request, callback) {
// var time = request.hasOwnProperty("time") ? new Date(request.time) : new Date();
//
// function saveEvents() {
// eventsToSave.forEach(function (event) {
// save(event);
// });
// eventsToSave = [];
// }
//
// // Validate the date and type.
// if (!type_re.test(request.type)) {
// callback({error: "invalid type"});
// return -1;
// }
// if (isNaN(time)) {
// callback({error: "invalid time"});
// return -1;
// }
//
// // If an id is specified, promote it to Mongo's primary key.
// event = {t: time, d: request.data, type: request.type};
// if (request.hasOwnProperty("id")) {
// event._id = request.id;
// }
// // If eventsCollectionCreated, save immediately.
// if (eventsCollectionCreated === 1) {
// return save(event);
// }
//
// // If someone is already creating the event collection
// // then append this event to the queue for later save.
// if (eventsCollectionCreated === 0.5) {
// return eventsToSave.push(event);
// }
// eventsCollectionCreated = 0.5;
//
// // Otherwise, it's up to us to see if the collection exists, verify the
// // associated indexes and save
// // any events that have queued up in the interim!
//
// // First add the new event to the queue.
// eventsToSave.push(event);
//
// // If the events collection exists, then we assume the indexes do
// // too. Otherwise, we must create the required collections and indexes.
//
// db.collectionNames("events", {}, function (error, names) {
// if (error) {
// throw error;
// }
// if (names.length) {
// eventsCollectionCreated = 1;
// return saveEvents();
// }
// var events = db.collection("events");
//
// // Events are indexed by time.
// events.ensureIndex({"t": 1}, handle);
// eventsCollectionCreated = 1;
// saveEvents();
// });
// }
//
// return putterInit;
//};

exports.getterInit = function (aggregationDB) {
return genericGetter(aggregationDB);
};
Expand Down
Loading

0 comments on commit d150c80

Please sign in to comment.