diff --git a/Cargo.lock b/Cargo.lock index 2103be9..24ddcbe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "aho-corasick" -version = "1.1.1" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea5d730647d4fadd988536d06fecce94b7b4f2a7efdae548f1cf4b63205518ab" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" dependencies = [ "memchr", ] @@ -82,9 +82,10 @@ checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", "axum-core", + "axum-macros", "base64 0.21.4", "bitflags 1.3.2", - "bytes", + "bytes 1.5.0", "futures-util", "headers", "http", @@ -117,7 +118,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" dependencies = [ "async-trait", - "bytes", + "bytes 1.5.0", "futures-util", "http", "http-body", @@ -127,6 +128,18 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-macros" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -135,7 +148,7 @@ checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" dependencies = [ "addr2line", "cc", - "cfg-if", + "cfg-if 1.0.0", "libc", "miniz_oxide", "object", @@ -229,6 +242,16 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" +[[package]] +name = "bytes" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" +dependencies = [ + "byteorder", + "iovec", +] + [[package]] name = "bytes" version = "1.5.0" @@ -244,6 +267,12 @@ dependencies = [ "libc", ] +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -252,13 +281,13 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chaiwala" -version = "0.1.4" +version = "0.1.5" dependencies = [ "axum", "chrono", "failure", "fern", - "futures", + "futures 0.3.28", "futures-util", "headers", "kucoin_api", @@ -285,6 +314,15 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "cloudabi" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -316,7 +354,18 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "lazy_static", ] [[package]] @@ -347,10 +396,11 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" [[package]] name = "deranged" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946" +checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3" dependencies = [ + "powerfmt", "serde", ] @@ -388,7 +438,7 @@ version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -412,25 +462,14 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "add4f07d43996f76ef320709726a556a9d4f965d9410d8d0271132d2f8293480" +checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" dependencies = [ - "errno-dragonfly", "libc", "windows-sys", ] -[[package]] -name = "errno-dragonfly" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" -dependencies = [ - "cc", - "libc", -] - [[package]] name = "failure" version = "0.1.8" @@ -476,9 +515,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010" +checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" dependencies = [ "crc32fast", "miniz_oxide", @@ -514,6 +553,28 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fuchsia-zircon" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" +dependencies = [ + "bitflags 1.3.2", + "fuchsia-zircon-sys", +] + +[[package]] +name = "fuchsia-zircon-sys" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" + +[[package]] +name = "futures" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" + [[package]] name = "futures" version = "0.3.28" @@ -628,7 +689,7 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "wasi", ] @@ -645,7 +706,7 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833" dependencies = [ - "bytes", + "bytes 1.5.0", "fnv", "futures-core", "futures-sink", @@ -677,7 +738,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" dependencies = [ "base64 0.21.4", - "bytes", + "bytes 1.5.0", "headers-core", "http", "httpdate", @@ -694,6 +755,12 @@ dependencies = [ "http", ] +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "hermit-abi" version = "0.3.3" @@ -716,7 +783,7 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" dependencies = [ - "bytes", + "bytes 1.5.0", "fnv", "itoa", ] @@ -727,7 +794,7 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ - "bytes", + "bytes 1.5.0", "http", "pin-project-lite", ] @@ -756,7 +823,7 @@ version = "0.14.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" dependencies = [ - "bytes", + "bytes 1.5.0", "futures-channel", "futures-core", "futures-util", @@ -794,7 +861,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ - "bytes", + "bytes 1.5.0", "hyper", "native-tls", "tokio", @@ -860,7 +927,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413" dependencies = [ - "bytes", + "bytes 1.5.0", +] + +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", ] [[package]] @@ -895,6 +971,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + [[package]] name = "kucoin_api" version = "1.4.10" @@ -903,7 +989,7 @@ checksum = "5dab1ab081362b47c5808f531769e39d1a069690a40c10c2a553b9cc15e82aee" dependencies = [ "base64 0.12.3", "failure", - "futures", + "futures 0.3.28", "hmac", "pin-project", "reqwest", @@ -921,24 +1007,25 @@ dependencies = [ [[package]] name = "kucoin_arbitrage" -version = "0.0.11" +version = "0.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e82b1dc36bd17b1b2eef90ce1c804aa7a2204bc3ce142a7020d83852e0e170eb" +checksum = "088ce5b0ef07868fc8ab8320c0e885428fd827eaf75add7718fdb99b05032cb1" dependencies = [ "chrono", "env_logger", "failure", "fern", - "futures", + "futures 0.3.28", "kucoin_api", "lazy_static", "log", "num-traits", - "ordered-float 3.9.1", + "ordered-float 3.9.2", "rand", "serde", "serde_derive", "tokio", + "tokio-signal", "toml", ] @@ -956,9 +1043,18 @@ checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "linux-raw-sys" -version = "0.4.8" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3852614a3bd9ca9804678ba6be5e3b8ce76dfc902cae004e3e0c44051b6e88db" +checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" + +[[package]] +name = "lock_api" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75" +dependencies = [ + "scopeguard", +] [[package]] name = "lock_api" @@ -982,6 +1078,12 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + [[package]] name = "memchr" version = "2.6.4" @@ -1013,6 +1115,25 @@ dependencies = [ "adler", ] +[[package]] +name = "mio" +version = "0.6.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4" +dependencies = [ + "cfg-if 0.1.10", + "fuchsia-zircon", + "fuchsia-zircon-sys", + "iovec", + "kernel32-sys", + "libc", + "log", + "miow", + "net2", + "slab", + "winapi 0.2.8", +] + [[package]] name = "mio" version = "0.8.8" @@ -1024,6 +1145,29 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "mio-uds" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" +dependencies = [ + "iovec", + "libc", + "mio 0.6.23", +] + +[[package]] +name = "miow" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d" +dependencies = [ + "kernel32-sys", + "net2", + "winapi 0.2.8", + "ws2_32-sys", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -1042,11 +1186,22 @@ dependencies = [ "tempfile", ] +[[package]] +name = "net2" +version = "0.2.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b13b648036a2339d06de780866fbdfda0dde886de7b3af2ddeba8b14f4ee34ac" +dependencies = [ + "cfg-if 0.1.10", + "libc", + "winapi 0.3.9", +] + [[package]] name = "num-traits" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", ] @@ -1095,7 +1250,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c" dependencies = [ "bitflags 2.4.0", - "cfg-if", + "cfg-if 1.0.0", "foreign-types", "libc", "once_cell", @@ -1134,30 +1289,56 @@ dependencies = [ [[package]] name = "ordered-float" -version = "2.10.0" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" dependencies = [ "num-traits", ] [[package]] name = "ordered-float" -version = "3.9.1" +version = "3.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a54938017eacd63036332b4ae5c8a49fc8c0c1d6d629893057e4f13609edd06" +checksum = "f1e1c390732d15f1d48471625cd92d154e66db2c56645e29a9cd26f4699f72dc" dependencies = [ "num-traits", ] +[[package]] +name = "parking_lot" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f842b1982eb6c2fe34036a4fbfb06dd185a3f5c8edfaacdf7d1ea10b07de6252" +dependencies = [ + "lock_api 0.3.4", + "parking_lot_core 0.6.3", + "rustc_version", +] + [[package]] name = "parking_lot" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ - "lock_api", - "parking_lot_core", + "lock_api 0.4.10", + "parking_lot_core 0.9.8", +] + +[[package]] +name = "parking_lot_core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda66b810a62be75176a80873726630147a5ca780cd33921e0b5709033e66b0a" +dependencies = [ + "cfg-if 0.1.10", + "cloudabi", + "libc", + "redox_syscall 0.1.57", + "rustc_version", + "smallvec 0.6.14", + "winapi 0.3.9", ] [[package]] @@ -1166,10 +1347,10 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", - "redox_syscall", - "smallvec", + "redox_syscall 0.3.5", + "smallvec 1.11.1", "windows-targets", ] @@ -1217,6 +1398,12 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1225,9 +1412,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.68" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b1106fec09662ec6dd98ccac0f81cef56984d0b49f75c92d8cbad76e20c005c" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] @@ -1271,6 +1458,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" + [[package]] name = "redox_syscall" version = "0.3.5" @@ -1282,9 +1475,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.6" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebee201405406dbf528b8b672104ae6d6d63e6d118cb10e4d51abbc7b58044ff" +checksum = "aaac441002f822bc9705a681810a4dd2963094b9ca0ddc41cb963a4c189189ea" dependencies = [ "aho-corasick", "memchr", @@ -1294,9 +1487,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.9" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59b23e92ee4318893fa3fe3e6fb365258efbfe6ac6ab30f090cdcbb7aa37efa9" +checksum = "5011c7e263a695dc8ca064cddb722af1be54e517a280b12a5356f98366899e5d" dependencies = [ "aho-corasick", "memchr", @@ -1305,9 +1498,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.7.5" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" @@ -1316,7 +1509,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ "base64 0.21.4", - "bytes", + "bytes 1.5.0", "encoding_rs", "futures-core", "futures-util", @@ -1364,10 +1557,24 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", - "untrusted", + "spin 0.5.2", + "untrusted 0.7.1", "web-sys", - "winapi", + "winapi 0.3.9", +] + +[[package]] +name = "ring" +version = "0.17.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9babe80d5c16becf6594aa32ad2be8fe08498e7ae60b77de8df700e67f191d7e" +dependencies = [ + "cc", + "getrandom", + "libc", + "spin 0.9.8", + "untrusted 0.9.0", + "windows-sys", ] [[package]] @@ -1376,11 +1583,20 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc_version" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" +dependencies = [ + "semver", +] + [[package]] name = "rustix" -version = "0.38.17" +version = "0.38.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f25469e9ae0f3d0047ca8b93fc56843f38e6774f0914a107ff8b41be8be8e0b7" +checksum = "745ecfa778e66b2b63c88a61cb36e0eea109e803b0b86bf9879fbc77c70e86ed" dependencies = [ "bitflags 2.4.0", "errno", @@ -1396,7 +1612,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" dependencies = [ "log", - "ring", + "ring 0.16.20", "sct", "webpki", ] @@ -1408,7 +1624,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" dependencies = [ "log", - "ring", + "ring 0.16.20", "rustls-webpki", "sct", ] @@ -1428,8 +1644,8 @@ version = "0.101.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe" dependencies = [ - "ring", - "untrusted", + "ring 0.16.20", + "untrusted 0.7.1", ] [[package]] @@ -1465,8 +1681,8 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" dependencies = [ - "ring", - "untrusted", + "ring 0.16.20", + "untrusted 0.7.1", ] [[package]] @@ -1492,11 +1708,26 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" +dependencies = [ + "semver-parser", +] + +[[package]] +name = "semver-parser" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" + [[package]] name = "serde" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" +checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" dependencies = [ "serde_derive", ] @@ -1507,15 +1738,15 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" dependencies = [ - "ordered-float 2.10.0", + "ordered-float 2.10.1", "serde", ] [[package]] name = "serde_derive" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" +checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" dependencies = [ "proc-macro2", "quote", @@ -1574,10 +1805,10 @@ dependencies = [ "async-tungstenite", "base64 0.13.1", "bitflags 1.3.2", - "bytes", - "cfg-if", + "bytes 1.5.0", + "cfg-if 1.0.0", "flate2", - "futures", + "futures 0.3.28", "mime", "mime_guess", "percent-encoding", @@ -1599,7 +1830,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" dependencies = [ "block-buffer 0.9.0", - "cfg-if", + "cfg-if 1.0.0", "cpufeatures", "digest 0.9.0", "opaque-debug 0.3.0", @@ -1611,7 +1842,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "cpufeatures", "digest 0.10.7", ] @@ -1622,7 +1853,7 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "cpufeatures", "digest 0.10.7", ] @@ -1657,6 +1888,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "smallvec" +version = "0.6.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97fcaeba89edba30f044a10c6a3cc39df9c3f17d7cd829dd1446cab35f890e0" +dependencies = [ + "maybe-uninit", +] + [[package]] name = "smallvec" version = "1.11.1" @@ -1670,7 +1910,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1689,6 +1929,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "streamunordered" version = "0.5.3" @@ -1774,9 +2020,9 @@ version = "3.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "fastrand", - "redox_syscall", + "redox_syscall 0.3.5", "rustix", "windows-sys", ] @@ -1812,12 +2058,13 @@ dependencies = [ [[package]] name = "time" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "426f806f4089c493dcac0d24c29c01e2c38baf8e30f1b716ee37e83d200b18fe" +checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" dependencies = [ "deranged", "itoa", + "powerfmt", "serde", "time-core", "time-macros", @@ -1855,16 +2102,16 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ "backtrace", - "bytes", + "bytes 1.5.0", "libc", - "mio", + "mio 0.8.8", "num_cpus", - "parking_lot", + "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", "socket2 0.5.4", @@ -1872,6 +2119,27 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "tokio-executor" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671" +dependencies = [ + "crossbeam-utils", + "futures 0.1.31", +] + +[[package]] +name = "tokio-io" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.31", + "log", +] + [[package]] name = "tokio-macros" version = "2.1.0" @@ -1893,6 +2161,25 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-reactor" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09bc590ec4ba8ba87652da2068d150dcada2cfa2e07faae270a5e0409aa51351" +dependencies = [ + "crossbeam-utils", + "futures 0.1.31", + "lazy_static", + "log", + "mio 0.6.23", + "num_cpus", + "parking_lot 0.9.0", + "slab", + "tokio-executor", + "tokio-io", + "tokio-sync", +] + [[package]] name = "tokio-rustls" version = "0.23.4" @@ -1914,6 +2201,33 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-signal" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0c34c6e548f101053321cba3da7cbb87a610b85555884c41b07da2eb91aff12" +dependencies = [ + "futures 0.1.31", + "libc", + "mio 0.6.23", + "mio-uds", + "signal-hook-registry", + "tokio-executor", + "tokio-io", + "tokio-reactor", + "winapi 0.3.9", +] + +[[package]] +name = "tokio-sync" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edfe50152bc8164fcc456dab7891fa9bf8beaf01c5ee7e1dd43a397c3cf87dee" +dependencies = [ + "fnv", + "futures 0.1.31", +] + [[package]] name = "tokio-tungstenite" version = "0.13.0" @@ -1947,7 +2261,7 @@ version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" dependencies = [ - "bytes", + "bytes 1.5.0", "futures-core", "futures-sink", "pin-project-lite", @@ -2019,11 +2333,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "ee2ef2af84856a50c1d430afce2fdded0a4ec7eda868db86409b4543df0797f9" dependencies = [ - "cfg-if", "log", "pin-project-lite", "tracing-attributes", @@ -2032,9 +2345,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", @@ -2043,9 +2356,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", ] @@ -2064,7 +2377,7 @@ checksum = "8ada8297e8d70872fa9a551d93250a9f407beb9f37ef86494eb20012a2ff7c24" dependencies = [ "base64 0.13.1", "byteorder", - "bytes", + "bytes 1.5.0", "http", "httparse", "input_buffer", @@ -2084,7 +2397,7 @@ checksum = "5fe8dada8c1a3aeca77d6b51a4f1314e0f4b8e438b7b1b71e3ddaca8080e4093" dependencies = [ "base64 0.13.1", "byteorder", - "bytes", + "bytes 1.5.0", "http", "httparse", "input_buffer", @@ -2105,7 +2418,7 @@ checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0" dependencies = [ "base64 0.13.1", "byteorder", - "bytes", + "bytes 1.5.0", "http", "httparse", "log", @@ -2125,7 +2438,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" dependencies = [ "byteorder", - "bytes", + "bytes 1.5.0", "data-encoding", "http", "httparse", @@ -2191,6 +2504,12 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.4.1" @@ -2242,7 +2561,7 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "wasm-bindgen-macro", ] @@ -2267,7 +2586,7 @@ version = "0.4.37" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "js-sys", "wasm-bindgen", "web-sys", @@ -2327,12 +2646,12 @@ dependencies = [ [[package]] name = "webpki" -version = "0.22.2" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07ecc0cd7cac091bf682ec5efa18b1cff79d617b84181f38b3951dbe135f607f" +checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" dependencies = [ - "ring", - "untrusted", + "ring 0.17.3", + "untrusted 0.9.0", ] [[package]] @@ -2350,6 +2669,12 @@ version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc" +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" + [[package]] name = "winapi" version = "0.3.9" @@ -2360,6 +2685,12 @@ dependencies = [ "winapi-x86_64-pc-windows-gnu", ] +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" + [[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" @@ -2372,7 +2703,7 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -2458,9 +2789,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "winnow" -version = "0.5.16" +version = "0.5.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "037711d82167854aff2018dfd193aa0fef5370f456732f0d5a0c59b0f1b4b907" +checksum = "a3b801d0e0a6726477cc207f60162da452f3a95adb368399bef20a946e06f65c" dependencies = [ "memchr", ] @@ -2471,6 +2802,16 @@ version = "0.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "windows-sys", ] + +[[package]] +name = "ws2_32-sys" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] diff --git a/Cargo.toml b/Cargo.toml index 8280b1b..7b80445 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "chaiwala" -version = "0.1.4" +version = "0.1.5" edition = "2021" authors = ["Sho Kaneko "] description = "Service Layer for Kucoin Arbitrage, along with Continuous Deployment" @@ -9,26 +9,33 @@ license = "MIT" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +# core kucoin_api = "1.4.10" -kucoin_arbitrage = "0.0.11" -log = "0.4.8" -failure = "0.1.8" +kucoin_arbitrage = "0.0.13" +# async +futures = "0.3.28" +futures-util = { version = "0.3.28", default-features = false, features = [ + "sink", + "std", +] } +tokio = { version = "1.33.0", features = ["full"] } +tokio-tungstenite = "0.20.1" +# log +log = "0.4.20" fern = "0.6.2" -chrono = "0.4" +chrono = "0.4.31" +# config +toml = "0.8.2" +serde = "1.0.189" +# debug +failure = "0.1.8" +# discord serenity = { version = "0.11.6", default-features = false, features = [ "client", "gateway", "rustls_backend", "model", ] } -serde = "1.0.188" -toml = "0.8.2" -tokio = { version = "1.29.1", features = ["full"] } -tokio-tungstenite = "0.20" -axum = { version = "0.6.19", features = ["ws", "headers"] } -futures = "0.3" -futures-util = { version = "0.3", default-features = false, features = [ - "sink", - "std", -] } -headers = "0.3" +# servers +axum = { version = "0.6.20", features = ["ws", "headers", "macros"] } +headers = "0.3.9" diff --git a/README.md b/README.md index 5d542f4..5471f5a 100644 --- a/README.md +++ b/README.md @@ -3,18 +3,34 @@ [![](https://img.shields.io/docsrs/chaiwala)](https://docs.rs/chaiwala) [![](https://img.shields.io/github/license/kanekoshoyu/chaiwala)](https://github.com/kanekoshoyu/chaiwala/blob/master/LICENSE) -Chaiwaka is a service layer for Kucoin Arbitrage, along with Continuous Deployment +Chaiwala is a service layer for Kucoin Arbitrage, along with Continuous Deployment -### Introduction -Having the perfect algorithm and software architecture is not enough for algo-trading. A low latency network environment is needed to properly place order, which highlights the need of deployment to the cloud. -As of now, Kucoin API has the lowest latency at AWS east japan, which suggests deployment over ECS or similar services using docker. To facilitate the effective remote debug reports and performance reports, this repo was set up to experiment hosting a webserver in event-driven async rust. +## Introduction +Perfect algorithms and software architectures are not enough for algo-trading. A low latency network environment is needed to properly place order, which highlights the need of deployment to the cloud. As of now, Kucoin API has the lowest latency at AWS east japan, which suggests deployment on ECS Fargate using docker. + +### Monitoring via Discord +kucoin_arbitrage's monitor mod is modified as report mod in chaiwala, which sends the MPS counter report to Discord channel in real time. + +### Core Runtime Management via REST +set core's runtime status using GET command i.e. +Enable: http://localhost:1080/set?status=Running +Disable: http://localhost:1080/set?status=Idle + +### Docker +Build docker image locally: `docker build . -t local-chaiwala -f ./.deploy/local.dockerfile` +Run local docker image: `docker run -p 80:1080 local-chaiwala:latest` + +## Features to be Included +| Feature | API | Status | +| -------------------------------------------- | ----------- | --------- | +| System warning report | Discord bot | Available | +| Arbitrage performance report via Discord bot | Discord bot | Available | +| Release build | Docker | Pending | +| AWS Continuous Deployment | Docker | Pending | +| Remote request process | REST | Available | +| Process management | REST | Available | +| Arbitrage broadcast | WebSocket | Pending | -### Features to be Included -- Continuous deployment to AWS using GitHub CI/Docker -- Arbitrage performance report via Discord Bot -- System warning report via Discord Bot -- Remote request process via REST -- Arbitrage broadcast via WebSocket ### Discord Server -[Join my discord ](https://discord.com/invite/uHbX7nSQ) +[Join my Discord](https://discord.gg/q3j5MYdwnm) diff --git a/src/bin/chaiwala_service.rs b/src/bin/chaiwala_service.rs index 9ab87e0..663f9a8 100644 --- a/src/bin/chaiwala_service.rs +++ b/src/bin/chaiwala_service.rs @@ -1,73 +1,139 @@ +/// Executes triangular arbitrage, as a service +use chaiwala::event; +use chaiwala::report::counter::task_log_mps; use chaiwala::report::discord::task_discord_bot; -/// Executes triangular arbitrage -// api -use kucoin_api::{ - client::{Kucoin, KucoinEnv}, - model::market::OrderBookType, - model::websocket::{WSTopic, WSType}, -}; -// tasks +use chaiwala::webserver::task_api_router; +use kucoin_api::client::{Kucoin, KucoinEnv}; use kucoin_arbitrage::broker::gatekeeper::kucoin::task_gatekeep_chances; use kucoin_arbitrage::broker::order::kucoin::task_place_order; -use kucoin_arbitrage::broker::orderbook::kucoin::{task_pub_orderbook_event, task_sync_orderbook}; +use kucoin_arbitrage::broker::orderbook::internal::task_sync_orderbook; +use kucoin_arbitrage::broker::orderbook::kucoin::{ + task_get_initial_orderbooks, task_pub_orderbook_event, +}; use kucoin_arbitrage::broker::orderchange::kucoin::task_pub_orderchange_event; use kucoin_arbitrage::broker::symbol::filter::{symbol_with_quotes, vector_to_hash}; use kucoin_arbitrage::broker::symbol::kucoin::{format_subscription_list, get_symbols}; -use kucoin_arbitrage::strategy::all_taker_btc_usd::task_pub_chance_all_taker_btc_usd; -// events -use kucoin_arbitrage::event::chance::ChanceEvent; -use kucoin_arbitrage::event::order::OrderEvent; -use kucoin_arbitrage::event::orderbook::OrderbookEvent; -use kucoin_arbitrage::event::orderchange::OrderChangeEvent; -// models -use kucoin_arbitrage::model::counter::Counter; +use kucoin_arbitrage::event::{ + chance::ChanceEvent, order::OrderEvent, orderbook::OrderbookEvent, + orderchange::OrderChangeEvent, +}; use kucoin_arbitrage::model::orderbook::FullOrderbook; -use kucoin_arbitrage::translator::traits::OrderBookTranslator; -// async system +use kucoin_arbitrage::monitor::counter; +use kucoin_arbitrage::strategy::all_taker_btc_usd::task_pub_chance_all_taker_btc_usd; use std::sync::Arc; -use tokio::sync::broadcast::channel; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::broadcast::{channel, Receiver, Sender}; use tokio::sync::Mutex; +use tokio::task::JoinSet; #[tokio::main] async fn main() -> Result<(), failure::Error> { - // Provides logging format + // logging format kucoin_arbitrage::logger::log_init(); log::info!("Log setup"); - // Declares all the system counters - let api_input_counter = Arc::new(Mutex::new(Counter::new("api_input"))); - let best_price_counter = Arc::new(Mutex::new(Counter::new("best_price"))); - let chance_counter = Arc::new(Mutex::new(Counter::new("chance"))); - let order_counter = Arc::new(Mutex::new(Counter::new("order"))); - - // Reads config.toml + // credentials let config = chaiwala::config::from_file("config.toml")?; + + // channels + let (tx_discord_message, rx_discord_message) = channel::(256); + let (tx_runtime_status, rx_runtime_status) = channel::(5); + + // setup http server + let msg = tokio::select! { + _ = task_signal_handle() => format!("received external signal"), + res = core_runtime(config.clone(), tx_discord_message, rx_runtime_status) => format!("core ended ({res:?})"), + res = service(config, rx_discord_message, tx_runtime_status) => format!("service ended ({res:?})"), + }; + log::info!("{msg}, Good bye!"); + Ok(()) +} +async fn service( + config: chaiwala::config::Config, + rx_discord_message: Receiver, + tx_runtime_status: Sender, +) -> Result<(), failure::Error> { let discord_bot_token: String = config.discord.token.clone(); let discord_channel_id: u64 = config.discord.channel_id; - let core_config = config.core(); - let monitor_interval = core_config.behaviour.monitor_interval_sec; - let budget = core_config.behaviour.usd_cyclic_arbitrage; - - // Kucoin API Endpoints - let api: Kucoin = Kucoin::new(KucoinEnv::Live, Some(core_config.kucoin_credentials()))?; - let url_public = api.clone().get_socket_endpoint(WSType::Public).await?; - let url_private = api.clone().get_socket_endpoint(WSType::Private).await?; + + let mut taskpool_service = tokio::task::JoinSet::new(); + + taskpool_service.spawn(task_discord_bot( + rx_discord_message, + discord_bot_token, + discord_channel_id, + )); + taskpool_service.spawn(task_api_router(tx_runtime_status)); + taskpool_service.join_next().await.unwrap()? +} + +async fn received_runtime_status( + mut rx_runtime_status: Receiver, + status: event::RuntimeStatus, +) -> Result<(), failure::Error> { + loop { + if let Ok(res) = rx_runtime_status.recv().await { + log::info!("Received runtime status request [{res:?}]"); + if res.eq(&status) { + return Ok(()); + } + } + } +} + +async fn core_runtime( + config: chaiwala::config::Config, + tx_discord_message: Sender, + rx_runtime_status: Receiver, +) -> Result<(), failure::Error> { + loop { + received_runtime_status( + rx_runtime_status.resubscribe(), + event::RuntimeStatus::Running, + ) + .await?; + let message: String = tokio::select! { + _ = core(config.clone().core(), tx_discord_message.clone()) => String::from("unexpected end of core"), + res = received_runtime_status(rx_runtime_status.resubscribe(), event::RuntimeStatus::Idle) =>format!("signal[{:?}]", res) + + }; + log::info!("Runtime set to Idle [{message}]"); + } +} +async fn core( + config: kucoin_arbitrage::config::Config, + tx_discord_message: Sender, +) -> Result<(), failure::Error> { + // TODO setup reporting with tx_discord_message + + // config parameters + let budget = config.behaviour.usd_cyclic_arbitrage; + let monitor_interval = config.behaviour.monitor_interval_sec; + + // system mps counters + let api_input_counter = Arc::new(Mutex::new(counter::Counter::new("api_input"))); + let best_price_counter = Arc::new(Mutex::new(counter::Counter::new("best_price"))); + let chance_counter = Arc::new(Mutex::new(counter::Counter::new("chance"))); + let order_counter = Arc::new(Mutex::new(counter::Counter::new("order"))); + + // API endpoints + let api = Kucoin::new(KucoinEnv::Live, Some(config.kucoin_credentials()))?; log::info!("Credentials setup"); - // Gets all symbols concurrently + // get all symbols concurrently let symbol_list = get_symbols(api.clone()).await; log::info!("Total exchange symbols: {:?}", symbol_list.len()); - // Filters with either btc or usdt as quote + // filter with either btc or usdt as quote let symbol_infos = symbol_with_quotes(&symbol_list, "BTC", "USDT"); let hash_symbols = Arc::new(Mutex::new(vector_to_hash(&symbol_infos))); log::info!("Total symbols in scope: {:?}", symbol_infos.len()); - // Changes a list of SymbolInfo into a 2D list of WSTopic per session in max 100 index + // list subscription using the filtered symbols let subs = format_subscription_list(&symbol_infos); log::info!("Total orderbook WS sessions: {:?}", subs.len()); - // Creates broadcast channels + // create broadcast channels // for syncing public orderbook let (tx_orderbook, rx_orderbook) = channel::(1024 * 2); // for getting notable orderbook after syncing @@ -78,73 +144,40 @@ async fn main() -> Result<(), failure::Error> { let (tx_order, rx_order) = channel::(16); // for getting private order changes let (tx_orderchange, rx_orderchange) = channel::(128); - // for reporting to Discord - let (tx_discord_message, rx_discord_message) = channel::(256); log::info!("Broadcast channels setup"); - // Creates local orderbook - let orderbooks = Arc::new(Mutex::new(FullOrderbook::new())); - log::info!("Local orderbook setup"); + // local orderbook + let full_orderbook = Arc::new(Mutex::new(FullOrderbook::new())); + log::info!("Local empty full orderbook setup"); - // Infrastructure tasks - // USD cyclic arbitrage budget obtained from CONFIG - tokio::spawn(task_sync_orderbook( + // infrastructure tasks + let mut taskpool_infrastructure = JoinSet::new(); + taskpool_infrastructure.spawn(task_sync_orderbook( rx_orderbook, tx_orderbook_best, - orderbooks.clone(), + full_orderbook.clone(), api_input_counter.clone(), )); - tokio::spawn(task_pub_chance_all_taker_btc_usd( + taskpool_infrastructure.spawn(task_pub_chance_all_taker_btc_usd( rx_orderbook_best, tx_chance, - orderbooks.clone(), + full_orderbook.clone(), hash_symbols, budget as f64, best_price_counter.clone(), )); - tokio::spawn(task_gatekeep_chances( + taskpool_infrastructure.spawn(task_gatekeep_chances( rx_chance, rx_orderchange, tx_order, chance_counter.clone(), )); - tokio::spawn(task_place_order( + taskpool_infrastructure.spawn(task_place_order( rx_order, api.clone(), order_counter.clone(), )); - tokio::spawn(task_discord_bot( - rx_discord_message, - discord_bot_token, - discord_channel_id, - )); - - // Gather all the orderbooks concurrently - let symbols: Vec = symbol_infos.into_iter().map(|info| info.symbol).collect(); - gather_orderbook_with_rest(symbols, api.clone(), orderbooks).await; - - // TODO revert the flow, we should first setup the infrastructure, then setup the data flow - - // Subscribes public orderbook WS per session, this is the source of data for the infrastructure tasks - for (i, sub) in subs.iter().enumerate() { - let mut ws_public = api.websocket(); - ws_public.subscribe(url_public.clone(), sub.clone()).await?; - // TODO change to task_pub_orderbook_event - tokio::spawn(task_pub_orderbook_event(ws_public, tx_orderbook.clone())); - log::info!("{i:?}-th session of WS subscription setup"); - } - - // Subscribes private order change websocket - let mut ws_private = api.websocket(); - ws_private - .subscribe(url_private.clone(), vec![WSTopic::TradeOrders]) - .await?; - tokio::spawn(task_pub_orderchange_event(ws_private, tx_orderchange)); - - log::info!("All application tasks setup"); - - // Background routine - chaiwala::report::counter::system_monitor_task( + taskpool_infrastructure.spawn(task_log_mps( tx_discord_message, vec![ api_input_counter.clone(), @@ -153,46 +186,46 @@ async fn main() -> Result<(), failure::Error> { order_counter.clone(), ], monitor_interval as u64, - ) - .await?; - panic!("Program should not arrive here") + )); + + // collect all initial orderbook states with REST + task_get_initial_orderbooks(api.clone(), symbol_infos, full_orderbook).await?; + log::info!("Aggregated all the symbols"); + let mut taskpool_subscription = JoinSet::new(); + // publishes OrderChangeEvent from private subscription + taskpool_subscription.spawn(task_pub_orderchange_event(api.clone(), tx_orderchange)); + // publishes OrderBookEvent from public subscription + for (i, sub) in subs.iter().enumerate() { + taskpool_subscription.spawn(task_pub_orderbook_event( + api.clone(), + sub.to_vec(), + tx_orderbook.clone(), + )); + log::info!("{i:?}-th session of WS subscription setup"); + } + + // terminate if taskpools failed + let message = tokio::select! { + res = taskpool_infrastructure.join_next() => + format!("infrastructure task pool error [{res:?}]"), + res = taskpool_subscription.join_next() => format!("subscription task pool error [{res:?}]"), + }; + Err(failure::err_msg(format!("unexpected error [{message}]"))) +} + +/// wait for any external terminating signal +async fn task_signal_handle() -> Result<(), failure::Error> { + let mut sigterm = signal(SignalKind::terminate()).unwrap(); + let mut sigint = signal(SignalKind::interrupt()).unwrap(); + tokio::select! { + _ = sigterm.recv() => exit_program("SIGTERM").await?, + _ = sigint.recv() => exit_program("SIGINT").await?, + }; + Ok(()) } -async fn gather_orderbook_with_rest( - symbols: Vec, - api: Kucoin, - orderbooks: Arc>, -) { - let tasks: Vec<_> = symbols - .iter() - .map(|symbol| { - // clone variables per task before spawn - let api = api.clone(); - let orderbooks_refcopy = orderbooks.clone(); - let symbol = symbol.clone(); - - tokio::spawn(async move { - loop { - // log::info!("Obtaining initial orderbook[{}] from REST", symbol); - let res = api.get_orderbook(&symbol, OrderBookType::L100).await; - if res.is_err() { - log::warn!("orderbook[{}] did not respond, retry", &symbol); - continue; - } - let res = res.unwrap().data; - if res.is_none() { - log::warn!("orderbook[{}] received none, retry", &symbol); - continue; - } - let data = res.unwrap(); - // log::info!("Initial sequence {}:{}", &symbol, data.sequence); - let mut x = orderbooks_refcopy.lock().await; - x.insert(symbol.to_string(), data.to_internal()); - break; - } - }) - }) - .collect(); - futures::future::join_all(tasks).await; - log::info!("Collected all the symbols"); +/// handle external signal +async fn exit_program(signal_alias: &str) -> Result<(), failure::Error> { + log::info!("Received [{signal_alias}] signal"); + Ok(()) } diff --git a/src/bin/test_http_command.rs b/src/bin/test_http_command.rs new file mode 100644 index 0000000..6ca477b --- /dev/null +++ b/src/bin/test_http_command.rs @@ -0,0 +1,112 @@ +use axum::body::Body; +use axum::http::Response; +use axum::routing::get; +use axum::{Extension, Router, Server}; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; +use tokio::select; +use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::sync::{broadcast, Mutex}; +use tokio::time::sleep; + +async fn runtime() -> Result<(), failure::Error> { + println!("Bot is running"); + let duration = Duration::from_secs(5); + loop { + sleep(duration).await; + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Copy)] +enum HTTPCommand { + Start, + Stop, +} + +#[tokio::main] +async fn main() -> Result<(), failure::Error> { + // server should always be available, whereas the arbitrage code should be controllable + // build our application with a single route + let (tx_command, _) = broadcast::channel::(4); + let (tx_restart, mut rx_restart) = broadcast::channel::<()>(1); + let tx_cmd_ref = Arc::new(Mutex::new(tx_command)); + + let app = Router::new() + .route("/", get(|| async { "KuCoin Arbitrage is running" })) + .route("/start", get(handler_start)) + .route("/status", get(handler_status)) + .route("/stop", get(handler_stop)) + .layer(Extension(tx_cmd_ref.clone())); + + // Set the server address. + let localhost = [0, 0, 0, 0]; + let port = 1080; + let socket_address: SocketAddr = SocketAddr::from((localhost, port)); + + println!("Setup server at [{socket_address:?}]"); + // run server + let server = Server::bind(&socket_address); + tokio::spawn(server.serve(app.into_make_service())); + loop { + // await for a new command received to restart + println!("Waiting for a HTTP::START command to run the command"); + let mut rx_command = tx_cmd_ref.lock().await.clone().subscribe(); + let cmd = rx_command.recv().await?; + if cmd != HTTPCommand::Start { + continue; + } + println!("starting command"); + let rx_command = tx_cmd_ref.lock().await.clone().subscribe(); + tokio::spawn(handler_http_command(rx_command, tx_restart.clone())); + // two futures, app vs received signal + let received_stop = rx_restart.recv(); + let app = runtime(); + // TODO find a better solution than select! that can kill another task async + select! { + res = app => { + match res { + Ok(_) => println!("app completed successfully"), + Err(err) => eprintln!("app error: {:?}", err), + } + } + _ = received_stop => { + println!("received stop successfully"); + } + } + } +} + +async fn handler_http_command( + mut rx_command: Receiver, + tx: Sender<()>, +) -> Result<(), failure::Error> { + loop { + let cmd = rx_command.recv().await?; + println!("Received command: {cmd:?}"); + if cmd == HTTPCommand::Stop { + tx.send(())?; + } + } +} + +async fn handler_status() -> Response { + // Add your code to terminate your service here. + let status: bool = false; + let msg: String = format!("status: {status:?}"); + Response::new(Body::from(msg)) +} + +async fn handler_stop(tx: Extension>>>) -> Response { + // Add your code to terminate your service here. + tx.lock().await.send(HTTPCommand::Stop).unwrap(); + let msg = "Service terminated."; + Response::new(Body::from(msg)) +} + +async fn handler_start(tx: Extension>>>) -> Response { + // Add your code to restart your service here. + tx.lock().await.send(HTTPCommand::Start).unwrap(); + let msg = "Service restarted."; + Response::new(Body::from(msg)) +} diff --git a/src/bin/test_http_lib.rs b/src/bin/test_http_lib.rs new file mode 100644 index 0000000..935cef1 --- /dev/null +++ b/src/bin/test_http_lib.rs @@ -0,0 +1,28 @@ +use chaiwala::event; +use chaiwala::event::RuntimeStatus; +use chaiwala::webserver; +use tokio::sync::broadcast; +#[tokio::main] +async fn main() -> Result<(), failure::Error> { + // logging format + kucoin_arbitrage::logger::log_init(); + log::info!("Log setup"); + + // build our application with a single route + let (tx_status, rx_status) = broadcast::channel::(2); + + let mut taskpool = tokio::task::JoinSet::new(); + taskpool.spawn(webserver::task_api_router(tx_status)); + taskpool.spawn(recv_status(rx_status)); + taskpool.join_next().await; + Ok(()) +} + +async fn recv_status( + mut rx_status: broadcast::Receiver, +) -> Result<(), failure::Error> { + loop { + let res = rx_status.recv().await?; + log::info!("recv_status receievd: [{res:?}]"); + } +} diff --git a/src/bin/test_kucoin_arbitrage.rs b/src/bin/test_kucoin_arbitrage.rs index ab9977d..f93d0ae 100644 --- a/src/bin/test_kucoin_arbitrage.rs +++ b/src/bin/test_kucoin_arbitrage.rs @@ -1,12 +1,11 @@ /// Executes triangular arbitrage -use kucoin_api::{ - client::{Kucoin, KucoinEnv}, - model::market::OrderBookType, - model::websocket::{WSTopic, WSType}, -}; +use kucoin_api::client::{Kucoin, KucoinEnv}; use kucoin_arbitrage::broker::gatekeeper::kucoin::task_gatekeep_chances; use kucoin_arbitrage::broker::order::kucoin::task_place_order; -use kucoin_arbitrage::broker::orderbook::kucoin::{task_pub_orderbook_event, task_sync_orderbook}; +use kucoin_arbitrage::broker::orderbook::internal::task_sync_orderbook; +use kucoin_arbitrage::broker::orderbook::kucoin::{ + task_get_initial_orderbooks, task_pub_orderbook_event, +}; use kucoin_arbitrage::broker::orderchange::kucoin::task_pub_orderchange_event; use kucoin_arbitrage::broker::symbol::filter::{symbol_with_quotes, vector_to_hash}; use kucoin_arbitrage::broker::symbol::kucoin::{format_subscription_list, get_symbols}; @@ -14,51 +13,63 @@ use kucoin_arbitrage::event::{ chance::ChanceEvent, order::OrderEvent, orderbook::OrderbookEvent, orderchange::OrderChangeEvent, }; -use kucoin_arbitrage::model::{counter::Counter, orderbook::FullOrderbook}; +use kucoin_arbitrage::model::orderbook::FullOrderbook; +use kucoin_arbitrage::monitor::counter::Counter; +use kucoin_arbitrage::monitor::task::task_log_mps; use kucoin_arbitrage::strategy::all_taker_btc_usd::task_pub_chance_all_taker_btc_usd; -use kucoin_arbitrage::translator::traits::OrderBookTranslator; use std::sync::Arc; +use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::broadcast::channel; use tokio::sync::Mutex; +use tokio::task::JoinSet; #[tokio::main] async fn main() -> Result<(), failure::Error> { - // Provides logging format + // logging format kucoin_arbitrage::logger::log_init(); log::info!("Log setup"); - // Declares all the system counters + // credentials + let config = kucoin_arbitrage::config::from_file("config.toml")?; + + tokio::select! { + _ = task_signal_handle() => println!("received external signal, terminating program"), + res = core(config) => println!("core ended first {res:?}"), + }; + + println!("Good bye!"); + Ok(()) +} + +async fn core(config: kucoin_arbitrage::config::Config) -> Result<(), failure::Error> { + // config parameters + let budget = config.behaviour.usd_cyclic_arbitrage; + let monitor_interval = config.behaviour.monitor_interval_sec; + + // system mps counters let api_input_counter = Arc::new(Mutex::new(Counter::new("api_input"))); let best_price_counter = Arc::new(Mutex::new(Counter::new("best_price"))); let chance_counter = Arc::new(Mutex::new(Counter::new("chance"))); let order_counter = Arc::new(Mutex::new(Counter::new("order"))); - // Read Configs - let config = chaiwala::config::from_file("config.toml")?; - let core_config = config.core(); - let monitor_interval = core_config.behaviour.monitor_interval_sec; - let budget = core_config.behaviour.usd_cyclic_arbitrage; - - // Setup Kucoin API endpoints - let api: Kucoin = Kucoin::new(KucoinEnv::Live, Some(core_config.kucoin_credentials()))?; - let url_public = api.clone().get_socket_endpoint(WSType::Public).await?; - let url_private = api.clone().get_socket_endpoint(WSType::Private).await?; + // API endpoints + let api = Kucoin::new(KucoinEnv::Live, Some(config.kucoin_credentials()))?; log::info!("Credentials setup"); - // Gets all symbols concurrently + // get all symbols concurrently let symbol_list = get_symbols(api.clone()).await; log::info!("Total exchange symbols: {:?}", symbol_list.len()); - // Filters with either btc or usdt as quote + // filter with either btc or usdt as quote let symbol_infos = symbol_with_quotes(&symbol_list, "BTC", "USDT"); let hash_symbols = Arc::new(Mutex::new(vector_to_hash(&symbol_infos))); log::info!("Total symbols in scope: {:?}", symbol_infos.len()); - // Changes a list of SymbolInfo into a 2D list of WSTopic per session in max 100 index + // list subscription using the filtered symbols let subs = format_subscription_list(&symbol_infos); log::info!("Total orderbook WS sessions: {:?}", subs.len()); - // Creates broadcast channels + // create broadcast channels // for syncing public orderbook let (tx_orderbook, rx_orderbook) = channel::(1024 * 2); // for getting notable orderbook after syncing @@ -71,110 +82,85 @@ async fn main() -> Result<(), failure::Error> { let (tx_orderchange, rx_orderchange) = channel::(128); log::info!("Broadcast channels setup"); - // Creates local orderbook - let orderbooks = Arc::new(Mutex::new(FullOrderbook::new())); - log::info!("Local orderbook setup"); + // local orderbook + let full_orderbook = Arc::new(Mutex::new(FullOrderbook::new())); + log::info!("Local empty full orderbook setup"); - // Infrastructure tasks - // USD cyclic arbitrage budget obtained from CONFIG - tokio::spawn(task_sync_orderbook( + // infrastructure tasks + let mut taskpool_infrastructure = JoinSet::new(); + taskpool_infrastructure.spawn(task_sync_orderbook( rx_orderbook, tx_orderbook_best, - orderbooks.clone(), + full_orderbook.clone(), api_input_counter.clone(), )); - tokio::spawn(task_pub_chance_all_taker_btc_usd( + taskpool_infrastructure.spawn(task_pub_chance_all_taker_btc_usd( rx_orderbook_best, tx_chance, - orderbooks.clone(), + full_orderbook.clone(), hash_symbols, budget as f64, best_price_counter.clone(), )); - tokio::spawn(task_gatekeep_chances( + taskpool_infrastructure.spawn(task_gatekeep_chances( rx_chance, rx_orderchange, tx_order, chance_counter.clone(), )); - tokio::spawn(task_place_order( + taskpool_infrastructure.spawn(task_place_order( rx_order, api.clone(), order_counter.clone(), )); - - // Gather all the orderbooks concurrently - let symbols: Vec = symbol_infos.into_iter().map(|info| info.symbol).collect(); - gather_orderbook_with_rest(symbols, api.clone(), orderbooks).await; - - // TODO revert the flow, we should first setup the infrastructure, then setup the data flow - - // Subscribes public orderbook WS per session, this is the source of data for the infrastructure tasks - for (i, sub) in subs.iter().enumerate() { - let mut ws_public = api.websocket(); - ws_public.subscribe(url_public.clone(), sub.clone()).await?; - // TODO change to task_pub_orderbook_event - tokio::spawn(task_pub_orderbook_event(ws_public, tx_orderbook.clone())); - log::info!("{i:?}-th session of WS subscription setup"); - } - - // Subscribes private order change websocket - let mut ws_private = api.websocket(); - ws_private - .subscribe(url_private.clone(), vec![WSTopic::TradeOrders]) - .await?; - tokio::spawn(task_pub_orderchange_event(ws_private, tx_orderchange)); - - log::info!("All application tasks setup"); - - // Background routine - let _ = tokio::join!(kucoin_arbitrage::global::task::background_routine( + taskpool_infrastructure.spawn(task_log_mps( vec![ api_input_counter.clone(), best_price_counter.clone(), chance_counter.clone(), - order_counter.clone() + order_counter.clone(), ], - monitor_interval as u64 + monitor_interval as u64, )); - panic!("Program should not arrive here") + + // collect all initial orderbook states with REST + task_get_initial_orderbooks(api.clone(), symbol_infos, full_orderbook).await?; + log::info!("Aggregated all the symbols"); + let mut taskpool_subscription = JoinSet::new(); + // publishes OrderChangeEvent from private subscription + taskpool_subscription.spawn(task_pub_orderchange_event(api.clone(), tx_orderchange)); + // publishes OrderBookEvent from public subscription + for (i, sub) in subs.iter().enumerate() { + taskpool_subscription.spawn(task_pub_orderbook_event( + api.clone(), + sub.to_vec(), + tx_orderbook.clone(), + )); + log::info!("{i:?}-th session of WS subscription setup"); + } + + // terminate if taskpools failed + let message = tokio::select! { + res = taskpool_infrastructure.join_next() => + format!("Infrastructure task pool error [{res:?}]"), + res = taskpool_subscription.join_next() => format!("Subscription task pool error [{res:?}]"), + }; + Err(failure::err_msg(format!("unexpected error [{message}]"))) +} + +/// wait for any external terminating signal +async fn task_signal_handle() -> Result<(), failure::Error> { + let mut sigterm = signal(SignalKind::terminate()).unwrap(); + let mut sigint = signal(SignalKind::interrupt()).unwrap(); + tokio::select! { + _ = sigterm.recv() => exit_program("SIGTERM").await?, + _ = sigint.recv() => exit_program("SIGINT").await?, + }; + Ok(()) } -async fn gather_orderbook_with_rest( - symbols: Vec, - api: Kucoin, - orderbooks: Arc>, -) { - let tasks: Vec<_> = symbols - .iter() - .map(|symbol| { - // clone variables per task before spawn - let api = api.clone(); - let orderbooks_refcopy = orderbooks.clone(); - let symbol = symbol.clone(); - - tokio::spawn(async move { - loop { - // log::info!("Obtaining initial orderbook[{}] from REST", symbol); - let res = api.get_orderbook(&symbol, OrderBookType::L100).await; - if res.is_err() { - log::warn!("orderbook[{}] did not respond, retry", &symbol); - continue; - } - let res = res.unwrap().data; - if res.is_none() { - log::warn!("orderbook[{}] received none, retry", &symbol); - continue; - } - let data = res.unwrap(); - // log::info!("Initial sequence {}:{}", &symbol, data.sequence); - let mut x = orderbooks_refcopy.lock().await; - x.insert(symbol.to_string(), data.to_internal()); - break; - } - }) - }) - .collect(); - futures::future::join_all(tasks).await; - log::info!("Collected all the symbols"); +/// handle external signal +async fn exit_program(signal_alias: &str) -> Result<(), failure::Error> { + log::info!("Received [{signal_alias}] signal"); + Ok(()) } diff --git a/src/bin/test_ws.rs b/src/bin/test_ws.rs index ff07c08..a32410f 100644 --- a/src/bin/test_ws.rs +++ b/src/bin/test_ws.rs @@ -34,15 +34,18 @@ async fn main() { // router let app = Router::new() // HTTP - .route("/", axum::routing::get(chai_handler::handle_http)) + .route( + "/", + axum::routing::get(chai_handler::http::plain_hello_world), + ) // Websocket .route( "/broadcast", - axum::routing::get(chai_handler::handle_ws_broadcast), + axum::routing::get(chai_handler::ws::handler_broadcast), ) .route( "/pingpong", - axum::routing::get(chai_handler::handle_ws_pingpong), + axum::routing::get(chai_handler::ws::handler_ping_pong), ) // Adds extension for broadcast receiver .layer(Extension(Arc::new(Mutex::new(rx)))); diff --git a/src/config.rs b/src/config.rs index 867fd18..cec9061 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,7 +2,7 @@ use kucoin_arbitrage::error::Error; use serde::Deserialize; use std::fs; -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct Config { pub kucoin: kucoin_arbitrage::config::KuCoin, pub behaviour: kucoin_arbitrage::config::Behaviour, @@ -17,7 +17,7 @@ impl Config { } } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct Discord { pub token: String, pub channel_id: u64, diff --git a/src/event.rs b/src/event.rs new file mode 100644 index 0000000..4984468 --- /dev/null +++ b/src/event.rs @@ -0,0 +1,8 @@ +use serde::{Deserialize, Serialize}; +/// Core runtime status +#[derive(Debug, Clone, PartialEq, Eq, Copy, Default, Deserialize, Serialize)] +pub enum RuntimeStatus { + #[default] + Idle, + Running, +} diff --git a/src/handler/http.rs b/src/handler/http.rs new file mode 100644 index 0000000..1e22aaa --- /dev/null +++ b/src/handler/http.rs @@ -0,0 +1,39 @@ +use crate::event; +use axum::body::Body; +use axum::extract::{Query, TypedHeader}; +use axum::http::Response; +use axum::Extension; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::{broadcast, Mutex}; + +#[derive(Debug, Deserialize, Serialize)] +pub struct SetStatusParam { + status: event::RuntimeStatus, +} + +/// HTTP handler, returns plain text +pub async fn plain_hello_world( + user_agent: Option>, +) -> Response { + log::info!("Connected: {}", user_agent.unwrap().as_str()); + Response::new(Body::from("Hello, World!".to_string())) +} + +/// Set runtime status by query +pub async fn handler_set_status( + Query(param): Query, + tx: Extension>>>, +) -> Response { + let msg = format!( + "Received REST command to set runtime status as [{:?}]", + param.status + ); + // Add your code to restart your service here. + let tx = tx.lock().await; + if tx.send(param.status).is_err() { + log::warn!("problem publishing runtime status, check receiver side") + } + log::info!("{msg}"); + Response::new(Body::from(msg)) +} diff --git a/src/handler/mod.rs b/src/handler/mod.rs new file mode 100644 index 0000000..aefd7d0 --- /dev/null +++ b/src/handler/mod.rs @@ -0,0 +1,4 @@ +/// HTTP handlers +pub mod http; +/// WebSocket handlers +pub mod ws; diff --git a/src/handler.rs b/src/handler/ws.rs similarity index 71% rename from src/handler.rs rename to src/handler/ws.rs index 5b3d546..a5ad6b5 100644 --- a/src/handler.rs +++ b/src/handler/ws.rs @@ -4,37 +4,26 @@ use axum::Extension; use std::sync::Arc; use tokio::sync::{broadcast, Mutex}; -/// HTTP handler, returns plain text -pub async fn handle_http(user_agent: Option>) -> &'static str { - log::info!("Connected: {}", user_agent.unwrap().as_str()); - "Hello, World!" -} - /// WebSocket handler, returns response from callback -pub async fn handle_ws_broadcast( +pub async fn handler_broadcast( ws: WebSocketUpgrade, user_agent: Option>, rx: Extension>>>, ) -> impl axum::response::IntoResponse { - // callback upon reception log::info!("Connected: {}", user_agent.unwrap().as_str()); - - ws.on_upgrade(move |socket: WebSocket| ws_upgrade_callback(socket, rx.0)) + ws.on_upgrade(move |socket: WebSocket| publish_index(socket, rx.0)) } /// Websocket Callback that sends received data from broadcast -async fn ws_upgrade_callback(mut ws: WebSocket, rx: Arc>>) { - // TODO spawn both the broadcast loop and the receiver loop for real-time control - // while websocket is on connection +async fn publish_index(mut ws: WebSocket, rx: Arc>>) { while let Ok(number) = rx.lock().await.recv().await { ws.send(Message::Text(format!("{number}"))).await.unwrap(); } - // sends Message::Close() ws.close().await.unwrap(); } /// WebSocket handler, returns response from callback -pub async fn handle_ws_pingpong( +pub async fn handler_ping_pong( ws: WebSocketUpgrade, user_agent: Option>, ) -> impl axum::response::IntoResponse { @@ -42,11 +31,11 @@ pub async fn handle_ws_pingpong( log::info!("Connected: {}", user_agent.as_str()); } - ws.on_upgrade(ws_callback_pingpong) + ws.on_upgrade(pub_received) } /// Websocket Callback that sends received data -async fn ws_callback_pingpong(mut socket: WebSocket) { +async fn pub_received(mut socket: WebSocket) { loop { let res = socket.recv().await; if res.is_none() { diff --git a/src/lib.rs b/src/lib.rs index 9814df3..d5b40ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,12 @@ -/// Logger intialization -pub mod logger; - +/// Config +pub mod config; +/// Event +pub mod event; /// Handlers pub mod handler; - -/// Config reader -pub mod config; - -/// report +/// Logger intialization +pub mod logger; +/// Report pub mod report; +/// server +pub mod webserver; diff --git a/src/report/counter.rs b/src/report/counter.rs index 03a7c93..b928e58 100644 --- a/src/report/counter.rs +++ b/src/report/counter.rs @@ -1,42 +1,42 @@ -use std::{sync::Arc, time::Duration}; - -use kucoin_arbitrage::{global::counter_helper, model::counter::Counter}; +use kucoin_arbitrage::monitor::counter; +use std::sync::Arc; +use std::time::Duration; use tokio::sync::broadcast::Sender; use tokio::sync::Mutex; use tokio::time::sleep; -async fn report_counter_mps( - tx: Sender, - counters: Vec>>, +/// log counters +async fn log_mps( + tx_message: Sender, + counters: Vec>>, interval: u64, -) -> Result<(), failure::Error> { - log::info!("Reporting broadcast data rate"); - let mut status = String::new(); +) -> Result<(), kucoin_api::failure::Error> { + let title = String::from("Broadcast channel data rate"); + log::info!("{title}"); + tx_message.send(title)?; for counter in counters.iter() { let (name, count) = { let p = counter.lock().await; (p.name, p.data_count) }; - let line = format!("{name:?}: {count:?} points ({:?}mps)", count / interval); - log::info!("{line}"); - status += &line; - status += "\n"; + let message = format!("{name:10}: {count:5} points ({:5}mps)", count / interval); + log::info!("{message}"); + tx_message.send(message)?; // clear the data - counter_helper::reset(counter.clone()).await; + counter::reset(counter.clone()).await; } - tx.send(status)?; Ok(()) } - -pub async fn system_monitor_task( - tx: Sender, - counters: Vec>>, +/// log counters as a task +pub async fn task_log_mps( + tx_message: Sender, + counters: Vec>>, interval: u64, -) -> Result<(), failure::Error> { +) -> Result<(), kucoin_api::failure::Error> { let monitor_delay = Duration::from_secs(interval); loop { sleep(monitor_delay).await; - report_counter_mps(tx.clone(), counters.clone(), interval) + log_mps(tx_message.clone(), counters.clone(), interval) .await .expect("report status error"); } diff --git a/src/webserver.rs b/src/webserver.rs new file mode 100644 index 0000000..cde1e37 --- /dev/null +++ b/src/webserver.rs @@ -0,0 +1,39 @@ +use crate::event; +use crate::handler; +use axum::body::Body; +use axum::http::Response; +use axum::routing::get; +use axum::{Extension, Router, Server}; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::sync::{broadcast, Mutex}; + +pub async fn task_api_router( + tx_status: broadcast::Sender, +) -> Result<(), failure::Error> { + // mutex since handlers gets spawned + let tx_cmd_ref = Arc::new(Mutex::new(tx_status)); + let app = Router::new() + .route("/", get(handler::http::plain_hello_world)) + .route("/set", get(handler::http::handler_set_status)) + .route("/status", get(handler_status)) + .layer(Extension(tx_cmd_ref.clone())); + + // Set the server address. + let localhost = [0, 0, 0, 0]; + let port = 1080; + let socket_address: SocketAddr = SocketAddr::from((localhost, port)); + + log::info!("Setup server at [{socket_address:?}]"); + // run server + let server = Server::bind(&socket_address); + Ok(server.serve(app.into_make_service()).await?) +} + +async fn handler_status() -> Response { + // Add your code to terminate your service here. + let status: bool = false; + let msg: String = format!("status: {status:?}"); + log::info!("{msg}"); + Response::new(Body::from(msg)) +}