feat: make midi playing fully async, make queue better w/ more info,

player information, and bug fix for commands which aren't found
This commit is contained in:
Soph :3 2025-09-14 16:40:17 +03:00
parent 257bc55b75
commit b4f989bf92
7 changed files with 234 additions and 81 deletions

View file

@ -7,6 +7,7 @@ use flume::{Receiver, Sender};
use tokio::{sync::Mutex, task::JoinHandle};
use crate::{
User,
client::Client,
midi_helper::{MidiEvent, play_midi},
submods,
@ -107,11 +108,18 @@ pub fn number_to_midi() -> HashMap<u8, &'static str> {
map
}
#[derive(Clone, Debug)]
pub struct QueueEntry {
pub beautiful_url: String,
pub filename: String,
pub user: User,
}
pub struct MidiState {
pub midi_tx: Sender<MidiEvent>,
pub midi_rx: Receiver<MidiEvent>,
pub midi_handle: Option<JoinHandle<()>>,
pub queue: Arc<Mutex<VecDeque<(String, String)>>>,
pub queue: Arc<Mutex<VecDeque<QueueEntry>>>,
}
impl Default for MidiState {
@ -141,8 +149,7 @@ pub fn simple_hash(s: &str) -> u64 {
}
pub async fn play_midi_file(
filename_to_play: String,
filename_beautiful: String,
entry: QueueEntry,
only_queue: bool,
midi_state: Arc<Mutex<MidiState>>,
client: Client,
@ -155,14 +162,13 @@ pub async fn play_midi_file(
queue = midi_state.queue.clone();
}
let handle_filename_to_play = filename_to_play.clone();
let handle_filename_beautiful = filename_beautiful.clone();
let handle_filename_beautiful = entry.beautiful_url.clone();
let handle_client = client.clone();
let midi_handle = tokio::spawn(async move {
let next_mtx = midi_tx.clone();
if !only_queue {
let _ = play_midi(handle_filename_to_play.as_str(), midi_tx).await;
let _ = play_midi(entry, midi_tx).await;
handle_client
.message(format!("{} ended.", handle_filename_beautiful))
.await;
@ -178,10 +184,13 @@ pub async fn play_midi_file(
drop(locked_queue);
handle_client
.message(format!("Queue left: {}, playing {}.", queue_len, midi.0))
.message(format!(
"Queue left: {}, playing {}.",
queue_len, midi.beautiful_url
))
.await;
let _ = play_midi(midi.1.as_str(), next_mtx.clone()).await;
let _ = play_midi(midi, next_mtx.clone()).await;
}
});

View file

@ -1,4 +1,5 @@
use crate::Configuration;
use crate::Rank;
use crate::User;
use crate::client::Client;
use crate::client::ClientEvent;
@ -6,25 +7,38 @@ use crate::client::Note;
use crate::client::Player;
use crate::commands::Command;
use crate::commands::MidiState;
use crate::commands::QueueEntry;
use crate::commands::argument::{ArgumentSpec, ArgumentType, ParsedArguments};
use crate::commands::number_to_midi;
use crate::commands::play_midi_file;
use crate::commands::simple_hash;
use crate::has_permission;
use crate::midi_helper::MidiEvent;
use thousands::Separable;
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
pub struct PlayCommand {
midi_state: Arc<Mutex<MidiState>>,
conf: Configuration,
ranks: HashMap<String, Rank>,
}
impl PlayCommand {
pub fn new(midi_state: Arc<Mutex<MidiState>>, conf: Configuration) -> Self {
Self { midi_state, conf }
pub fn new(
midi_state: Arc<Mutex<MidiState>>,
conf: Configuration,
ranks: HashMap<String, Rank>,
) -> Self {
Self {
midi_state,
conf,
ranks,
}
}
}
@ -54,10 +68,12 @@ impl Command for PlayCommand {
async fn constructed(&mut self, client: Client) {
let ntm = number_to_midi();
let midi_state_cloned = self.midi_state.clone();
let midi_rx = {
let midi_state = self.midi_state.lock().await;
midi_state.midi_rx.clone()
};
let cloned_ranks = self.ranks.clone();
tokio::spawn(async move {
while let Ok(event) = midi_rx.recv_async().await {
@ -92,11 +108,43 @@ impl Command for PlayCommand {
seconds,
millis,
parse_time,
entry,
} => {
if !has_permission(
&entry.user,
cloned_ranks.clone(),
"play.high_note_counts".to_string(),
) && note_count > 100_000
{
client
.message("Midi playing cancelled, you do not have permission to play a midi of this size.")
.await;
let mut midi_state = midi_state_cloned.lock().await;
if let Some(handle) = midi_state.midi_handle.as_ref()
&& !handle.is_finished()
{
handle.abort();
midi_state.midi_handle = None;
drop(midi_state);
play_midi_file(
QueueEntry {
filename: "".to_string(),
beautiful_url: "".to_string(),
user: entry.user,
},
true,
midi_state_cloned.clone(),
client,
)
.await;
return;
}
}
client
.message(format!(
"Tracks: `{}` Events: `{}` Total Duration: `{:02}:{:02}.{:03}` Note Count: `{}` Parse time: `{:.2?}`",
num_tracks, events_count, minutes, seconds, millis, note_count, parse_time
num_tracks.separate_with_commas(), events_count.separate_with_commas(), minutes, seconds, millis, note_count.separate_with_commas(), parse_time
))
.await;
}
@ -107,7 +155,7 @@ impl Command for PlayCommand {
async fn event(&mut self, _: Client, _: ClientEvent) {}
async fn execute(&mut self, client: Client, _: Player, args: ParsedArguments, _: User) {
async fn execute(&mut self, client: Client, _: Player, args: ParsedArguments, user: User) {
let file_arg = match args.get("file") {
Some(crate::commands::argument::ParsedArgument::String(s)) => s,
_ => "",
@ -252,8 +300,11 @@ impl Command for PlayCommand {
}
play_midi_file(
filename_to_play,
filename_beautiful.clone(),
QueueEntry {
filename: filename_to_play,
beautiful_url: filename_beautiful.clone(),
user,
},
false,
self.midi_state.clone(),
client.clone(),

View file

@ -6,6 +6,7 @@ use crate::Configuration;
use crate::User;
use crate::commands::Command;
use crate::commands::MidiState;
use crate::commands::QueueEntry;
use crate::commands::argument::{ArgumentSpec, ArgumentType, ParsedArgument, ParsedArguments};
use crate::commands::simple_hash;
use crate::play_midi_file;
@ -54,7 +55,13 @@ impl Command for PlaylistCommand {
async fn event(&mut self, _: Client, _: ClientEvent) {}
async fn execute(&mut self, client: Client, _: Player, args: ParsedArguments, _: User) {
async fn execute(
&mut self,
client: Client,
_: Player,
args: ParsedArguments,
command_user: User,
) {
let joined_args = match args.get("playlist") {
Some(ParsedArgument::String(s)) => s.as_str(),
_ => "",
@ -128,7 +135,11 @@ impl Command for PlaylistCommand {
for track in tracks.iter().skip(1) {
let hashed = simple_hash(track);
let filename = format!("midis/{}.mid", hashed);
locked_queue.push_back((track.to_string(), filename));
locked_queue.push_back(QueueEntry {
beautiful_url: track.to_string(),
filename,
user: command_user.clone(),
});
}
drop(locked_queue);
drop(locked_state);
@ -267,8 +278,11 @@ impl Command for PlaylistCommand {
{
let mut locked_queue = locked_state.queue.lock().await;
for entry in file_entries.iter().skip(1) {
locked_queue
.push_back((entry.0.clone(), entry.1.clone()));
locked_queue.push_back(QueueEntry {
beautiful_url: entry.0.clone(),
filename: entry.1.clone(),
user: command_user.clone(),
});
}
}
@ -321,8 +335,11 @@ impl Command for PlaylistCommand {
}
play_midi_file(
filename_to_play,
filename_beautiful.clone(),
QueueEntry {
filename: filename_to_play,
beautiful_url: filename_beautiful.clone(),
user: command_user,
},
false,
self.midi_state.clone(),
client.clone(),

View file

@ -49,7 +49,11 @@ impl Command for QueueCommand {
if queue_len == 0 {
client.message("Queue is empty.").await;
} else {
let midis: Vec<String> = locked_queue.iter().cloned().map(|z| z.0).collect();
let midis: Vec<String> = locked_queue
.iter()
.cloned()
.map(|z| z.beautiful_url)
.collect();
let midis_list = midis.join(", ");
client
.message(format!(

View file

@ -4,6 +4,7 @@ use crate::client::ClientEvent;
use crate::client::Player;
use crate::commands::Command;
use crate::commands::MidiState;
use crate::commands::QueueEntry;
use crate::commands::argument::{ArgumentSpec, ParsedArguments};
use crate::commands::play_midi_file;
@ -43,7 +44,7 @@ impl Command for SkipCommand {
async fn event(&mut self, _: Client, _: ClientEvent) {}
async fn execute(&mut self, client: Client, _: Player, _args: ParsedArguments, _: User) {
async fn execute(&mut self, client: Client, _: Player, _args: ParsedArguments, user: User) {
let mut midi_state = self.midi_state.lock().await;
if let Some(handle) = midi_state.midi_handle.as_ref()
&& !handle.is_finished()
@ -53,8 +54,11 @@ impl Command for SkipCommand {
client.message("Skipped current midi.").await;
drop(midi_state);
play_midi_file(
"".to_string(),
"".to_string(),
QueueEntry {
filename: "".to_string(),
beautiful_url: "".to_string(),
user,
},
true,
self.midi_state.clone(),
client,

View file

@ -34,7 +34,7 @@ use std::sync::Arc;
#[global_allocator]
static ALLOCATOR: Cap<std::alloc::System> = Cap::new(std::alloc::System, usize::MAX);
#[derive(sqlx::FromRow)]
#[derive(sqlx::FromRow, Clone, Debug)]
pub struct User {
_id: String,
balance: i32,
@ -156,11 +156,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
registry,
client,
[
PlayCommand::new(midi_state.clone(), conf.clone()),
StopCommand::new(midi_state.clone()),
PlaylistCommand::new(midi_state.clone(), conf.clone()),
QueueCommand::new(midi_state.clone()),
SkipCommand::new(midi_state),
SkipCommand::new(midi_state.clone()),
LaunchCommand,
FollowCommand::new(),
TestCommand::new(),
@ -173,6 +172,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
TranslateCommand,
AboutCommand,
RankCommand::new(arc_pool.clone(), ranks.clone()),
PlayCommand::new(midi_state.clone(), conf.clone(), ranks.clone()),
]
);
@ -182,6 +182,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let ranks = get_ranks(registry.clone()).await;
registry
.register(
PlayCommand::new(midi_state, conf.clone(), ranks.clone()),
client.clone(),
)
.await;
registry
.register(
RankCommand::new(arc_pool.clone(), ranks.clone()),
@ -218,39 +224,39 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if let Some(no_prefix) = message.strip_prefix(conf.commands.prefix.as_str()) {
let mut parts = no_prefix.split_whitespace();
if let Some(cmd_name) = parts.next() {
let user =
sqlx::query_as::<_, User>("SELECT * FROM users WHERE _id = $1")
.bind(&player._id)
.fetch_optional(events_pool.as_ref())
.await
.unwrap()
.unwrap();
let args = Arguments::new(parts.map(|s| s.to_string()).collect());
if !has_permission(
&user,
ranks.clone(),
format!("commands.{}", cmd_name),
) {
client_events
.message(format!("You do not have permission \"commands.{}\" to run this command.", cmd_name))
.await;
} else {
let args = Arguments::new(parts.map(|s| s.to_string()).collect());
let mut cmd_opt: Option<CommandArc> = None;
for cmd in registry.values() {
let cmd_lock = cmd.lock().await;
if cmd_lock.name() == cmd_name
|| cmd_lock.aliases().contains(&cmd_name)
{
cmd_opt = Some(cmd.clone());
break;
}
let mut cmd_opt: Option<CommandArc> = None;
for cmd in registry.values() {
let cmd_lock = cmd.lock().await;
if cmd_lock.name() == cmd_name
|| cmd_lock.aliases().contains(&cmd_name)
{
cmd_opt = Some(cmd.clone());
break;
}
}
if let Some(cmd) = cmd_opt {
let mut cmd_lock = cmd.lock().await;
let specs = cmd_lock.argument_spec();
if let Some(cmd) = cmd_opt {
let mut cmd_lock = cmd.lock().await;
let specs = cmd_lock.argument_spec();
let user =
sqlx::query_as::<_, User>("SELECT * FROM users WHERE _id = $1")
.bind(&player._id)
.fetch_optional(events_pool.as_ref())
.await
.unwrap()
.unwrap();
if !has_permission(
&user,
ranks.clone(),
format!("commands.{}", cmd_name),
) {
client_events
.message(format!("You do not have permission \"commands.{}\" to run this command.", cmd_name))
.await;
} else {
match crate::commands::argument::parse_arguments(
specs, &args.args,
) {

View file

@ -1,10 +1,14 @@
use flume::Sender;
use midiplayer_rs::midi::loader::load_midi_file;
use midiplayer_rs::midi::player::ParsedMidi;
use midiplayer_rs::midi::player::parse_midi_events;
use midiplayer_rs::midi::player::play_parsed_events;
use midiplayer_rs::midi::utils::get_time_100ns;
use midiplayer_rs::midi::utils::unpack_event;
use std::time::Duration;
use std::time::Instant;
use thousands::Separable;
use crate::commands::QueueEntry;
#[derive(Debug, Clone)]
pub enum MidiEvent {
@ -16,19 +20,20 @@ pub enum MidiEvent {
key: u8,
},
Info {
num_tracks: String,
num_tracks: usize,
time_div: u16,
events_count: String,
note_count: String,
total_ticks: String,
events_count: usize,
note_count: u64,
total_ticks: u64,
minutes: u128,
seconds: u128,
millis: u128,
parse_time: Duration,
entry: QueueEntry,
},
}
pub fn delay_execution_100ns_blocking(delay_in_100ns: i64) {
pub async fn delay_execution_100ns_async(delay_in_100ns: i64) {
if delay_in_100ns <= 0 {
return;
}
@ -37,14 +42,14 @@ pub fn delay_execution_100ns_blocking(delay_in_100ns: i64) {
let nanos = (delay_in_100ns % 10_000_000) * 100;
let duration = std::time::Duration::new(secs as u64, nanos as u32);
std::thread::sleep(duration);
tokio::time::sleep(duration).await;
}
pub async fn play_midi(
path: &str,
entry: QueueEntry,
tx: Sender<MidiEvent>,
) -> Result<(), Box<dyn std::error::Error>> {
let (tracks, time_div) = load_midi_file(path).unwrap();
let (tracks, time_div) = load_midi_file(entry.clone().filename).unwrap();
let num_tracks = tracks.len();
let start = Instant::now();
@ -56,25 +61,45 @@ pub async fn play_midi(
let _ = tx
.send_async(MidiEvent::Info {
num_tracks: num_tracks.separate_with_commas(),
num_tracks,
time_div,
events_count: parsed.events.len().separate_with_commas(),
note_count: parsed.note_count.separate_with_commas(),
total_ticks: parsed.total_ticks.separate_with_commas(),
events_count: parsed.events.len(),
note_count: parsed.note_count,
total_ticks: parsed.total_ticks,
minutes,
seconds,
millis,
entry,
parse_time: start.elapsed(),
})
.await;
let c_tx = tx.clone();
tokio::task::spawn_blocking(move || {
play_parsed_events(
&parsed,
time_div,
move |data| {
let delay_fn = Box::new(async move |ns| delay_execution_100ns_async(ns).await);
let mut bpm_us_per_qn: u64;
let mut tick: u64 = 0;
let mut multiplier: f64 = 0.0;
let max_drift: i64 = 100_000;
let mut old: i64 = 0;
let mut delta: i64 = 0;
let mut last_time = get_time_100ns();
let mut i = 0;
let n = parsed.events.len();
let mut delta_idx = 0;
let n_deltas = parsed.deltas.len();
while i < n {
loop {
let packed = unsafe { *parsed.events.get_unchecked(i) };
let (data, is_tempo) = unpack_event(packed);
if is_tempo {
bpm_us_per_qn = data as u64;
multiplier = (bpm_us_per_qn as f64) / (time_div as f64) * 10.0;
} else {
let status = (data & 0xFF) as u8;
let data1 = ((data >> 8) & 0xFF) as u8;
let data2 = ((data >> 16) & 0xFF) as u8;
@ -110,7 +135,9 @@ pub async fn play_midi(
"[DECODE] Control Change - ch={} controller={} value={}",
channel, data1, data2
),
0xC0 => println!("[DECODE] Program Change - ch={} program={}", channel, data1),
0xC0 => {
println!("[DECODE] Program Change - ch={} program={}", channel, data1)
}
0xD0 => println!(
"[DECODE] Channel Pressure - ch={} pressure={}",
channel, data1
@ -121,11 +148,46 @@ pub async fn play_midi(
}
_ => println!("[WARN] Unknown/Unsupported MIDI message 0x{:02X}", status),
}
},
Some(Box::new(|ns: i64| delay_execution_100ns_blocking(ns))),
);
})
.await?;
}
if delta_idx < n_deltas {
let (idx, delta_ticks) = unsafe { *parsed.deltas.get_unchecked(delta_idx) };
if idx == i as u32 {
let delta_tick = delta_ticks as u64;
tick = tick.wrapping_add(delta_tick);
let now = get_time_100ns();
let elapsed = (now - last_time) as i64;
last_time = now;
let work_time = elapsed - old;
old = (delta_tick as f64 * multiplier) as i64;
delta = delta.wrapping_add(work_time);
let sleep_time = if delta > 0 { old - delta } else { old };
if sleep_time <= 0 {
delta = delta.min(max_drift);
} else {
delay_fn(sleep_time).await;
}
delta_idx += 1;
i += 1;
break;
}
}
i += 1;
if i >= n {
break;
}
}
if i >= n {
break;
}
}
Ok(())
}