diff --git a/Cargo.lock b/Cargo.lock index bd5cbe4..6d4ab83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -224,6 +224,7 @@ dependencies = [ "async-trait", "axum-core 0.4.1", "axum-macros", + "base64", "bytes", "futures-util", "http 1.0.0", @@ -243,8 +244,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -430,6 +433,7 @@ dependencies = [ "axum-extra", "base64", "chrono", + "futures", "image", "regex", "reqwest", @@ -660,6 +664,12 @@ dependencies = [ "typenum", ] +[[package]] +name = "data-encoding" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" + [[package]] name = "der" version = "0.7.8" @@ -843,6 +853,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.29" @@ -916,6 +941,7 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -3301,6 +3327,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -3554,6 +3592,25 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 0.2.11", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -3643,6 +3700,12 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "uuid" version = "1.6.1" diff --git a/Cargo.toml b/Cargo.toml index 22c718f..1be3adb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,10 +6,11 @@ edition = "2021" [dependencies] askama = { version = "0.12.1", features = ["with-axum"] } askama_axum = "0.4.0" -axum = { version = "0.7.2", features = ["macros", "multipart"] } +axum = { version = "0.7.2", features = ["macros", "multipart", "ws"] } axum-extra = { version = "0.9.0", features = ["typed-header"] } base64 = "0.21.5" chrono = { version = "0.4.31", default-features = false, features = ["std"] } +futures = "0.3.29" image = "0.24.7" regex = "1.10.2" reqwest = { version = "0.11.22", features = ["json"] } diff --git a/src/cal/day19.rs b/src/cal/day19.rs new file mode 100644 index 0000000..05a3489 --- /dev/null +++ b/src/cal/day19.rs @@ -0,0 +1,128 @@ +use axum::{ + extract::{ + ws::{WebSocket, WebSocketUpgrade}, + Path, State, + }, + response::IntoResponse, + routing::{get, post}, + Router, +}; +use futures::{SinkExt, StreamExt}; +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, Mutex, + }, +}; +use tokio::{spawn, sync::broadcast::Sender}; + +#[derive(Clone, serde::Serialize)] +struct Message { + user: String, + message: String, +} + +#[derive(Clone)] +struct Day19State { + view_count: Arc, + sockets: Arc>>>, +} + +pub(crate) fn router() -> Router { + Router::new() + .route("/19/ws/ping", get(ping_handler)) + .route("/19/reset", post(reset)) + .route("/19/views", get(views)) + .route("/19/ws/room/:id/user/:user", get(room_handler)) + .with_state(Day19State { + view_count: Arc::new(AtomicU64::new(0)), + sockets: Arc::new(Mutex::new(HashMap::new())), + }) +} + +async fn ping_handler(ws: WebSocketUpgrade) -> impl IntoResponse { + ws.on_upgrade(ping) +} + +async fn ping(mut ws: WebSocket) { + let mut served = false; + while let Some(Ok(msg)) = ws.recv().await { + if let Ok(msg) = msg.to_text() { + match msg { + "serve" => served = true, + "ping" if served => { + let _ = ws.send("pong".into()).await; + } + _ => {} + } + } + } +} + +async fn reset(State(state): State) { + state.view_count.store(0, Ordering::Relaxed); +} + +async fn views(State(state): State) -> impl IntoResponse { + state.view_count.load(Ordering::Relaxed).to_string() +} + +async fn room_handler( + ws: WebSocketUpgrade, + Path((id, user)): Path<(u64, String)>, + State(state): State, +) -> impl IntoResponse { + ws.on_upgrade(move |s| room(s, id, user, state)) +} + +#[derive(serde::Deserialize)] +struct WsMsg { + message: String, +} + +async fn room(ws: WebSocket, id: u64, user: String, state: Day19State) { + let send = { + let Ok(mut map) = state.sockets.lock() else { + return; + }; + + if let Some(ch) = map.get(&id) { + ch.clone() + } else { + let ch = Sender::new(128); + map.insert(id, ch.clone()); + ch + } + }; + + let (mut ws_send, mut ws_recv) = ws.split(); + + let mut recv = send.subscribe(); + let recv_task = spawn(async move { + while let Ok(message) = recv.recv().await { + if let Ok(message) = serde_json::to_string(&message) { + if ws_send.send(message.into()).await.is_ok() { + state.view_count.fetch_add(1, Ordering::Relaxed); + } else { + return; + } + } + } + }); + + while let Some(Ok(msg)) = ws_recv.next().await { + if let Ok(msg) = msg.into_text() { + if let Ok(WsMsg { message }) = serde_json::from_str::(&msg) { + if message.len() <= 128 { + let _ = send.send(Message { + user: user.clone(), + message, + }); + } + } + } + } + + recv_task.abort(); +} diff --git a/src/cal/mod.rs b/src/cal/mod.rs index 7cddaf9..f544ec4 100644 --- a/src/cal/mod.rs +++ b/src/cal/mod.rs @@ -10,6 +10,7 @@ mod day13; mod day14; mod day15; mod day18; +mod day19; pub(crate) fn router(pool: sqlx::PgPool) -> axum::Router { axum::Router::new() @@ -25,4 +26,5 @@ pub(crate) fn router(pool: sqlx::PgPool) -> axum::Router { .nest("/", day14::router()) .nest("/", day15::router()) .nest("/", day18::router(pool)) + .nest("/", day19::router()) }