From 89d1d280521d60cca16aabb28514626dc578b9cb Mon Sep 17 00:00:00 2001 From: yourfriendoss Date: Tue, 16 Sep 2025 11:03:03 +0300 Subject: [PATCH] feat: multi-client support. still missing lots more of the code, such as the logs not being aligned --- Cargo.lock | 1 + Cargo.toml | 1 + src/client.rs | 4 +- src/log.rs | 33 ++++---- src/main.rs | 205 +++++++++++++++++++++++++++++--------------------- 5 files changed, 144 insertions(+), 100 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1b21156..a7fb95f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1919,6 +1919,7 @@ dependencies = [ "console-subscriber", "dotenv_codegen", "flume", + "futures", "futures-util", "hocon", "midiplayer_rs", diff --git a/Cargo.toml b/Cargo.toml index 284f6bb..ed36117 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,3 +36,4 @@ thousands = "0.2.0" rayon = "1.11.0" hocon = {version = "0.9.0", default-features = false, features = ["serde-support"] } +futures = "0.3.31" diff --git a/src/client.rs b/src/client.rs index 7fa66bf..244ff8c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -235,10 +235,10 @@ impl Client { loop { match self.connect_websocket().await { Ok(_) => { - log!(DISCONNECTED); + log!(DISCONNECTED, self.channel); } Err(e) => { - log!(ERRORED, e); + log!(ERRORED, e, self.channel); } } sleep(Duration::from_secs(1)).await; diff --git a/src/log.rs b/src/log.rs index 52f5178..05c86b3 100644 --- a/src/log.rs +++ b/src/log.rs @@ -22,8 +22,9 @@ macro_rules! log { (JOINED, $channel:expr, $visible:expr, $actual:expr) => { use $crate::log::*; println!( - "{}{}{} | Channel {}{}{} | People: {} (actual: {})", + "{}{}{}{} | Channel {}{}{} | People: {} (actual: {})", GREEN, + $channel, pad_prefix("[JOINED]", MAX_PREFIX_WIDTH), RESET, CYAN, @@ -33,40 +34,44 @@ macro_rules! log { $actual ); }; - (DISCONNECTED) => { + (DISCONNECTED, $channel:expr) => { use $crate::log::*; println!( - "{}{}{} | WebSocket closed, reconnecting in 1 second...", + "{} {} {}{} | WebSocket closed, reconnecting in 1 second...", YELLOW, + $channel, pad_prefix("[DISCONNECTED]", MAX_PREFIX_WIDTH), RESET ); }; - (ERRORED, $e:expr) => { + (ERRORED, $e:expr, $channel:expr) => { use $crate::log::*; println!( - "{}{}{} | WebSocket error: {}, reconnecting in 1 second...", + "{} {} {}{} | WebSocket error: {}, reconnecting in 1 second...", RED, + $channel, pad_prefix("[ERROR]", MAX_PREFIX_WIDTH), RESET, $e ); }; - (CONNECTED) => { + (CONNECTED, $channel:expr) => { use $crate::log::*; println!( - "{}{}{} | Connected!", + "{} {} {}{} | Connected!", GREEN, + $channel, pad_prefix("[CONNECTED]", MAX_PREFIX_WIDTH), RESET ); }; - (MSG, $user:expr, $message:expr) => { + (MSG, $user:expr, $message:expr, $channel:expr) => { use $crate::log::*; println!( - "{}{}{} | {}{}{}: {}", + "{} {} {}{} | {}{}{}: {}", BLUE, + $channel, pad_prefix("[MSG]", MAX_PREFIX_WIDTH), RESET, MAGENTA, @@ -75,11 +80,12 @@ macro_rules! log { $message ); }; - (PLAYER_JOINED, $player:expr) => { + (PLAYER_JOINED, $player:expr, $channel:expr) => { use $crate::log::*; println!( - "{}{}{} | {}{}{} joined", + "{} {} {}{} | {}{}{} joined", GREEN, + $channel, pad_prefix("[PLAYER JOINED]", MAX_PREFIX_WIDTH), RESET, CYAN, @@ -87,11 +93,12 @@ macro_rules! log { RESET ); }; - (PLAYER_LEFT, $player:expr) => { + (PLAYER_LEFT, $player:expr, $channel:expr) => { use $crate::log::*; println!( - "{}{}{} | {}{}{} left", + "{} {} {}{} | {}{}{} left", YELLOW, + $channel, pad_prefix("[PLAYER LEFT]", MAX_PREFIX_WIDTH), RESET, CYAN, diff --git a/src/main.rs b/src/main.rs index 59b4a13..11bf389 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,6 +15,7 @@ use crate::client::Client; use crate::client::ClientEvent; use crate::commands::Arguments; use crate::commands::CommandRegistry; +use futures::future::join_all; use std::collections::HashMap; use cap::Cap; @@ -49,6 +50,35 @@ pub struct Rank { permissions: Vec, } +#[derive(Deserialize, Clone, Debug)] +pub struct Configuration { + database: DatabaseConfig, + commands: CommandsConfig, + client: ClientConfig, +} + +#[derive(Deserialize, Clone, Debug)] +struct DatabaseConfig { + url: String, +} + +#[derive(Deserialize, Clone, Debug)] +struct CommandsConfig { + prefix: String, + name: String, + ntfy: Option, + copyparty: String, + playlists: HashMap>, +} + +#[derive(Deserialize, Clone, Debug)] +struct ClientConfig { + token: String, + ws: String, + room: Option, + rooms: Option>, +} + pub async fn get_ranks(registry: CommandRegistry) -> HashMap { let mut ranks: HashMap = HashMap::new(); let mut command_permissions: Vec = Vec::new(); @@ -114,57 +144,17 @@ macro_rules! register_all { )+ }; } -#[derive(Deserialize, Clone, Debug)] -pub struct Configuration { - database: DatabaseConfig, - commands: CommandsConfig, - client: ClientConfig, -} -#[derive(Deserialize, Clone, Debug)] -struct DatabaseConfig { - url: String, -} - -#[derive(Deserialize, Clone, Debug)] -struct CommandsConfig { - prefix: String, - name: String, - ntfy: Option, - copyparty: String, - playlists: HashMap>, -} - -#[derive(Deserialize, Clone, Debug)] -struct ClientConfig { - token: String, - ws: String, +pub async fn start_client( + conf: Configuration, + pool: Arc>, + mode: &'static str, room: String, -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - rustls::crypto::CryptoProvider::install_default(rustls_rustcrypto::provider()) - .expect("install rustcrypto provider"); - - let s = tokio::fs::read_to_string("config.hocon").await?; - let conf: Configuration = hocon::de::from_str(&s)?; - let mut mode = "R"; - if cfg!(debug_assertions) { - mode = "D"; - } - + prefix: String, +) -> Result<(), Box> { let (mut client, event_rx) = Client::new(); - let pool = PgPoolOptions::new() - .max_connections(5) - .connect(conf.database.url.as_str()) - .await?; - let arc_pool = Arc::new(pool); - let midi_state = Arc::new(Mutex::new(MidiState::new())); - let mut registry = CommandRegistry::new(); - let ranks: HashMap = HashMap::new(); register_all!( registry, @@ -178,16 +168,16 @@ async fn main() -> Result<(), Box> { LaunchCommand, FollowCommand::new(), TestCommand::new(), - BalanceCommand::new(arc_pool.clone()), - InventoryCommand::new(arc_pool.clone()), - FishCommand::new(arc_pool.clone()), - FarmCommand::new(arc_pool.clone()), - ShopCommand::new(arc_pool.clone()), - CoinflipCommand::new(arc_pool.clone()), + BalanceCommand::new(pool.clone()), + InventoryCommand::new(pool.clone()), + FishCommand::new(pool.clone()), + FarmCommand::new(pool.clone()), + ShopCommand::new(pool.clone()), + CoinflipCommand::new(pool.clone()), TranslateCommand, AboutCommand, - RankCommand::new(arc_pool.clone(), ranks.clone()), - PlayCommand::new(midi_state.clone(), conf.clone(), ranks.clone()), + RankCommand::new(pool.clone(), HashMap::new()), + PlayCommand::new(midi_state.clone(), conf.clone(), HashMap::new()), ] ); @@ -199,48 +189,51 @@ async fn main() -> Result<(), Box> { registry .register( - PlayCommand::new(midi_state, conf.clone(), ranks.clone()), + PlayCommand::new(midi_state.clone(), conf.clone(), ranks.clone()), client.clone(), ) .await; + registry .register( - RankCommand::new(arc_pool.clone(), ranks.clone()), + RankCommand::new(pool.clone(), ranks.clone()), client.clone(), ) .await; let client_events = client.clone(); - let events_pool = arc_pool.clone(); + let events_pool = pool.clone(); + let conf_events = conf.clone(); + let mode_events = mode; + let prefix_events = prefix.clone(); + let room_cloned = room.clone(); tokio::spawn(async move { while let Ok(event) = event_rx.recv_async().await { match event { ClientEvent::Connected => { - log!(CONNECTED); + log!(CONNECTED, room); } ClientEvent::Sync { e } => { let t = e - chrono::Utc::now().timestamp_millis(); - - let username = { - let ram = ALLOCATOR.allocated() / 1_000_000; - if ram != 0 { - format!("{} 📶{} ms 🐏{} mb {}", conf.commands.name, -t, ram, mode) - } else { - format!("{} 📶{} ms {}", conf.commands.name, -t, mode) - } + let ram = ALLOCATOR.allocated() / 1_000_000; + let username = if ram != 0 { + format!( + "{} 📶{} ms 🐏{} mb {}", + conf_events.commands.name, -t, ram, mode_events + ) + } else { + format!("{} 📶{} ms {}", conf_events.commands.name, -t, mode_events) }; - let _ = client_events .userset(username.as_str().into(), "#B7410E".into()) .await; } ClientEvent::Message { player, message } => { - if let Some(no_prefix) = message.strip_prefix(conf.commands.prefix.as_str()) { + if let Some(no_prefix) = message.strip_prefix(prefix_events.as_str()) { let mut parts = no_prefix.split_whitespace(); if let Some(cmd_name) = parts.next() { let args = Arguments::new(parts.map(|s| s.to_string()).collect()); - let mut cmd_opt: Option = None; for cmd in registry.values() { let cmd_lock = cmd.lock().await; @@ -251,7 +244,6 @@ async fn main() -> Result<(), Box> { break; } } - if let Some(cmd) = cmd_opt { let mut cmd_lock = cmd.lock().await; let specs = cmd_lock.argument_spec(); @@ -262,15 +254,14 @@ async fn main() -> Result<(), Box> { .await .unwrap() .unwrap(); - if !has_permission( &user, ranks.clone(), format!("commands.{}", cmd_lock.name()), ) { client_events - .message(format!("You do not have permission \"commands.{}\" to run this command.", cmd_name)) - .await; + .message(format!("You do not have permission \"commands.{}\" to run this command.", cmd_name)) + .await; } else { match crate::commands::argument::parse_arguments( specs, &args.args, @@ -295,23 +286,17 @@ async fn main() -> Result<(), Box> { } } } - - log!(MSG, player.name, message); + log!(MSG, player.name, message, room); } ClientEvent::PlayerJoined(player) => { - log!(PLAYER_JOINED, player.name); - + log!(PLAYER_JOINED, player.name, room); let user = sqlx::query_as::<_, User>("SELECT * FROM users WHERE _id = $1") .bind(&player._id) .fetch_optional(events_pool.as_ref()) .await; - - let user_exists = user.unwrap().is_some(); - - if !user_exists { - // INSERT with extra_permissions and rank columns (defaults) + if user.unwrap().is_none() { let _ = sqlx::query( - "INSERT INTO users (_id, balance, items, extra_permissions, rank) VALUES ($1, $2, $3, $4, $5)", + "INSERT INTO users (_id, balance, items, extra_permissions, rank) VALUES ($1, $2, $3, $4, $5)" ) .bind(&player._id) .bind(0_i32) @@ -323,7 +308,7 @@ async fn main() -> Result<(), Box> { } } ClientEvent::PlayerLeft(id) => { - log!(PLAYER_LEFT, id); + log!(PLAYER_LEFT, id, room); } ClientEvent::Mouse { x, y, id } => { let cmd_lock = registry.get("follow").unwrap(); @@ -340,8 +325,58 @@ async fn main() -> Result<(), Box> { }); client - .connect(conf.client.ws, conf.client.token, conf.client.room) + .connect(conf.client.ws, conf.client.token, room_cloned) .await?; + Ok(()) +} + +#[tokio::main(flavor = "multi_thread", worker_threads = 10)] +async fn main() -> Result<(), Box> { + rustls::crypto::CryptoProvider::install_default(rustls_rustcrypto::provider()) + .expect("install rustcrypto provider"); + + let s = tokio::fs::read_to_string("config.hocon").await?; + let conf: Configuration = hocon::de::from_str(&s)?; + let mut mode = "R"; + if cfg!(debug_assertions) { + mode = "D"; + } + + let pool = PgPoolOptions::new() + .max_connections(5) + .connect(conf.database.url.as_str()) + .await?; + let arc_pool = Arc::new(pool); + let mut rooms: Vec = Vec::new(); + + if let Some(room) = &conf.client.room { + rooms.push(room.clone()); + } + + if let Some(extra_rooms) = &conf.client.rooms { + rooms.extend(extra_rooms.clone()); + } + + if rooms.is_empty() { + eprintln!( + "Error: No rooms specified in configuration (conf.client.room and conf.client.rooms are both empty)" + ); + std::process::exit(1); + } + + let tasks: Vec<_> = rooms + .into_iter() + .map(|room_name| { + let conf_clone = conf.clone(); + let arc_pool_clone = arc_pool.clone(); + let prefix_clone = conf.commands.prefix.clone(); + tokio::spawn(async move { + start_client(conf_clone, arc_pool_clone, mode, room_name, prefix_clone).await + }) + }) + .collect(); + + let _results = join_all(tasks).await; Ok(()) }