Skip to content

Commit

Permalink
integrated discord into reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
kanekoshoyu committed Oct 7, 2023
1 parent e266a64 commit e41e6d7
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 50 deletions.
198 changes: 196 additions & 2 deletions src/bin/chaiwala_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,198 @@
use chaiwala::report::discord::task_discord_bot;
/// Executes triangular arbitrage
// api
use kucoin_api::{
client::{Kucoin, KucoinEnv},
model::market::OrderBookType,
model::websocket::{WSTopic, WSType},
};
// tasks
use kucoin_arbitrage::broker::gatekeeper::kucoin::task_gatekeep_chances;
use kucoin_arbitrage::broker::order::kucoin::task_place_order;
use kucoin_arbitrage::broker::orderbook::kucoin::{task_pub_orderbook_event, task_sync_orderbook};
use kucoin_arbitrage::broker::orderchange::kucoin::task_pub_orderchange_event;
use kucoin_arbitrage::broker::symbol::filter::{symbol_with_quotes, vector_to_hash};
use kucoin_arbitrage::broker::symbol::kucoin::{format_subscription_list, get_symbols};
use kucoin_arbitrage::strategy::all_taker_btc_usd::task_pub_chance_all_taker_btc_usd;
// events
use kucoin_arbitrage::event::chance::ChanceEvent;
use kucoin_arbitrage::event::order::OrderEvent;
use kucoin_arbitrage::event::orderbook::OrderbookEvent;
use kucoin_arbitrage::event::orderchange::OrderChangeEvent;
// models
use kucoin_arbitrage::model::counter::Counter;
use kucoin_arbitrage::model::orderbook::FullOrderbook;
use kucoin_arbitrage::translator::traits::OrderBookTranslator;
// async system
use std::sync::Arc;
use tokio::sync::broadcast::channel;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
unimplemented!("Implement the service as you promised!");
async fn main() -> Result<(), failure::Error> {
// Provides logging format
kucoin_arbitrage::logger::log_init();
log::info!("Log setup");

// Declares all the system counters
let api_input_counter = Arc::new(Mutex::new(Counter::new("api_input")));
let best_price_counter = Arc::new(Mutex::new(Counter::new("best_price")));
let chance_counter = Arc::new(Mutex::new(Counter::new("chance")));
let order_counter = Arc::new(Mutex::new(Counter::new("order")));

// Reads config.toml
let config = chaiwala::config::from_file("config.toml")?;
let discord_bot_token: String = config.discord.token.clone();
let discord_channel_id: u64 = config.discord.channel_id;
let core_config = config.core();
let monitor_interval = core_config.behaviour.monitor_interval_sec;
let budget = core_config.behaviour.usd_cyclic_arbitrage;

// Kucoin API Endpoints
let api: Kucoin = Kucoin::new(KucoinEnv::Live, Some(core_config.kucoin_credentials()))?;
let url_public = api.clone().get_socket_endpoint(WSType::Public).await?;
let url_private = api.clone().get_socket_endpoint(WSType::Private).await?;
log::info!("Credentials setup");

// Gets all symbols concurrently
let symbol_list = get_symbols(api.clone()).await;
log::info!("Total exchange symbols: {:?}", symbol_list.len());

// Filters with either btc or usdt as quote
let symbol_infos = symbol_with_quotes(&symbol_list, "BTC", "USDT");
let hash_symbols = Arc::new(Mutex::new(vector_to_hash(&symbol_infos)));
log::info!("Total symbols in scope: {:?}", symbol_infos.len());

// Changes a list of SymbolInfo into a 2D list of WSTopic per session in max 100 index
let subs = format_subscription_list(&symbol_infos);
log::info!("Total orderbook WS sessions: {:?}", subs.len());

// Creates broadcast channels
// for syncing public orderbook
let (tx_orderbook, rx_orderbook) = channel::<OrderbookEvent>(1024 * 2);
// for getting notable orderbook after syncing
let (tx_orderbook_best, rx_orderbook_best) = channel::<OrderbookEvent>(512);
// for getting chance
let (tx_chance, rx_chance) = channel::<ChanceEvent>(64);
// for placing order
let (tx_order, rx_order) = channel::<OrderEvent>(16);
// for getting private order changes
let (tx_orderchange, rx_orderchange) = channel::<OrderChangeEvent>(128);
// for reporting to Discord
let (tx_discord_message, rx_discord_message) = channel::<String>(256);
log::info!("Broadcast channels setup");

// Creates local orderbook
let orderbooks = Arc::new(Mutex::new(FullOrderbook::new()));
log::info!("Local orderbook setup");

// Infrastructure tasks
// USD cyclic arbitrage budget obtained from CONFIG
tokio::spawn(task_sync_orderbook(
rx_orderbook,
tx_orderbook_best,
orderbooks.clone(),
api_input_counter.clone(),
));
tokio::spawn(task_pub_chance_all_taker_btc_usd(
rx_orderbook_best,
tx_chance,
orderbooks.clone(),
hash_symbols,
budget as f64,
best_price_counter.clone(),
));
tokio::spawn(task_gatekeep_chances(
rx_chance,
rx_orderchange,
tx_order,
chance_counter.clone(),
));
tokio::spawn(task_place_order(
rx_order,
api.clone(),
order_counter.clone(),
));
tokio::spawn(task_discord_bot(
rx_discord_message,
discord_bot_token,
discord_channel_id,
));

// Gather all the orderbooks concurrently
let symbols: Vec<String> = symbol_infos.into_iter().map(|info| info.symbol).collect();
gather_orderbook_with_rest(symbols, api.clone(), orderbooks).await;

// TODO revert the flow, we should first setup the infrastructure, then setup the data flow

// Subscribes public orderbook WS per session, this is the source of data for the infrastructure tasks
for (i, sub) in subs.iter().enumerate() {
let mut ws_public = api.websocket();
ws_public.subscribe(url_public.clone(), sub.clone()).await?;
// TODO change to task_pub_orderbook_event
tokio::spawn(task_pub_orderbook_event(ws_public, tx_orderbook.clone()));
log::info!("{i:?}-th session of WS subscription setup");
}

// Subscribes private order change websocket
let mut ws_private = api.websocket();
ws_private
.subscribe(url_private.clone(), vec![WSTopic::TradeOrders])
.await?;
tokio::spawn(task_pub_orderchange_event(ws_private, tx_orderchange));

log::info!("All application tasks setup");

// Background routine
chaiwala::report::counter::system_monitor_task(
tx_discord_message,
vec![
api_input_counter.clone(),
best_price_counter.clone(),
chance_counter.clone(),
order_counter.clone(),
],
monitor_interval as u64,
)
.await?;
panic!("Program should not arrive here")
}

async fn gather_orderbook_with_rest(
symbols: Vec<String>,
api: Kucoin,
orderbooks: Arc<Mutex<FullOrderbook>>,
) {
let tasks: Vec<_> = symbols
.iter()
.map(|symbol| {
// clone variables per task before spawn
let api = api.clone();
let orderbooks_refcopy = orderbooks.clone();
let symbol = symbol.clone();

tokio::spawn(async move {
loop {
// log::info!("Obtaining initial orderbook[{}] from REST", symbol);
let res = api.get_orderbook(&symbol, OrderBookType::L100).await;
if res.is_err() {
log::warn!("orderbook[{}] did not respond, retry", &symbol);
continue;
}
let res = res.unwrap().data;
if res.is_none() {
log::warn!("orderbook[{}] received none, retry", &symbol);
continue;
}
let data = res.unwrap();
// log::info!("Initial sequence {}:{}", &symbol, data.sequence);
let mut x = orderbooks_refcopy.lock().await;
x.insert(symbol.to_string(), data.to_internal());
break;
}
})
})
.collect();
futures::future::join_all(tasks).await;
log::info!("Collected all the symbols");
}
2 changes: 1 addition & 1 deletion src/bin/test_discord_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ async fn main() -> Result<(), failure::Error> {
let (sender, receiver) = channel::<String>(256);

tokio::spawn(chaiwala::report::discord::task_discord_bot(
receiver,
discord_bot_token,
channel_id,
receiver,
));
let duration = Duration::from_secs(10);

Expand Down
91 changes: 48 additions & 43 deletions src/bin/test_kucoin_arbitrage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ async fn main() -> Result<(), failure::Error> {
let chance_counter = Arc::new(Mutex::new(Counter::new("chance")));
let order_counter = Arc::new(Mutex::new(Counter::new("order")));

// Credentials#
// TODO edit the credential to accomodate discord token
// or change to the use of toml files just to see if it gets easier in modularization
// Read Configs
let config = chaiwala::config::from_file("config.toml")?;
let core_config = config.core();
let monitor_interval = core_config.behaviour.monitor_interval_sec;
let budget = core_config.behaviour.usd_cyclic_arbitrage;
let api = Kucoin::new(KucoinEnv::Live, Some(core_config.kucoin_credentials()))?;

// Setup Kucoin API endpoints
let api: Kucoin = Kucoin::new(KucoinEnv::Live, Some(core_config.kucoin_credentials()))?;
let url_public = api.clone().get_socket_endpoint(WSType::Public).await?;
let url_private = api.clone().get_socket_endpoint(WSType::Private).await?;
log::info!("Credentials setup");
Expand Down Expand Up @@ -72,21 +72,21 @@ async fn main() -> Result<(), failure::Error> {
log::info!("Broadcast channels setup");

// Creates local orderbook
let full_orderbook = Arc::new(Mutex::new(FullOrderbook::new()));
let orderbooks = Arc::new(Mutex::new(FullOrderbook::new()));
log::info!("Local orderbook setup");

// Infrastructure tasks
// USD cyclic arbitrage budget obtained from CONFIG
tokio::spawn(task_sync_orderbook(
rx_orderbook,
tx_orderbook_best,
full_orderbook.clone(),
orderbooks.clone(),
api_input_counter.clone(),
));
tokio::spawn(task_pub_chance_all_taker_btc_usd(
rx_orderbook_best,
tx_chance,
full_orderbook.clone(),
orderbooks.clone(),
hash_symbols,
budget as f64,
best_price_counter.clone(),
Expand All @@ -103,43 +103,9 @@ async fn main() -> Result<(), failure::Error> {
order_counter.clone(),
));

// Extracts the names only
// Gather all the orderbooks concurrently
let symbols: Vec<String> = symbol_infos.into_iter().map(|info| info.symbol).collect();

// TODO place below in broker
// Uses REST to obtain the initial orderbook before subscribing to websocket
let tasks: Vec<_> = symbols
.iter()
.map(|symbol| {
// clone variables per task before spawn
let api = api.clone();
let full_orderbook_2 = full_orderbook.clone();
let symbol = symbol.clone();

tokio::spawn(async move {
loop {
// log::info!("Obtaining initial orderbook[{}] from REST", symbol);
let res = api.get_orderbook(&symbol, OrderBookType::L100).await;
if res.is_err() {
log::warn!("orderbook[{}] did not respond, retry", &symbol);
continue;
}
let res = res.unwrap().data;
if res.is_none() {
log::warn!("orderbook[{}] received none, retry", &symbol);
continue;
}
let data = res.unwrap();
// log::info!("Initial sequence {}:{}", &symbol, data.sequence);
let mut x = full_orderbook_2.lock().await;
x.insert(symbol.to_string(), data.to_internal());
break;
}
})
})
.collect();
futures::future::join_all(tasks).await;
log::info!("Collected all the symbols");
gather_orderbook_with_rest(symbols, api.clone(), orderbooks).await;

// TODO revert the flow, we should first setup the infrastructure, then setup the data flow

Expand Down Expand Up @@ -173,3 +139,42 @@ async fn main() -> Result<(), failure::Error> {
));
panic!("Program should not arrive here")
}

async fn gather_orderbook_with_rest(
symbols: Vec<String>,
api: Kucoin,
orderbooks: Arc<Mutex<FullOrderbook>>,
) {
let tasks: Vec<_> = symbols
.iter()
.map(|symbol| {
// clone variables per task before spawn
let api = api.clone();
let orderbooks_refcopy = orderbooks.clone();
let symbol = symbol.clone();

tokio::spawn(async move {
loop {
// log::info!("Obtaining initial orderbook[{}] from REST", symbol);
let res = api.get_orderbook(&symbol, OrderBookType::L100).await;
if res.is_err() {
log::warn!("orderbook[{}] did not respond, retry", &symbol);
continue;
}
let res = res.unwrap().data;
if res.is_none() {
log::warn!("orderbook[{}] received none, retry", &symbol);
continue;
}
let data = res.unwrap();
// log::info!("Initial sequence {}:{}", &symbol, data.sequence);
let mut x = orderbooks_refcopy.lock().await;
x.insert(symbol.to_string(), data.to_internal());
break;
}
})
})
.collect();
futures::future::join_all(tasks).await;
log::info!("Collected all the symbols");
}
2 changes: 1 addition & 1 deletion src/bin/test_signint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ fn cleanup_and_shutdown() {
// Add your cleanup and shutdown logic here
// For example, release resources, save state, close connections, etc.
println!("Cleanup and shutdown logic executed.");
}
}
43 changes: 43 additions & 0 deletions src/report/counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::{sync::Arc, time::Duration};

use kucoin_arbitrage::{global::counter_helper, model::counter::Counter};
use tokio::sync::broadcast::Sender;
use tokio::sync::Mutex;
use tokio::time::sleep;

async fn report_counter_mps(
tx: Sender<String>,
counters: Vec<Arc<Mutex<Counter>>>,
interval: u64,
) -> Result<(), failure::Error> {
log::info!("Reporting broadcast data rate");
let mut status = String::new();
for counter in counters.iter() {
let (name, count) = {
let p = counter.lock().await;
(p.name, p.data_count)
};
let line = format!("{name:?}: {count:?} points ({:?}mps)", count / interval);
log::info!("{line}");
status += &line;
status += "\n";
// clear the data
counter_helper::reset(counter.clone()).await;
}
tx.send(status)?;
Ok(())
}

pub async fn system_monitor_task(
tx: Sender<String>,
counters: Vec<Arc<Mutex<Counter>>>,
interval: u64,
) -> Result<(), failure::Error> {
let monitor_delay = Duration::from_secs(interval);
loop {
sleep(monitor_delay).await;
report_counter_mps(tx.clone(), counters.clone(), interval)
.await
.expect("report status error");
}
}
Loading

0 comments on commit e41e6d7

Please sign in to comment.