From e41e6d73995efd6600a7c65b5f3c0a7304232256 Mon Sep 17 00:00:00 2001 From: Sho Kaneko Date: Sat, 7 Oct 2023 20:53:09 +0100 Subject: [PATCH] integrated discord into reporting --- src/bin/chaiwala_service.rs | 198 ++++++++++++++++++++++++++++++- src/bin/test_discord_task.rs | 2 +- src/bin/test_kucoin_arbitrage.rs | 91 +++++++------- src/bin/test_signint.rs | 2 +- src/report/counter.rs | 43 +++++++ src/report/discord.rs | 5 +- src/report/mod.rs | 2 + 7 files changed, 293 insertions(+), 50 deletions(-) create mode 100644 src/report/counter.rs diff --git a/src/bin/chaiwala_service.rs b/src/bin/chaiwala_service.rs index 1ccb047..9ab87e0 100644 --- a/src/bin/chaiwala_service.rs +++ b/src/bin/chaiwala_service.rs @@ -1,4 +1,198 @@ +use chaiwala::report::discord::task_discord_bot; +/// Executes triangular arbitrage +// api +use kucoin_api::{ + client::{Kucoin, KucoinEnv}, + model::market::OrderBookType, + model::websocket::{WSTopic, WSType}, +}; +// tasks +use kucoin_arbitrage::broker::gatekeeper::kucoin::task_gatekeep_chances; +use kucoin_arbitrage::broker::order::kucoin::task_place_order; +use kucoin_arbitrage::broker::orderbook::kucoin::{task_pub_orderbook_event, task_sync_orderbook}; +use kucoin_arbitrage::broker::orderchange::kucoin::task_pub_orderchange_event; +use kucoin_arbitrage::broker::symbol::filter::{symbol_with_quotes, vector_to_hash}; +use kucoin_arbitrage::broker::symbol::kucoin::{format_subscription_list, get_symbols}; +use kucoin_arbitrage::strategy::all_taker_btc_usd::task_pub_chance_all_taker_btc_usd; +// events +use kucoin_arbitrage::event::chance::ChanceEvent; +use kucoin_arbitrage::event::order::OrderEvent; +use kucoin_arbitrage::event::orderbook::OrderbookEvent; +use kucoin_arbitrage::event::orderchange::OrderChangeEvent; +// models +use kucoin_arbitrage::model::counter::Counter; +use kucoin_arbitrage::model::orderbook::FullOrderbook; +use kucoin_arbitrage::translator::traits::OrderBookTranslator; +// async system +use std::sync::Arc; +use tokio::sync::broadcast::channel; +use tokio::sync::Mutex; + #[tokio::main] -async fn main() { - unimplemented!("Implement the service as you promised!"); +async fn main() -> Result<(), failure::Error> { + // Provides logging format + kucoin_arbitrage::logger::log_init(); + log::info!("Log setup"); + + // Declares all the system counters + let api_input_counter = Arc::new(Mutex::new(Counter::new("api_input"))); + let best_price_counter = Arc::new(Mutex::new(Counter::new("best_price"))); + let chance_counter = Arc::new(Mutex::new(Counter::new("chance"))); + let order_counter = Arc::new(Mutex::new(Counter::new("order"))); + + // Reads config.toml + let config = chaiwala::config::from_file("config.toml")?; + let discord_bot_token: String = config.discord.token.clone(); + let discord_channel_id: u64 = config.discord.channel_id; + let core_config = config.core(); + let monitor_interval = core_config.behaviour.monitor_interval_sec; + let budget = core_config.behaviour.usd_cyclic_arbitrage; + + // Kucoin API Endpoints + let api: Kucoin = Kucoin::new(KucoinEnv::Live, Some(core_config.kucoin_credentials()))?; + let url_public = api.clone().get_socket_endpoint(WSType::Public).await?; + let url_private = api.clone().get_socket_endpoint(WSType::Private).await?; + log::info!("Credentials setup"); + + // Gets all symbols concurrently + let symbol_list = get_symbols(api.clone()).await; + log::info!("Total exchange symbols: {:?}", symbol_list.len()); + + // Filters with either btc or usdt as quote + let symbol_infos = symbol_with_quotes(&symbol_list, "BTC", "USDT"); + let hash_symbols = Arc::new(Mutex::new(vector_to_hash(&symbol_infos))); + log::info!("Total symbols in scope: {:?}", symbol_infos.len()); + + // Changes a list of SymbolInfo into a 2D list of WSTopic per session in max 100 index + let subs = format_subscription_list(&symbol_infos); + log::info!("Total orderbook WS sessions: {:?}", subs.len()); + + // Creates broadcast channels + // for syncing public orderbook + let (tx_orderbook, rx_orderbook) = channel::(1024 * 2); + // for getting notable orderbook after syncing + let (tx_orderbook_best, rx_orderbook_best) = channel::(512); + // for getting chance + let (tx_chance, rx_chance) = channel::(64); + // for placing order + let (tx_order, rx_order) = channel::(16); + // for getting private order changes + let (tx_orderchange, rx_orderchange) = channel::(128); + // for reporting to Discord + let (tx_discord_message, rx_discord_message) = channel::(256); + log::info!("Broadcast channels setup"); + + // Creates local orderbook + let orderbooks = Arc::new(Mutex::new(FullOrderbook::new())); + log::info!("Local orderbook setup"); + + // Infrastructure tasks + // USD cyclic arbitrage budget obtained from CONFIG + tokio::spawn(task_sync_orderbook( + rx_orderbook, + tx_orderbook_best, + orderbooks.clone(), + api_input_counter.clone(), + )); + tokio::spawn(task_pub_chance_all_taker_btc_usd( + rx_orderbook_best, + tx_chance, + orderbooks.clone(), + hash_symbols, + budget as f64, + best_price_counter.clone(), + )); + tokio::spawn(task_gatekeep_chances( + rx_chance, + rx_orderchange, + tx_order, + chance_counter.clone(), + )); + tokio::spawn(task_place_order( + rx_order, + api.clone(), + order_counter.clone(), + )); + tokio::spawn(task_discord_bot( + rx_discord_message, + discord_bot_token, + discord_channel_id, + )); + + // Gather all the orderbooks concurrently + let symbols: Vec = symbol_infos.into_iter().map(|info| info.symbol).collect(); + gather_orderbook_with_rest(symbols, api.clone(), orderbooks).await; + + // TODO revert the flow, we should first setup the infrastructure, then setup the data flow + + // Subscribes public orderbook WS per session, this is the source of data for the infrastructure tasks + for (i, sub) in subs.iter().enumerate() { + let mut ws_public = api.websocket(); + ws_public.subscribe(url_public.clone(), sub.clone()).await?; + // TODO change to task_pub_orderbook_event + tokio::spawn(task_pub_orderbook_event(ws_public, tx_orderbook.clone())); + log::info!("{i:?}-th session of WS subscription setup"); + } + + // Subscribes private order change websocket + let mut ws_private = api.websocket(); + ws_private + .subscribe(url_private.clone(), vec![WSTopic::TradeOrders]) + .await?; + tokio::spawn(task_pub_orderchange_event(ws_private, tx_orderchange)); + + log::info!("All application tasks setup"); + + // Background routine + chaiwala::report::counter::system_monitor_task( + tx_discord_message, + vec![ + api_input_counter.clone(), + best_price_counter.clone(), + chance_counter.clone(), + order_counter.clone(), + ], + monitor_interval as u64, + ) + .await?; + panic!("Program should not arrive here") +} + +async fn gather_orderbook_with_rest( + symbols: Vec, + api: Kucoin, + orderbooks: Arc>, +) { + let tasks: Vec<_> = symbols + .iter() + .map(|symbol| { + // clone variables per task before spawn + let api = api.clone(); + let orderbooks_refcopy = orderbooks.clone(); + let symbol = symbol.clone(); + + tokio::spawn(async move { + loop { + // log::info!("Obtaining initial orderbook[{}] from REST", symbol); + let res = api.get_orderbook(&symbol, OrderBookType::L100).await; + if res.is_err() { + log::warn!("orderbook[{}] did not respond, retry", &symbol); + continue; + } + let res = res.unwrap().data; + if res.is_none() { + log::warn!("orderbook[{}] received none, retry", &symbol); + continue; + } + let data = res.unwrap(); + // log::info!("Initial sequence {}:{}", &symbol, data.sequence); + let mut x = orderbooks_refcopy.lock().await; + x.insert(symbol.to_string(), data.to_internal()); + break; + } + }) + }) + .collect(); + futures::future::join_all(tasks).await; + log::info!("Collected all the symbols"); } diff --git a/src/bin/test_discord_task.rs b/src/bin/test_discord_task.rs index c4cd144..09e4ec6 100644 --- a/src/bin/test_discord_task.rs +++ b/src/bin/test_discord_task.rs @@ -13,9 +13,9 @@ async fn main() -> Result<(), failure::Error> { let (sender, receiver) = channel::(256); tokio::spawn(chaiwala::report::discord::task_discord_bot( + receiver, discord_bot_token, channel_id, - receiver, )); let duration = Duration::from_secs(10); diff --git a/src/bin/test_kucoin_arbitrage.rs b/src/bin/test_kucoin_arbitrage.rs index 974db4e..ab9977d 100644 --- a/src/bin/test_kucoin_arbitrage.rs +++ b/src/bin/test_kucoin_arbitrage.rs @@ -33,14 +33,14 @@ async fn main() -> Result<(), failure::Error> { let chance_counter = Arc::new(Mutex::new(Counter::new("chance"))); let order_counter = Arc::new(Mutex::new(Counter::new("order"))); - // Credentials# - // TODO edit the credential to accomodate discord token - // or change to the use of toml files just to see if it gets easier in modularization + // Read Configs let config = chaiwala::config::from_file("config.toml")?; let core_config = config.core(); let monitor_interval = core_config.behaviour.monitor_interval_sec; let budget = core_config.behaviour.usd_cyclic_arbitrage; - let api = Kucoin::new(KucoinEnv::Live, Some(core_config.kucoin_credentials()))?; + + // Setup Kucoin API endpoints + let api: Kucoin = Kucoin::new(KucoinEnv::Live, Some(core_config.kucoin_credentials()))?; let url_public = api.clone().get_socket_endpoint(WSType::Public).await?; let url_private = api.clone().get_socket_endpoint(WSType::Private).await?; log::info!("Credentials setup"); @@ -72,7 +72,7 @@ async fn main() -> Result<(), failure::Error> { log::info!("Broadcast channels setup"); // Creates local orderbook - let full_orderbook = Arc::new(Mutex::new(FullOrderbook::new())); + let orderbooks = Arc::new(Mutex::new(FullOrderbook::new())); log::info!("Local orderbook setup"); // Infrastructure tasks @@ -80,13 +80,13 @@ async fn main() -> Result<(), failure::Error> { tokio::spawn(task_sync_orderbook( rx_orderbook, tx_orderbook_best, - full_orderbook.clone(), + orderbooks.clone(), api_input_counter.clone(), )); tokio::spawn(task_pub_chance_all_taker_btc_usd( rx_orderbook_best, tx_chance, - full_orderbook.clone(), + orderbooks.clone(), hash_symbols, budget as f64, best_price_counter.clone(), @@ -103,43 +103,9 @@ async fn main() -> Result<(), failure::Error> { order_counter.clone(), )); - // Extracts the names only + // Gather all the orderbooks concurrently let symbols: Vec = symbol_infos.into_iter().map(|info| info.symbol).collect(); - - // TODO place below in broker - // Uses REST to obtain the initial orderbook before subscribing to websocket - let tasks: Vec<_> = symbols - .iter() - .map(|symbol| { - // clone variables per task before spawn - let api = api.clone(); - let full_orderbook_2 = full_orderbook.clone(); - let symbol = symbol.clone(); - - tokio::spawn(async move { - loop { - // log::info!("Obtaining initial orderbook[{}] from REST", symbol); - let res = api.get_orderbook(&symbol, OrderBookType::L100).await; - if res.is_err() { - log::warn!("orderbook[{}] did not respond, retry", &symbol); - continue; - } - let res = res.unwrap().data; - if res.is_none() { - log::warn!("orderbook[{}] received none, retry", &symbol); - continue; - } - let data = res.unwrap(); - // log::info!("Initial sequence {}:{}", &symbol, data.sequence); - let mut x = full_orderbook_2.lock().await; - x.insert(symbol.to_string(), data.to_internal()); - break; - } - }) - }) - .collect(); - futures::future::join_all(tasks).await; - log::info!("Collected all the symbols"); + gather_orderbook_with_rest(symbols, api.clone(), orderbooks).await; // TODO revert the flow, we should first setup the infrastructure, then setup the data flow @@ -173,3 +139,42 @@ async fn main() -> Result<(), failure::Error> { )); panic!("Program should not arrive here") } + +async fn gather_orderbook_with_rest( + symbols: Vec, + api: Kucoin, + orderbooks: Arc>, +) { + let tasks: Vec<_> = symbols + .iter() + .map(|symbol| { + // clone variables per task before spawn + let api = api.clone(); + let orderbooks_refcopy = orderbooks.clone(); + let symbol = symbol.clone(); + + tokio::spawn(async move { + loop { + // log::info!("Obtaining initial orderbook[{}] from REST", symbol); + let res = api.get_orderbook(&symbol, OrderBookType::L100).await; + if res.is_err() { + log::warn!("orderbook[{}] did not respond, retry", &symbol); + continue; + } + let res = res.unwrap().data; + if res.is_none() { + log::warn!("orderbook[{}] received none, retry", &symbol); + continue; + } + let data = res.unwrap(); + // log::info!("Initial sequence {}:{}", &symbol, data.sequence); + let mut x = orderbooks_refcopy.lock().await; + x.insert(symbol.to_string(), data.to_internal()); + break; + } + }) + }) + .collect(); + futures::future::join_all(tasks).await; + log::info!("Collected all the symbols"); +} diff --git a/src/bin/test_signint.rs b/src/bin/test_signint.rs index 927f581..086c353 100644 --- a/src/bin/test_signint.rs +++ b/src/bin/test_signint.rs @@ -33,4 +33,4 @@ fn cleanup_and_shutdown() { // Add your cleanup and shutdown logic here // For example, release resources, save state, close connections, etc. println!("Cleanup and shutdown logic executed."); -} \ No newline at end of file +} diff --git a/src/report/counter.rs b/src/report/counter.rs new file mode 100644 index 0000000..03a7c93 --- /dev/null +++ b/src/report/counter.rs @@ -0,0 +1,43 @@ +use std::{sync::Arc, time::Duration}; + +use kucoin_arbitrage::{global::counter_helper, model::counter::Counter}; +use tokio::sync::broadcast::Sender; +use tokio::sync::Mutex; +use tokio::time::sleep; + +async fn report_counter_mps( + tx: Sender, + counters: Vec>>, + interval: u64, +) -> Result<(), failure::Error> { + log::info!("Reporting broadcast data rate"); + let mut status = String::new(); + for counter in counters.iter() { + let (name, count) = { + let p = counter.lock().await; + (p.name, p.data_count) + }; + let line = format!("{name:?}: {count:?} points ({:?}mps)", count / interval); + log::info!("{line}"); + status += &line; + status += "\n"; + // clear the data + counter_helper::reset(counter.clone()).await; + } + tx.send(status)?; + Ok(()) +} + +pub async fn system_monitor_task( + tx: Sender, + counters: Vec>>, + interval: u64, +) -> Result<(), failure::Error> { + let monitor_delay = Duration::from_secs(interval); + loop { + sleep(monitor_delay).await; + report_counter_mps(tx.clone(), counters.clone(), interval) + .await + .expect("report status error"); + } +} diff --git a/src/report/discord.rs b/src/report/discord.rs index cfaa491..81d6874 100644 --- a/src/report/discord.rs +++ b/src/report/discord.rs @@ -7,7 +7,6 @@ use serenity::model::prelude::ChannelId; use serenity::prelude::*; use serenity::Client; use std::sync::Arc; -use tokio::sync::broadcast::Receiver; // Bot configuration struct Bot; @@ -28,9 +27,9 @@ impl EventHandler for Bot { } pub async fn task_discord_bot( + receiver: tokio::sync::broadcast::Receiver, token: String, channel_id: u64, - receiver: Receiver, ) -> Result<(), failure::Error> { println!("Initializing discord bot client"); @@ -59,7 +58,7 @@ pub async fn task_discord_bot( async fn channel_broadcast( channel_id: u64, http: Arc, - mut receiver: Receiver, + mut receiver: tokio::sync::broadcast::Receiver, ) -> Result<(), failure::Error> { let channel = ChannelId(channel_id); loop { diff --git a/src/report/mod.rs b/src/report/mod.rs index a373a3a..839f6be 100644 --- a/src/report/mod.rs +++ b/src/report/mod.rs @@ -1,2 +1,4 @@ +/// Counter +pub mod counter; /// Discord pub mod discord;