Skip to content

Commit

Permalink
Merge pull request #3 from kanekoshoyu/feature/ws_framework
Browse files Browse the repository at this point in the history
Feature/ws framework
  • Loading branch information
kanekoshoyu authored Aug 20, 2023
2 parents 87586a2 + d541ce4 commit d710385
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 110 deletions.
1 change: 1 addition & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
components: clippy
override: true
- uses: actions-rs/clippy-check@v1
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[package]
name = "chaiwala"
version = "0.1.1"
version = "0.1.2"
edition = "2021"
authors = ["Sho Kaneko <[email protected]>"]
description = "Endpoints for Kucoin Arbitrage Deployment"
description = "Endpoints for Kucoin Arbitrage ECS Deployment"
repository = "https://github.com/kanekoshoyu/chaiwala"
license = "MIT"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
25 changes: 25 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Rust 2021 latest image
FROM rust:1.71

# Metadata
LABEL maintainer="[email protected]"

# The /app directory should act as the main application directory
WORKDIR /app

# Either clone the repo remotely
# RUN git clone https://github.com/kanekoshoyu/chaiwala.git
# WORKDIR /app/chaiwala

# Or copy the files locally
COPY ./ ./

# Build release
RUN cargo build --bin ws_broadcast --release

# Open application endpoints
EXPOSE 3000

# Run the binary
# CMD ["cargo", "run", "--release", "--bin", "ws_broadcast"]
CMD ["./target/release/ws_broadcast"]
2 changes: 2 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[toolchain]
channel = "nightly"
2 changes: 1 addition & 1 deletion src/bin/helloworld.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ async fn main() {
let app = Router::new().route("/", get(|| async { "Chaiwala!" }));

// Set the server address.
let socket_address = SocketAddr::from(([127, 0, 0, 1], 3000));
let socket_address: SocketAddr = SocketAddr::from(([0, 0, 0, 0], 3000));

// run it with hyper on localhost:3000
let server = Server::bind(&socket_address);
Expand Down
61 changes: 22 additions & 39 deletions src/bin/ws_broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,11 @@
use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::Extension;
use axum::Router;
use axum::{Extension, Router, Server};
use chaiwala::handler as chai_handler;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::{broadcast, Mutex};

async fn handle_http() -> &'static str {
"Hello, World!"
}

/// Spawn number generator and websocket callback
async fn handle_ws(
ws: WebSocketUpgrade,
rx: Extension<Arc<Mutex<broadcast::Receiver<i32>>>>,
) -> impl axum::response::IntoResponse {
// callback upon reception
ws.on_upgrade(move |socket: WebSocket| ws_upgrade_callback(socket, rx.0))
}

/// Send number to the broadcast
async fn generate_numbers(tx: broadcast::Sender<i32>) {
/// broadcast numbers
async fn broadcast_numbers(tx: broadcast::Sender<i32>) {
let mut i = 0;
loop {
tx.send(i).unwrap();
Expand All @@ -32,16 +18,6 @@ async fn generate_numbers(tx: broadcast::Sender<i32>) {
}
}

// Websocket Callback
async fn ws_upgrade_callback(mut ws: WebSocket, rx: Arc<Mutex<broadcast::Receiver<i32>>>) {
// while websocket is on connection
while let Ok(number) = rx.lock().await.recv().await {
ws.send(Message::Text(format!("{number}"))).await.unwrap();
}
// sends Message::Close()
ws.close().await.unwrap();
}

#[tokio::main]
async fn main() {
if std::env::var_os("RUST_LOG").is_none() {
Expand All @@ -52,22 +28,29 @@ async fn main() {
// Setup broadcast
let (tx, rx) = broadcast::channel::<i32>(100);

let arc_rx = Arc::new(Mutex::new(rx));

// Clone the sender to pass it to the broadcast loop
tokio::spawn(generate_numbers(tx));
// Spawn a global number broadcast
tokio::spawn(broadcast_numbers(tx));

// router
let app = Router::new()
// HTTP
.route("/", axum::routing::get(handle_http))
.route("/", axum::routing::get(chai_handler::handle_http))
// Websocket
.route("/ws", axum::routing::get(handle_ws))
// Adds extension
.layer(Extension(arc_rx));

let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
.route(
"/broadcast",
axum::routing::get(chai_handler::handle_ws_broadcast),
)
.route(
"/pingpong",
axum::routing::get(chai_handler::handle_ws_pingpong),
)
// Adds extension for broadcast receiver
.layer(Extension(Arc::new(Mutex::new(rx))));

// address
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
log::info!("listening on {}", addr);
axum::Server::bind(&addr)
Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
Expand Down
67 changes: 0 additions & 67 deletions src/bin/ws_pingpong.rs

This file was deleted.

76 changes: 76 additions & 0 deletions src/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::extract::TypedHeader;
use axum::Extension;
use std::sync::Arc;
use tokio::sync::{broadcast, Mutex};

/// HTTP handler, returns plain text
pub async fn handle_http(user_agent: Option<TypedHeader<headers::UserAgent>>) -> &'static str {
log::info!("Connected: {}", user_agent.unwrap().as_str());
"Hello, World!"
}

/// WebSocket handler, returns response from callback
pub async fn handle_ws_broadcast(
ws: WebSocketUpgrade,
user_agent: Option<TypedHeader<headers::UserAgent>>,
rx: Extension<Arc<Mutex<broadcast::Receiver<i32>>>>,
) -> impl axum::response::IntoResponse {
// callback upon reception
log::info!("Connected: {}", user_agent.unwrap().as_str());

ws.on_upgrade(move |socket: WebSocket| ws_upgrade_callback(socket, rx.0))
}

/// Websocket Callback that sends received data from broadcast
async fn ws_upgrade_callback(mut ws: WebSocket, rx: Arc<Mutex<broadcast::Receiver<i32>>>) {
// TODO spawn both the broadcast loop and the receiver loop for real-time control
// while websocket is on connection
while let Ok(number) = rx.lock().await.recv().await {
ws.send(Message::Text(format!("{number}"))).await.unwrap();
}
// sends Message::Close()
ws.close().await.unwrap();
}

/// WebSocket handler, returns response from callback
pub async fn handle_ws_pingpong(
ws: WebSocketUpgrade,
user_agent: Option<TypedHeader<headers::UserAgent>>,
) -> impl axum::response::IntoResponse {
if let Some(TypedHeader(user_agent)) = user_agent {
log::info!("Connected: {}", user_agent.as_str());
}

ws.on_upgrade(ws_callback_pingpong)
}

/// Websocket Callback that sends received data
async fn ws_callback_pingpong(mut socket: WebSocket) {
loop {
let res = socket.recv().await;
if res.is_none() {
break;
}
let res = res.unwrap();
if let Err(e) = res {
log::warn!("Failed receiving message {e}");
break;
}
let msg = res.unwrap();
log::info!("RX: {:?}", msg);
if let Message::Close(_) = msg {
log::warn!("Close message received");
break;
}
if let Message::Text(text) = msg {
let res = socket.send(Message::Text(text.clone())).await;
if res.is_err() {
log::warn!("Failed sending message, disconnecting client");
return;
}
log::info!("TX: {:?}", text);
}
}
log::warn!("Escaping handler");
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
/// Logger intialization
pub mod logger;

/// Handlers
pub mod handler;

0 comments on commit d710385

Please sign in to comment.