From c55f20ce6401838047c1cf5affe82ea48ab086b7 Mon Sep 17 00:00:00 2001 From: Gerard Moroney Date: Mon, 1 Jan 2018 20:08:09 +0000 Subject: [PATCH] Changes for #9 add function to server.js and managed to connect to websockets, parse and transform the feed for one currency/symbol. Next to add redis and other currencies. Shell script easy way to parse list into format configuration file can understand. --- config/default.json | 2 +- functions/exchange_functions.js | 67 ++++++++++++++++++++++++++++++++- listparse.sh | 13 +++++++ src/server.js | 18 ++++++++- 4 files changed, 96 insertions(+), 4 deletions(-) create mode 100755 listparse.sh diff --git a/config/default.json b/config/default.json index 03d36c6..af2a84d 100644 --- a/config/default.json +++ b/config/default.json @@ -85,7 +85,7 @@ "docurl": "https://www.okex.com/ws_getStarted.html", "wssurl": "wss://real.okex.com:10440/websocket/okexapi", "pairs": [ - { "symbol": "NONE"} + { "symbol": "LTC_BTC" } ], "active": "N" }, diff --git a/functions/exchange_functions.js b/functions/exchange_functions.js index c2b53cf..f495201 100644 --- a/functions/exchange_functions.js +++ b/functions/exchange_functions.js @@ -131,6 +131,7 @@ function processGEMINI(client,exchange_name,exchange_wss,exchange_symbol) { } } + // Function to subscribe to stream, transform data and publish to Redis from HUOBIAPI function processHUOBIAPI(client,exchange_name,exchange_wss,exchange_symbol) { @@ -185,7 +186,6 @@ function processHUOBIAPI(client,exchange_name,exchange_wss,exchange_symbol) }}); } - // Function to subscribe to stream, transform data and publish to Redis from BITTREX function processBITTREX(client,exchange_name,exchange_wss,exchange_symbol) { @@ -217,10 +217,73 @@ function processBITTREX(client,exchange_name,exchange_wss,exchange_symbol) } + +// Function to subscribe to stream, transform data and publish to Redis from OKEX +function processOKEX(client, exchange_name,exchange_wss,exchange_symbol) { + + // Connect To Exchange + const WebSocket = require('ws'); + const wss = new WebSocket(exchange_wss); + const channelName = 'ok_sub_spot_' + exchange_symbol + '_deals'; + // Open connection once one is established + wss.onopen = () => { + + // Send request to subscribe + wss.send(JSON.stringify( + { + 'event':'addChannel', + 'channel': channelName + } + )); + }; + + // Parse channel information and send to Redis + wss.onmessage = (msg) => { + var resp = JSON.parse(msg.data); + var head = resp.event; + var head_body = resp[1]; + var bc_queue = exchange_name + ':' + exchange_symbol; + + //console.log(msg.data); + // Get channel and symbol from response + console.log(resp[0].channel); + if ( resp[0].channel == "addChannel" ) + { + var channelName = resp[0].data.channel; + console.log( bc_queue, " channel name = ", channelName ); + } else if ( resp[0].data.channel == channelName ) + { + records = resp[0].data; + for ( i = 0; i < records.length; i++ ) + { + tr_timestamp=records[i][3]; + tr_id=records[i][0]; + tr_price=records[i][1]; + tr_amount=records[i][2]; + tr_side=( records[i][4] == "ask" ? "buy" : "sell" ); + console.log(tr_id,tr_side,tr_amount,tr_price,tr_timestamp); + } + + // Transform message and send to Redis channel named exchange:symbol + tr_timestamp=new Date(resp[4]*1000); + tr_id=resp[3]; + tr_price=resp[5]; + tr_amount=resp[6]; + tr_side=( tr_amount > 0 ? "buy" : "sell" ); + msg = { "tr_id": tr_id, "tr_timestamp": tr_timestamp, "tr_price": tr_price, "tr_amount": tr_amount, "tr_side": tr_side }; + client.publish(bc_queue,JSON.stringify(msg)); + } else { + console.log("Unexpected record. Please investigate"); + console.log(resp); + } + }; +} + module.exports = { processBITFINEX, processHITBTC, processGEMINI, processHUOBIAPI, - processBITTREX + processBITTREX, + processOKEX }; diff --git a/listparse.sh b/listparse.sh new file mode 100755 index 0000000..0146cb0 --- /dev/null +++ b/listparse.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +LINE_VAL="ltc_btc eth_btc etc_btc bch_btc btc_usdt eth_usdt ltc_usdt etc_usdt bch_usdt etc_eth bt1_btc bt2_btc btg_btc qtum_btc hsr_btc neo_btc gas_btc qtum_usdt hsr_usdt neo_usdt gas_usdt" + +LIST_VAL=`echo ${LINE_VAL} | tr " " "\n"` + +echo ${LINE_VAL} | tr " " "\n" | while read ITEM +do + ITEM=`echo ${ITEM} | tr '[a-z]' '[A-Z]'` + echo "{ \"symbol\": \"${ITEM}\" }," +done + +exit 0 diff --git a/src/server.js b/src/server.js index 32ffec2..5ef1472 100644 --- a/src/server.js +++ b/src/server.js @@ -25,6 +25,7 @@ function main () { var exchange_wss = config.exchanges[i].wssurl; var exchange_symbol_array = config.exchanges[i].pairs; + // Loop through config for BITFINEX & connect, transform and create Redis pub/sub if ( exchange_name == 'xBITFINEX') { for(var j = 0; j < exchange_symbol_array.length; j++) @@ -35,6 +36,7 @@ function main () { } } + // Loop through config for HITBTC & connect, transform and create Redis pub/sub if ( exchange_name == 'xHITBTC') { for(var j = 0; j < exchange_symbol_array.length; j++) @@ -45,6 +47,7 @@ function main () { } } + // Loop through config for GEMINI & connect, transform and create Redis pub/sub if ( exchange_name == 'xGEMINI') { for(var j = 0; j < exchange_symbol_array.length; j++) @@ -55,6 +58,7 @@ function main () { } } + // Loop through config for HUOBIAPI & connect, transform and create Redis pub/sub if ( exchange_name == 'xHUOBIAPI') { for(var j = 0; j < exchange_symbol_array.length; j++) @@ -65,7 +69,19 @@ function main () { } } - if ( exchange_name == 'BITTREX') + // Loop through config for OKEX & connect, transform and create Redis pub/sub + if ( exchange_name == 'OKEX') + { + for(var j = 0; j < exchange_symbol_array.length; j++) + { + exchange_symbol = exchange_symbol_array[j].symbol; + console.log(exchange_name, exchange_wss, exchange_symbol); + exFn.processOKEX(client, exchange_name, exchange_wss, exchange_symbol); + } + } + + // Loop through config for BITTREX & connect, transform and create Redis pub/sub + if ( exchange_name == 'xBITTREX') { for(var j = 0; j < exchange_symbol_array.length; j++) {