-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.js
133 lines (104 loc) · 3.76 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env node
const http = require('http');
const ipc = require('node-ipc');
const BigNumber = require('bignumber.js');
// ipc configuration: set this process's ipc id and the retry value, and
// replace node-ipc's logger with a function that discards its input
Object.assign(ipc.config, {
  id: 'nanoStream',
  retry: 1500,
  logger: () => {}, // no-op logger
});
/**
 * Parses `key=value` command-line tokens into a plain object.
 *
 * Only the first `=` separates the key from the value, so a value may itself
 * contain `=` (e.g. `host=a=b` yields `{ host: 'a=b' }`). A token without any
 * `=` is stored under its own name with an `undefined` value.
 *
 * @param {string[]} argv - Argument tokens (node binary and script path already removed).
 * @returns {Object<string, (string|undefined)>} Argument names mapped to their values.
 */
const parseKeyValueArgs = (argv) => {
  const parsed = {};
  for (const token of argv) {
    const separatorIndex = token.indexOf('=');
    if (separatorIndex === -1) {
      parsed[token] = undefined;
    } else {
      parsed[token.slice(0, separatorIndex)] = token.slice(separatorIndex + 1);
    }
  }
  return parsed;
};

// Collect all args passed in
const args = parseKeyValueArgs(process.argv.slice(2));

// Port and host of the webserver that receives callbacks from the Nano RPC.
// `||` is deliberate: an empty value (e.g. `port=`) also falls back to the default.
const port = args.port || 3000;
const host = args.host || '127.0.0.1';
// Establish new ipc socket server. The socket path is node-ipc's configured
// socket root, followed by its appspace, followed by this process's ipc id.
const ipcPath = `${ipc.config.socketRoot}${ipc.config.appspace}${ipc.config.id}`;
ipc.serve(ipcPath);
ipc.server.start();

// Log every client connect / disconnect
for (const [eventName, message] of [
  ['connect', 'nano-stream client connected'],
  ['socket.disconnected', 'nano-stream client disconnected'],
]) {
  ipc.server.on(eventName, () => console.info(message));
}
// Number of POST callbacks received during the current one-second tick;
// set back to zero every time the tick is moved into transactionRecord
let transactionCount = 0;

// History of per-second transaction counts. It holds 60 * 10 slots (ten
// minutes of one-second totals); slots that have not been filled yet are undefined
let transactionRecord = Array.from({ length: 60 * 10 }, () => undefined);
// Transactions per second, averaged over (at most) the 60 most recent recorded seconds
const tps = () => tpsOverPastSeconds(60);
// Transactions per minute: the per-second average over (at most) the last
// 60 * 10 recorded seconds, multiplied by 60
const tpm = () => tpsOverPastSeconds(60 * 10) * 60;
/**
 * Averages the per-second transaction counts over a window of samples.
 *
 * Slots of `transactionRecord` that are still undefined are skipped; the first
 * `nSeconds` of the remaining samples are then averaged.
 *
 * @param {number} nSeconds - Maximum number of one-second samples to average.
 * @returns {number} Mean transactions per second over the window, or 1 when
 *   there is no sample to average.
 */
const tpsOverPastSeconds = (nSeconds) => {
  const samples = transactionRecord
    .filter((count) => count !== undefined)
    .slice(0, nSeconds);
  // Same fallback whether nothing has been recorded yet or the requested
  // window is empty; this also keeps the division below away from 0 / 0.
  if (samples.length === 0) return 1;
  // Explicit initial value so reduce never throws on an empty array
  const total = samples.reduce((sum, count) => sum + count, 0);
  return total / samples.length;
};
// Once per second, archive the tick that just finished: discard the slot at
// the end of transactionRecord, insert the tick's count at the front, and
// restart the counter from zero
setInterval(function archiveCurrentSecond() {
  transactionRecord.pop();
  transactionRecord.unshift(transactionCount);
  transactionCount = 0;
}, 1000);
// Converts an amount given in raw to NANO by shifting the decimal point 30
// places to the left, truncating (BigNumber rounding mode 1) to 15 decimals
const rawToNano = (raw) => parseFloat(new BigNumber(raw).shiftedBy(-30).toFixed(15, 1));

/**
 * Turns the JSON body of a callback into the JSON string that is broadcast.
 *
 * The body's `block` property (itself a JSON string) is parsed and merged into
 * the top-level object; `balance` / `amount` are converted to NANO with the
 * original raw value kept as `balance_raw` / `amount_raw`; `is_send` becomes a
 * boolean; `seen` (seconds since the epoch), `tps` and `tpm` are added.
 *
 * Note: the `*_raw` fields are JavaScript numbers, so raw values above
 * Number.MAX_SAFE_INTEGER are not represented exactly.
 *
 * @param {string} body - Request body as text.
 * @returns {string} Serialized payload.
 * @throws {Error} If the body or its `block` property is not valid JSON, or
 *   does not have the expected object shape.
 */
const buildBroadcastPayload = (body) => {
  // Construct a payload
  const payload = JSON.parse(body);
  // Parse block
  const block = JSON.parse(payload.block);
  delete payload.block;

  // Convert amounts to ints as raw, and add in amounts in NANO
  if (block.balance) {
    Object.assign(block, {
      balance_raw: parseInt(block.balance, 10),
      balance: rawToNano(block.balance),
    });
  }
  if (payload.amount) {
    Object.assign(payload, {
      amount_raw: parseInt(payload.amount, 10),
      amount: rawToNano(payload.amount),
    });
  }

  // Merge block data into top level object (block fields win on key clashes)
  Object.assign(payload, block);
  // Add in tps and tpm, and make is_send a boolean
  Object.assign(payload, {
    is_send: payload.is_send === 'true',
    seen: Date.now() / 1000,
    tps: tps(),
    tpm: tpm(),
  });
  return JSON.stringify(payload);
};

/**
 * Webserver request handler.
 *
 * Every POST is counted as one transaction; its body is converted with
 * `buildBroadcastPayload` and broadcast to the connected ipc clients as a
 * 'payload' event (or logged when no client is connected), then answered
 * with 'ok'. A body that cannot be processed is answered with 400 instead of
 * crashing the process, and any non-POST request is answered with 405
 * instead of being left without a response.
 *
 * @param {http.IncomingMessage} request - Incoming HTTP request.
 * @param {http.ServerResponse} response - Response to write to.
 */
const requestHandler = (request, response) => {
  console.debug(request.method, request.url);

  if (request.method !== 'POST') {
    response.statusCode = 405;
    response.setHeader('Allow', 'POST');
    response.end('method not allowed');
    return;
  }

  transactionCount += 1;

  // Collect raw chunks and decode once at the end, so a multi-byte character
  // split across two chunks is not corrupted.
  // TODO make this a proper stream
  const chunks = [];
  request.on('data', (chunk) => chunks.push(chunk));
  request.on('end', () => {
    let payload;
    try {
      payload = buildBroadcastPayload(Buffer.concat(chunks).toString('utf8'));
    } catch (err) {
      console.error('Rejecting malformed callback payload', err);
      response.statusCode = 400;
      response.end('bad request');
      return;
    }

    // Broadcast payload to all connected clients
    if (ipc.server.sockets.length > 0) {
      ipc.server.broadcast('payload', payload);
    } else {
      console.info(`No connected clients. Payload was: ${payload}`);
    }
    response.end('ok');
  });
};
// HTTP server that receives the callbacks
const webServer = http.createServer(requestHandler);

// Failures to listen (e.g. the address is already in use) are emitted as
// 'error' events. The listen() callback only runs once the server is
// listening and is called without arguments, so errors must be handled here.
// Exit with a failure code so the process still stops, as it did when the
// unhandled 'error' event terminated it.
webServer.on('error', (err) => {
  console.error('Something bad happened', err);
  process.exit(1);
});

webServer.listen({ port, host }, () => {
  console.info(`Web server is listening on ${host}:${port}`);
});