Skip to content

Commit

Permalink
Made some changes to have consistent logging as required by #14. So f…
Browse files Browse the repository at this point in the history
…ar have function for logging and also text and JSON outputs ffor start function, open socket and send message.
  • Loading branch information
Gerard Moroney committed Jan 3, 2018
1 parent bd23ab9 commit 40e0482
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 23 deletions.
111 changes: 97 additions & 14 deletions functions/exchange_functions.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,49 @@

var moment=require('moment');

// Function to log key stages in the establishng of subscriptions for each exchange to assist with operational monitoring and support.
function processMessages (id,ts,exchange_name,exchange_symbol,exchange_wss)
{
msg_time=new Date();
var sysmsg = [];
sysmsg[0] = { "id": ts, "time": msg_time, "action": "none", "exchange_name": exchange_name, "exchange_symbol": exchange_symbol, "exchange_wss": exchange_wss };
sysmsg[1] = "|ID: " + ts + "|Time: " + msg_time + "|Exchange_Name: " + exchange_name + "|Exchange_Symbol: " + exchange_symbol, "|Exchange_wss: " + exchange_wss;
if ( id == 0 ) {
msg_action = "START_FUNCTION";
} else if ( id == 100 ) {
msg_action = "OPEN_WS";
} else if ( id == 110 ) {
msg_action = "OPEN_WS_BINANCE";
} else if ( id == 120 ) {
msg_action = "OPEN_WS_BITTREX";
} else if ( id == 130 ) {
msg_action = "OPEN_WS_PUSHER";
} else if ( id == 200 ) {
msg_action = "SEND_WS_SUBS_MSG";
} else if ( id == 300 ) {
msg_action = "CONFIRM_WS_SUBS";
} else if ( id == 400 ) {
msg_action = "OPEN_REDIS";
} else {
msg_action = "UNKNOWN";
}
sysmsg[0].action = msg_action;
sysmsg[1] = sysmsg[1] + "|Action: " + msg_action;
console.log(sysmsg[0]);
}

// Function to subscribe to stream, transform data and publish to Redis from BITFINEX
function processBITFINEX(client, exchange_name,exchange_wss,exchange_symbol) {
function processBITFINEX(client, exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Connect To Exchange
var bc_queue = exchange_name + ':' + exchange_symbol;
const WebSocket = require('ws');
const wss = new WebSocket(exchange_wss);
processMessages ('100',ts,exchange_name,exchange_symbol,exchange_wss);

// Open connection once one is established
wss.onopen = () => {
Expand All @@ -20,14 +57,14 @@ function processBITFINEX(client, exchange_name,exchange_wss,exchange_symbol) {
"pair": exchange_symbol
}
));
processMessages ('200',ts,exchange_name,exchange_symbol,exchange_wss);
};

// Parse channel information and send to Redis
wss.onmessage = (msg) => {
var resp = JSON.parse(msg.data);
var head = resp.event;
var head_body = resp[1];
var bc_queue = exchange_name + ':' + exchange_symbol;

// Get channel and symbol from response
if ( head == "subscribed" )
Expand All @@ -50,11 +87,17 @@ function processBITFINEX(client, exchange_name,exchange_wss,exchange_symbol) {
}

// Function to subscribe to stream, transform data and publish to Redis from HITBTC
function processHITBTC(client, exchange_name,exchange_wss,exchange_symbol) {
function processHITBTC(client, exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Connect To Exchange
var bc_queue = exchange_name + ':' + exchange_symbol;
const WebSocket = require('ws');
const wss = new WebSocket(exchange_wss);
processMessages ('100',ts,exchange_name,exchange_symbol,exchange_wss);

// Open connection once one is established
wss.onopen = () => {
Expand All @@ -68,14 +111,14 @@ function processHITBTC(client, exchange_name,exchange_wss,exchange_symbol) {
}
}
));
processMessages ('200',ts,exchange_name,exchange_symbol,exchange_wss);
};

// Parse channel information and send to Redis
wss.onmessage = (msg) => {

// parse response and create queue name
var message = JSON.parse([msg.data]);
var bc_queue = exchange_name + ':' + exchange_symbol;

// Get channel and symbol from response
if ( message.method == "snapshotTrades" )
Expand All @@ -100,20 +143,25 @@ function processHITBTC(client, exchange_name,exchange_wss,exchange_symbol) {
}

// Function to subscribe to stream, transform data and publish to Redis from GEMINI
function processGEMINI(client,exchange_name,exchange_wss,exchange_symbol) {
function processGEMINI(client,exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Connect To Exchange
var bc_queue = exchange_name + ':' + exchange_symbol;
const WebSocket = require('ws');
wssurl = exchange_wss + '/' + exchange_symbol;
console.log(wssurl);
const wss = new WebSocket(wssurl);
processMessages ('100',ts,exchange_name,exchange_symbol,exchange_wss);

// Parse channel information and send to Redis
wss.onmessage = (msg) => {

// parse response and create queue name
var message = JSON.parse([msg.data]);
var bc_queue = exchange_name + ':' + exchange_symbol;

if (message.socket_sequence == 0 ) {
console.log( bc_queue, " currency = ", exchange_symbol);
Expand All @@ -136,12 +184,17 @@ function processGEMINI(client,exchange_name,exchange_wss,exchange_symbol) {
}

// Function to subscribe to stream, transform data and publish to Redis from BINANCE
function processBINANCE(client,exchange_name,exchange_wss,exchange_symbol) {
function processBINANCE(client,exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Import functions, define variables and establish connection
const api = require('binance');
var bc_queue = exchange_name + ':' + exchange_symbol;
const api = require('binance');
const binanceWS = new api.BinanceWS();
processMessages ('110',ts,exchange_name,exchange_symbol,exchange_wss);

binanceWS.onAggTrade( exchange_symbol , (data) => {
tr_id=data.tradeId;
Expand All @@ -160,12 +213,18 @@ function processBINANCE(client,exchange_name,exchange_wss,exchange_symbol) {
// Function to subscribe to stream, transform data and publish to Redis from HUOBIAPI
function processHUOBIAPI(client,exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Define constants
const WebSocket = require('ws');
const pako = require('pako');

// Open connection once one is established
var bc_queue = exchange_name + ':' + exchange_symbol;
var wss = new WebSocket(exchange_wss);
processMessages ('100',ts,exchange_name,exchange_symbol,exchange_wss);

wss.onopen = () =>
{
Expand All @@ -178,6 +237,7 @@ function processHUOBIAPI(client,exchange_name,exchange_wss,exchange_symbol)
"id": symbol
}
));
processMessages ('200',ts,exchange_name,exchange_symbol,exchange_wss);
};

// Parse channel information and send to Redis
Expand All @@ -190,7 +250,6 @@ function processHUOBIAPI(client,exchange_name,exchange_wss,exchange_symbol)
to: 'string'
});
msg = JSON.parse(text);
var bc_queue = exchange_name + ':' + exchange_symbol;
if (msg.ping)
{
wss.send(JSON.stringify(
Expand All @@ -214,8 +273,14 @@ function processHUOBIAPI(client,exchange_name,exchange_wss,exchange_symbol)
// Function to subscribe to stream, transform data and publish to Redis from BITTREX
function processBITTREX(client,exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Define constants
var bc_queue = exchange_name + ':' + exchange_symbol;
var bittrex = require('../node_modules/node.bittrex.api/node.bittrex.api');
processMessages ('120',ts,exchange_name,exchange_symbol,exchange_wss);

bittrex.options({
websockets: {
Expand Down Expand Up @@ -243,11 +308,18 @@ function processBITTREX(client,exchange_name,exchange_wss,exchange_symbol)
}

// Function to subscribe to stream, transform data and publish to Redis from OKEX
function processOKEX(client, exchange_name,exchange_wss,exchange_symbol) {
function processOKEX(client, exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Connect To Exchange
var bc_queue = exchange_name + ':' + exchange_symbol;
const WebSocket = require('ws');
const wss = new WebSocket(exchange_wss);
processMessages ('100',ts,exchange_name,exchange_symbol,exchange_wss);

const channelName = 'ok_sub_spot_' + exchange_symbol + '_deals';
// Open connection once one is established
wss.onopen = () => {
Expand All @@ -259,14 +331,14 @@ function processOKEX(client, exchange_name,exchange_wss,exchange_symbol) {
'channel': channelName
}
));
processMessages ('200',ts,exchange_name,exchange_symbol,exchange_wss);
};

// Parse channel information and send to Redis
wss.onmessage = (msg) => {
var resp = JSON.parse(msg.data);
var resp_channel = resp[0].channel;
var resp_data_channel = resp[0].data.channel;
var bc_queue = exchange_name + ':' + exchange_symbol;

// Parse and transform record returned and publish to redis
if ( resp_channel == "addChannel" )
Expand Down Expand Up @@ -308,12 +380,17 @@ function processOKEX(client, exchange_name,exchange_wss,exchange_symbol) {
}

// Function to subscribe to stream, transform data and publish to Redis from GDAX
function processGDAX(client, exchange_name,exchange_wss,exchange_symbol) {
function processGDAX(client, exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Connect To Exchange
var bc_queue = exchange_name + ':' + exchange_symbol;
const WebSocket = require('ws');
const wss = new WebSocket(exchange_wss);
var bc_queue = exchange_name + ':' + exchange_symbol;
processMessages ('100',exchange_name,exchange_symbol,exchange_wss);

// Open connection once one is established
wss.onopen = () => {
Expand All @@ -327,6 +404,7 @@ function processGDAX(client, exchange_name,exchange_wss,exchange_symbol) {
]
}
));
processMessages ('200',exchange_name,exchange_symbol,exchange_wss);
};

// Parse channel information and send to Redis
Expand All @@ -349,14 +427,19 @@ function processGDAX(client, exchange_name,exchange_wss,exchange_symbol) {
}

// Function to subscribe to stream, transform data and publish to Redis from GDAX
function processBITSTAMP(client, exchange_name,exchange_wss,exchange_symbol) {
function processBITSTAMP(client, exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Parameter setup
var bc_queue = exchange_name + ':' + exchange_symbol;

// Connect to pusher
var Pusher = require('pusher-client');
var socket = new Pusher('de504dc5763aeef9ff52');
processMessages ('130',ts,exchange_name,exchange_symbol,exchange_wss);
var channel = socket.subscribe('live_trades_' + exchange_symbol.toLowerCase() );
socket.bind_all ( function(data)
{
Expand Down
9 changes: 0 additions & 9 deletions src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ function main () {
for( var j = 0; j < exchange_symbol_array.length; j++)
{
exchange_symbol = exchange_symbol_array[j].symbol;
console.log(exchange_name, exchange_wss, exchange_symbol);
exFn.processBITFINEX(client, exchange_name, exchange_wss, exchange_symbol);
}
}
Expand All @@ -45,7 +44,6 @@ function main () {
for( var k = 0; k < exchange_symbol_array.length; k++)
{
exchange_symbol = exchange_symbol_array[k].symbol;
console.log(exchange_name, exchange_wss, exchange_symbol);
exFn.processHITBTC(client, exchange_name, exchange_wss, exchange_symbol);
}
}
Expand All @@ -56,7 +54,6 @@ function main () {
for( var l = 0; l < exchange_symbol_array.length; l++)
{
exchange_symbol = exchange_symbol_array[l].symbol;
console.log(exchange_name, exchange_wss, exchange_symbol);
exFn.processGEMINI(client, exchange_name, exchange_wss, exchange_symbol);
}
}
Expand All @@ -67,7 +64,6 @@ function main () {
for( var m = 0; m < exchange_symbol_array.length; m++)
{
exchange_symbol = exchange_symbol_array[m].symbol;
console.log(exchange_name, exchange_wss, exchange_symbol);
exFn.processHUOBIAPI(client, exchange_name, exchange_wss, exchange_symbol);
}
}
Expand All @@ -78,7 +74,6 @@ function main () {
for( var n = 0; n < exchange_symbol_array.length; n++)
{
exchange_symbol = exchange_symbol_array[n].symbol;
console.log(exchange_name, exchange_wss, exchange_symbol);
exFn.processOKEX(client, exchange_name, exchange_wss, exchange_symbol);
}
}
Expand All @@ -89,7 +84,6 @@ function main () {
for( var p = 0; p < exchange_symbol_array.length; p++)
{
exchange_symbol = exchange_symbol_array[p].symbol;
console.log(exchange_name, exchange_wss, exchange_symbol);
exFn.processGDAX(client, exchange_name, exchange_wss, exchange_symbol);
}
}
Expand All @@ -100,7 +94,6 @@ function main () {
for( var q = 0; q < exchange_symbol_array.length; q++)
{
exchange_symbol = exchange_symbol_array[q].symbol;
console.log(exchange_name, exchange_wss, exchange_symbol);
exFn.processBITSTAMP(client, exchange_name, exchange_wss, exchange_symbol);
}
}
Expand All @@ -111,7 +104,6 @@ function main () {
for( var r = 0; r < exchange_symbol_array.length; r++)
{
exchange_symbol = exchange_symbol_array[r].symbol;
console.log(exchange_name, exchange_wss, exchange_symbol);
exFn.processBINANCE(client, exchange_name, exchange_wss, exchange_symbol);
}
}
Expand All @@ -129,7 +121,6 @@ function main () {
}
if ( s == ( exchange_symbol_array.length - 1) )
{
console.log(exchange_name, exchange_wss, exchange_symbol);
exFn.processBITTREX(client, exchange_name, exchange_wss, exchange_symbol);
}
}
Expand Down

0 comments on commit 40e0482

Please sign in to comment.