Skip to content

Commit

Permalink
Changes for #14 (consistent messaging) and #15 (moving redis to its o…
Browse files Browse the repository at this point in the history
…wn function)
  • Loading branch information
Gerard Moroney committed Jan 3, 2018
1 parent 40e0482 commit 8311d8c
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 130 deletions.
91 changes: 59 additions & 32 deletions functions/exchange_functions.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
/*jshint esversion: 6 */

var moment=require('moment');
//var moment=require('moment');

// Small function to publish transformed message to Redis
function publishRedis (tr_timestamp,tr_id,tr_price,tr_amount,tr_side,client,bc_queue) {
msgout = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
client.publish(bc_queue,JSON.stringify(msgout));
}

// Function to log key stages in the establishng of subscriptions for each exchange to assist with operational monitoring and support.
function processMessages (id,ts,exchange_name,exchange_symbol,exchange_wss)
{
msg_time=new Date();
var sysmsg = [];
// for json better to write this to mongo collection or not do at all. Consider dropping ts and using mongo _id perhaps?
sysmsg[0] = { "id": ts, "time": msg_time, "action": "none", "exchange_name": exchange_name, "exchange_symbol": exchange_symbol, "exchange_wss": exchange_wss };
sysmsg[1] = "|ID: " + ts + "|Time: " + msg_time + "|Exchange_Name: " + exchange_name + "|Exchange_Symbol: " + exchange_symbol, "|Exchange_wss: " + exchange_wss;
sysmsg[2] = { "id": ts, "time": msg_time, "action": "none", "exchange_symbol": exchange_symbol };
sysmsg[3] = "|ID: " + ts + "|Time: " + msg_time + "|Exchange_Symbol: " + exchange_symbol;

if ( id == 0 ) {
msg_action = "START_FUNCTION";
} else if ( id == 100 ) {
Expand All @@ -28,16 +38,23 @@ function processMessages (id,ts,exchange_name,exchange_symbol,exchange_wss)
} else {
msg_action = "UNKNOWN";
}
sysmsg[0].action = msg_action;
sysmsg[1] = sysmsg[1] + "|Action: " + msg_action;
console.log(sysmsg[0]);

if ( id == 0 ) {
sysmsg[0].action = msg_action;
sysmsg[1] = sysmsg[1] + "|Action: " + msg_action;
console.log(sysmsg[0]);
} else {
sysmsg[2].action = msg_action;
sysmsg[3] = sysmsg[1] + "|Action: " + msg_action;
console.log(sysmsg[2]);
}
}

// Function to subscribe to stream, transform data and publish to Redis from BITFINEX
function processBITFINEX(client, exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
var ts = Math.round((new Date()).getTime() / 1000);
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Connect To Exchange
Expand Down Expand Up @@ -69,7 +86,8 @@ function processBITFINEX(client, exchange_name,exchange_wss,exchange_symbol)
// Get channel and symbol from response
if ( head == "subscribed" )
{
console.log( bc_queue, " channelID = ", resp.chanId, " currency = ", resp.pair);
//console.log( bc_queue, " channelID = ", resp.chanId, " currency = ", resp.pair);
processMessages ('300',ts,exchange_name,exchange_symbol,exchange_wss);
} else {
if ( resp[1] == "tu")
{
Expand All @@ -79,8 +97,9 @@ function processBITFINEX(client, exchange_name,exchange_wss,exchange_symbol)
tr_price=resp[5];
tr_amount=resp[6];
tr_side=( tr_amount > 0 ? "buy" : "sell" );
msg = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
client.publish(bc_queue,JSON.stringify(msg));
publishRedis(tr_timestamp,tr_id,tr_price,tr_amount,tr_side,client,bc_queue);
// msg = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
// client.publish(bc_queue,JSON.stringify(msg));
}
}
};
Expand All @@ -90,7 +109,7 @@ function processBITFINEX(client, exchange_name,exchange_wss,exchange_symbol)
function processHITBTC(client, exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
var ts = Math.round((new Date()).getTime() / 1000);
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Connect To Exchange
Expand Down Expand Up @@ -118,12 +137,14 @@ function processHITBTC(client, exchange_name,exchange_wss,exchange_symbol)
wss.onmessage = (msg) => {

// parse response and create queue name
var message = JSON.parse([msg.data]);
//var message = JSON.parse([msg.data]);
var message = JSON.parse(msg.data);

// Get channel and symbol from response
if ( message.method == "snapshotTrades" )
{
console.log( bc_queue, " currency = ", message.params.symbol);
//console.log( bc_queue, " currency = ", message.params.symbol);
processMessages ('300',ts,exchange_name,exchange_symbol,exchange_wss);
} else {
if ( message.method == "updateTrades")
{
Expand All @@ -134,8 +155,9 @@ function processHITBTC(client, exchange_name,exchange_wss,exchange_symbol)
tr_price=message.params.data[i].price;
tr_amount=message.params.data[i].amount;
tr_side=message.params.data[i].side;
msg = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
client.publish(bc_queue,JSON.stringify(msg));
publishRedis(tr_timestamp,tr_id,tr_price,tr_amount,tr_side,client,bc_queue);
// msg = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
// client.publish(bc_queue,JSON.stringify(msg));
}
}
}
Expand All @@ -146,7 +168,7 @@ function processHITBTC(client, exchange_name,exchange_wss,exchange_symbol)
function processGEMINI(client,exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
var ts = Math.round((new Date()).getTime() / 1000);
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Connect To Exchange
Expand Down Expand Up @@ -176,8 +198,9 @@ function processGEMINI(client,exchange_name,exchange_wss,exchange_symbol)
tr_price=message.events[i].price;
tr_side=( message.events[i+1].side == 'ask' ? 'sell' : 'buy' );
tr_timestamp=new Date(message.timestampms);
msg = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
client.publish(bc_queue,JSON.stringify(msg));
publishRedis(tr_timestamp,tr_id,tr_price,tr_amount,tr_side,client,bc_queue);
// msg = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
// client.publish(bc_queue,JSON.stringify(msg));
}
}
};
Expand Down Expand Up @@ -205,16 +228,17 @@ function processBINANCE(client,exchange_name,exchange_wss,exchange_symbol)
// https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md
tr_side=( data.maker == true ? 'sell' : 'buy' );
tr_timestamp=new Date (data.eventTime);
msg = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
client.publish(bc_queue,JSON.stringify(msg));
publishRedis(tr_timestamp,tr_id,tr_price,tr_amount,tr_side,client,bc_queue);
// msg = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
// client.publish(bc_queue,JSON.stringify(msg));
});
}

// Function to subscribe to stream, transform data and publish to Redis from HUOBIAPI
function processHUOBIAPI(client,exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
var ts = Math.round((new Date()).getTime() / 1000);
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Define constants
Expand Down Expand Up @@ -263,8 +287,9 @@ function processHUOBIAPI(client,exchange_name,exchange_wss,exchange_symbol)
tr_price=msg.tick.close;
tr_side="DoNotKnow";
tr_timestamp=new Date(msg.ts);
msgout = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
client.publish(bc_queue,JSON.stringify(msgout));
publishRedis(tr_timestamp,tr_id,tr_price,tr_amount,tr_side,client,bc_queue);
// msg = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
// client.publish(bc_queue,JSON.stringify(msg));
}

}});
Expand All @@ -274,7 +299,7 @@ function processHUOBIAPI(client,exchange_name,exchange_wss,exchange_symbol)
function processBITTREX(client,exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
var ts = Math.round((new Date()).getTime() / 1000);
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Define constants
Expand Down Expand Up @@ -311,7 +336,7 @@ function processBITTREX(client,exchange_name,exchange_wss,exchange_symbol)
function processOKEX(client, exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
var ts = Math.round((new Date()).getTime() / 1000);
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Connect To Exchange
Expand Down Expand Up @@ -369,9 +394,9 @@ function processOKEX(client, exchange_name,exchange_wss,exchange_symbol)
// - if you are buying a stock you are going to get the ask price.
// not sure if what below is correct. Need to recheck
tr_side=( records[i][4] == "ask" ? "buy" : "sell" );
msgout = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
client.publish(bc_queue,JSON.stringify(msgout));

publishRedis(tr_timestamp,tr_id,tr_price,tr_amount,tr_side,client,bc_queue);
// msg = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
// client.publish(bc_queue,JSON.stringify(msg));
}
} else {
console.log("Unexpected record. Please investigate");
Expand All @@ -383,7 +408,7 @@ function processOKEX(client, exchange_name,exchange_wss,exchange_symbol)
function processGDAX(client, exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
var ts = Math.round((new Date()).getTime() / 1000);
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Connect To Exchange
Expand Down Expand Up @@ -420,8 +445,9 @@ function processGDAX(client, exchange_name,exchange_wss,exchange_symbol)
// not sure if size is amount but was nearest match
var tr_amount = resp.size;
var tr_side = resp.side;
msgout = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
client.publish(bc_queue,JSON.stringify(msgout));
publishRedis(tr_timestamp,tr_id,tr_price,tr_amount,tr_side,client,bc_queue);
// msg = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
// client.publish(bc_queue,JSON.stringify(msg));
}
};
}
Expand All @@ -430,7 +456,7 @@ function processGDAX(client, exchange_name,exchange_wss,exchange_symbol)
function processBITSTAMP(client, exchange_name,exchange_wss,exchange_symbol)
{
// Log Message
var ts = new Date().getTime();
var ts = Math.round((new Date()).getTime() / 1000);
processMessages ('0',ts,exchange_name,exchange_symbol,exchange_wss);

// Parameter setup
Expand All @@ -448,8 +474,9 @@ function processBITSTAMP(client, exchange_name,exchange_wss,exchange_symbol)
tr_price = data.price;
tr_side=( data.type == "0" ? "buy" : "sell" );
tr_timestamp = new Date(data.timestamp * 1000 );
msgout = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
client.publish(bc_queue,JSON.stringify(msgout));
publishRedis(tr_timestamp,tr_id,tr_price,tr_amount,tr_side,client,bc_queue);
// msg = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side };
// client.publish(bc_queue,JSON.stringify(msg));
});
}

Expand Down
Loading

0 comments on commit 8311d8c

Please sign in to comment.