first commit

This commit is contained in:
Soph :3 2025-11-11 11:29:38 +02:00
commit 65430188aa
11 changed files with 2819 additions and 0 deletions

3
.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
/target
push.sh
u2c.py

6
.luarc.json Normal file
View file

@ -0,0 +1,6 @@
{
"$schema": "https://raw.githubusercontent.com/LuaLS/vscode-lua/master/setting/schema.json",
"runtime.version": "Lua 5.2",
"format.enable": true,
"workspace.library": ["~/lua-ls-cc-tweaked/library"]
}

7
README.md Normal file
View file

@ -0,0 +1,7 @@
# livestream-cc
## in development audio player from pipewire to cc
pfowrd -> ssh -R 0.0.0.0:5821:localhost:8080 vps
lua -> wget run https://files.sad.ovh/public/livestream-cc/.installer.lua
rust -> cargo run -- --device pipewire --bind '127.0.0.1:8080' --frame-ms 300 --sample-rate 48000 --stereo

81
lua/src/.installer.lua Normal file
View file

@ -0,0 +1,81 @@
local base_url = "https://files.sad.ovh/public/livestream-cc"
local api_url = base_url .. "?ls"
local download_root = "livestream-cc"
local function fetch_folder_list()
local response = http.get(api_url)
if not response then
error("Failed to get folder list from Copyparty API")
end
local body = response.readAll()
response.close()
local data = textutils.unserializeJSON(body)
return data
end
local function download_file(path)
local file_url = base_url .. "/" .. path
local local_path = download_root .. "/" .. path
local response = http.get(file_url)
if response then
local file = fs.open(local_path, "wb")
if file then
file.write(response.readAll())
file.close()
end
response.close()
else
print("Failed to download: " .. file_url)
end
end
local function traverse_and_download(folder_data, prefix)
prefix = prefix or ""
for _, file in ipairs(folder_data.files or {}) do
print("Downloading: " .. prefix .. file.href)
download_file(prefix .. file.href)
end
for _, dir in ipairs(folder_data.dirs or {}) do
fs.makeDir(download_root .. "/" .. prefix .. dir.href)
local subdir_url = base_url .. "/" .. dir.href .. "?ls"
local response = http.get(subdir_url)
if response then
local body = response.readAll()
response.close()
local subdir_data = textutils.unserializeJSON(body)
traverse_and_download(subdir_data, prefix .. dir.href)
else
print("Failed to get subdirectory: " .. subdir_url)
end
end
end
if fs.exists(download_root) then
if fs.exists(download_root .. "/version") then
local previousVersion = fs.open(download_root .. "/version", "r").readAll()
local currentVersion = http.get(base_url .. "/version").readAll();
if previousVersion == currentVersion then
print("Previous version " .. previousVersion .. " is already installed, we're on " .. currentVersion .. " aswell, so skipping installation.")
shell.run("livestream-cc/main.lua")
return
else
print("Version " .. previousVersion .. " was already installed. Uninstalling.")
fs.delete(download_root)
end
else
print("Version marker does not exist. Cannot install.")
shell.run("livestream-cc/main.lua")
return
end
end
fs.makeDir(download_root)
local folder_list = fetch_folder_list()
traverse_and_download(folder_list, "")
print("Done :), installed livestream-cc version " .. (http.get(base_url .. "/version").readAll()))
shell.run("livestream-cc/main.lua")

87
lua/src/dcode.lua Normal file
View file

@ -0,0 +1,87 @@
local char, byte, floor, band, rshift = string.char, string.byte, math.floor, bit32.band, bit32.arshift
local PREC = 8
local PREC_POW = 2 ^ PREC
local PREC_POW_HALF = 2 ^ (PREC - 1)
local STRENGTH_MIN = 2 ^ (PREC - 8 + 1)
local function make_predictor()
local charge, strength, previous_bit = 0, 0, false
return function(current_bit)
local target = current_bit and 127 or -128
local next_charge = charge + floor((strength * (target - charge) + PREC_POW_HALF) / PREC_POW)
if next_charge == charge and next_charge ~= target then
next_charge = next_charge + (current_bit and 1 or -1)
end
local z = current_bit == previous_bit and PREC_POW - 1 or 0
local next_strength = strength
if next_strength ~= z then next_strength = next_strength + (current_bit == previous_bit and 1 or -1) end
if next_strength < STRENGTH_MIN then next_strength = STRENGTH_MIN end
charge, strength, previous_bit = next_charge, next_strength, current_bit
return charge
end
end
local function make_dec()
local predictor = make_predictor()
local low_pass_charge = 0
local previous_charge, previous_bit = 0, false
return function (input)
local output, output_n = {}, 0
for i = 1, #input do
local input_byte = byte(input, i)
for _ = 1, 8 do
local current_bit = band(input_byte, 1) ~= 0
local charge = predictor(current_bit)
local antijerk = charge
if current_bit ~= previous_bit then
antijerk = floor((charge + previous_charge + 1) / 2)
end
previous_charge, previous_bit = charge, current_bit
low_pass_charge = low_pass_charge + floor(((antijerk - low_pass_charge) * 140 + 0x80) / 256)
output_n = output_n + 1
output[output_n] = low_pass_charge
input_byte = rshift(input_byte, 1)
end
end
return output
end
end
local function make_truebit_dec(max_amp)
max_amp = max_amp or 127
return function (input)
local output, output_n = {}, 0
for i = 1, #input do
local input_byte = byte(input, i)
for _ = 1, 8 do
local bit_is_1 = band(input_byte, 1) ~= 0
output_n = output_n + 1
output[output_n] = bit_is_1 and max_amp or -max_amp
input_byte = rshift(input_byte, 1)
end
end
return output
end
end
return {
make_dec = make_dec,
make_truebit_dec = make_truebit_dec
}

261
lua/src/main.lua Normal file
View file

@ -0,0 +1,261 @@
local monitor = peripheral.find("monitor")
local w, h = monitor.getSize()
local recent_logs = {
["misc"] = {}
}
function ui()
monitor.clear()
local x = 1
local y = 3
monitor.setTextScale(0.5)
monitor.setCursorPos(x,1)
monitor.write("Log")
for k, v in pairs(recent_logs) do
if k ~= "misc" then
monitor.setCursorPos(x, y)
monitor.write(k .. " -> " .. v.time)
monitor.setCursorPos(x, y+1)
monitor.write(v.log:sub(1, (w/2)-1))
y = y+3;
if y >= h then
y = 3
x = (w/2)
end
end
end
monitor.setCursorPos(x,y)
monitor.write("misc")
y = y +1
for _, v in pairs(recent_logs.misc) do
monitor.setCursorPos(x,y)
monitor.write(v.log .. " " .. v.time)
y = y + 1
if y >= h then
y = 3
x = (w/2)
end
end
end
local dcode = require "dcode"
-- Use the normal decoder for real audio; TrueBit is just for testing
local make_decoder = dcode.make_truebit_dec -- or dcode.make_truebit_dec
local ws = http.websocket("ws://vps.sad.ovh:5821/ws")
-------------------------------------------------------
-- metadata
-------------------------------------------------------
local raw = ws.receive()
local meta = textutils.unserializeJSON(raw)
print(("codec=%s rate=%dHz frame_ms=%d ch_enc=%d frame_bytes/chan=%d")
:format(meta.codec, meta.sample_rate, meta.frame_ms, meta.channels_encoded, meta.frame_bytes))
---@return string[]
--[[local function find_speakers()
local names = peripheral.getNames()
local found = {}
for _, n in ipairs(names) do
if peripheral.getType(n) == "speaker" then
table.insert(found, n)
if #found == 2 then break end
end
end
return found
end]]
--local speakers = find_speakers()
--if #speakers == 0 then error("no speaker found") end
local namesSpkrsL = {
"speaker_756",
"speaker_755",
"speaker_754",
}
local namesSpkrsR = {
"speaker_751",
"speaker_752",
"speaker_753",
}
local spkrsL = {}
local spkrsR = {}
for i=1,#namesSpkrsL do
spkrsL[i] = peripheral.wrap(namesSpkrsL[i])
spkrsR[i] = peripheral.wrap(namesSpkrsR[i])
end
--print("SpeakerL: " .. (speakers[1]).. ", SpeakerR: " .. (speakers[2] or speakers[1] ))
-------------------------------------------------------
-- deque queue implementation (fast push/pop)
-------------------------------------------------------
local function make_queue() return { buf = {}, head = 1, tail = 0 } end
local function q_len(q) return q.tail - q.head + 1 end
local function q_push(q, v) q.tail = q.tail + 1; q.buf[q.tail] = v end
local function q_pop(q) if q.head > q.tail then return nil end local v=q.buf[q.head]; q.buf[q.head]=nil; q.head=q.head+1; return v end
local function q_trim_to(q, n) while q_len(q) > n do q_pop(q) end end
-------------------------------------------------------
-- buffering policy
-------------------------------------------------------
local MIN_BUFFER, TARGET_BUFFER, MAX_BUFFER = 5, 10, 30
local frames = make_queue()
-------------------------------------------------------
-- helper logging
-------------------------------------------------------
local function log(event, extra)
local t = textutils.formatTime(os.time(), true)
if extra == nil then
table.insert(recent_logs.misc, {
log = event,
time = os.time()
})
if #recent_logs.misc > 5 then
table.remove(recent_logs.misc, 1)
end
else
recent_logs[event] = {
log = extra,
time = os.time()
}
end
--ui()
print(("[%s] %-12s | %s"):format(t, event, extra or ""))
end
-------------------------------------------------------
-- receiver
-------------------------------------------------------
local function receiver()
while true do
local msg = ws.receive((meta.frame_ms / 1000) * 1.5)
if msg then
q_push(frames, msg)
local n = q_len(frames)
log("recv", "frames=" .. n)
if n > MAX_BUFFER then
log("drop", ("queue %d>%d trimming to %d"):format(n, MAX_BUFFER, TARGET_BUFFER))
q_trim_to(frames, TARGET_BUFFER)
end
else
log("ws underrun")
end
end
end
-------------------------------------------------------
-- playback
-------------------------------------------------------
local function player()
local buffering = true
local last_log = 0
local decL, decR = make_decoder(), make_decoder()
while true do
local queued = q_len(frames)
if buffering then
if queued < MIN_BUFFER then
if os.clock() - last_log > 0.5 then
log("buffering", ("waiting %d/%d"):format(queued, MIN_BUFFER))
last_log = os.clock()
end
os.sleep(meta.frame_ms / 1000 / 4)
goto continue
else
log("start", ("starting playback with %d frames buffered"):format(queued))
buffering = false
end
end
local frame = q_pop(frames)
if not frame then
log("underrun", "no frame available — rebuffering")
buffering = true
else
if meta.channels_encoded == 2 and #frame >= meta.frame_bytes * 6 then
local B = meta.frame_bytes
-- Split frame into per-speaker byte chunks (L1,L2,L3,R1,R2,R3)
local lBytes = {
frame:sub(1, B),
frame:sub(B + 1, 2 * B),
frame:sub(2 * B + 1, 3 * B),
}
local rBytes = {
frame:sub(3 * B + 1, 4 * B),
frame:sub(4 * B + 1, 5 * B),
frame:sub(5 * B + 1, 6 * B),
}
-- Decode per speaker
local lPcm = { decL(lBytes[1]), decL(lBytes[2]), decL(lBytes[3]) }
local rPcm = { decR(rBytes[1]), decR(rBytes[2]), decR(rBytes[3]) }
for i = 1, 3 do spkrsL[i].playAudio(lPcm[i]) end
for i = 1, 3 do spkrsR[i].playAudio(rPcm[i]) end
local done = {}
for i = 1, 3 do done[namesSpkrsL[i]] = false; done[namesSpkrsR[i]] = false end
while true do
local allDone = true
for _, v in pairs(done) do if not v then allDone = false break end end
if allDone then
print("all speakers are done")
break end
local _, speaker = os.pullEvent("speaker_audio_empty")
print("speaker: ".. speaker .. " is done")
done[speaker] = true
end
else
-- mono stream or short packet: decode and play sequentially
local pcm = decL(frame)
if pcm then
local lDone = (not spkL)
local rDone = (not spkR)
while not (lDone and rDone) do
if not lDone then
lDone = spkL.playAudio(pcm)
end
if not rDone then
rDone = spkR.playAudio(pcm)
end
if not (lDone and rDone) then
os.pullEvent("speaker_audio_empty")
end
end
end
end
-- Pacing is governed by speaker_audio_empty waits above.
local n = q_len(frames)
if n < 1 then
log("underrun", "ran out after frame — rebuffering")
buffering = true
elseif n < MIN_BUFFER and os.clock() - last_log > 0.5 then
log("lowbuf", ("frames=%d < %d"):format(n, MIN_BUFFER))
last_log = os.clock()
end
end
::continue::
end
end
log("init", "waiting for frames…")
parallel.waitForAny(receiver, player)

1
lua/src/version Normal file
View file

@ -0,0 +1 @@
800

1
rust/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/target

1799
rust/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

11
rust/Cargo.toml Normal file
View file

@ -0,0 +1,11 @@
[package]
name = "rust"
version = "0.1.0"
edition = "2024"
[dependencies]
axum = { version = "0.7", features = ["ws"] }
tokio = { version = "1", features = ["full"] }
clap = { version = "4", features = ["derive"] }
serde = { version = "1", features = ["derive"] }
cpal = "0.15"

562
rust/src/main.rs Normal file
View file

@ -0,0 +1,562 @@
use std::{
fmt::Write as _,
sync::Arc,
};
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
State,
},
response::IntoResponse,
routing::get,
Router,
};
use clap::Parser;
use cpal::{
traits::{DeviceTrait, HostTrait, StreamTrait},
Device, Sample, SampleFormat, SampleRate, Stream, StreamConfig,
};
use tokio::sync::broadcast;
#[derive(Parser, Debug)]
#[command(name = "livestream-cc", version, about = "Capture audio (PipeWire/CPAL) -> DFPWM -> WebSocket broadcast")]
struct Args {
// Substring to match an input device by name. If omitted, uses the default input device.
#[arg(long)]
device: Option<String>,
// Bind address for the WebSocket server.
#[arg(long, default_value = "127.0.0.1:8080")]
bind: String,
// Frame duration in milliseconds (must result in samples divisible by 8).
#[arg(long, default_value_t = 20)]
frame_ms: u32,
// Attempt to use this sample rate. Falls back to the device default if not supported.
#[arg(long)]
sample_rate: Option<u32>,
// List input devices and exit.
#[arg(long)]
list_devices: bool,
// NEW: request stereo (if device has ≥2 channels; mono dup if 1 ch)
#[arg(long)] stereo: bool,
}
struct AppState {
tx: broadcast::Sender<Vec<u8>>,
meta: StreamMeta,
}
#[derive(Clone)]
struct StreamMeta {
codec: &'static str,
const_prec: i32,
frame_ms: u32,
frame_samples: usize,
frame_bytes: usize, // DFPWM bytes per channel
sample_rate: u32,
channels_source: u16, // input channels from device
channels_encoded: u16, // 1 or 2
}
#[tokio::main]
async fn main() {
let args = Args::parse();
let host = cpal::default_host();
if args.list_devices {
list_input_devices(&host);
return;
}
// Pick device
let device = match pick_input_device(&host, args.device.as_deref()) {
Ok(dev) => dev,
Err(e) => {
eprintln!("Failed to get input device: {e}");
return;
}
};
println!("Using input device: {}", device.name().unwrap_or_else(|_| "<unknown>".into()));
// Pick config
let (mut config, sample_format) = match pick_stream_config(&device, args.sample_rate) {
Ok(cfg) => cfg,
Err(e) => {
eprintln!("Failed to get stream config: {e}");
return;
}
};
if args.stereo {
config.channels = 2;
}
println!(
"Device reports: {} Hz, {} channels, format {:?}",
config.sample_rate.0, config.channels, sample_format
);
let sr = config.sample_rate.0;
let ch = config.channels;
let enc_ch = if args.stereo { 2 } else { 1 };
// Determine per-frame sample count (ensure multiple of 8)
let mut frame_samples =
((sr as u64) * (args.frame_ms as u64) / 1000) as usize;
if frame_samples < 8 {
frame_samples = 8;
}
frame_samples -= frame_samples % 8;
let frame_bytes = frame_samples / 8;
let meta = StreamMeta {
codec: "dfpwm-1a",
const_prec: DfpwmEncoder::CONST_PREC,
frame_ms: args.frame_ms,
frame_samples,
frame_bytes, // per channel!
sample_rate: sr,
channels_source: ch,
channels_encoded: enc_ch,
};
println!(
"Configured: {} Hz, {}ch (downmix->mono), frame {} ms -> {} samples -> {} dfpwm bytes, CONST_PREC={}",
meta.sample_rate,
meta.channels_source,
meta.frame_ms,
meta.frame_samples,
meta.frame_bytes,
meta.const_prec
);
// Broadcast channel for DFPWM frames
let (tx, _rx) = broadcast::channel::<Vec<u8>>(128);
let state = Arc::new(AppState { tx: tx.clone(), meta: meta.clone() });
// Build and start CPAL stream
let stream = match build_cpal_stream(
&device,
&config,
sample_format,
frame_samples,
tx.clone(),
enc_ch
) {
Ok(s) => s,
Err(e) => {
eprintln!("Failed to build input stream: {e}");
return;
}
};
stream.play().expect("failed to start input stream");
// Axum WebSocket server
let app = Router::new()
.route("/ws", get(ws_handler))
.with_state(state);
println!("WebSocket server listening on ws://{}/ws", args.bind);
let listener = match tokio::net::TcpListener::bind(&args.bind).await {
Ok(l) => l,
Err(e) => {
eprintln!("Failed to bind {}: {e}", args.bind);
return;
}
};
if let Err(e) = axum::serve(listener, app).await {
eprintln!("Server error: {e}");
}
// Keep the stream alive
drop(stream);
}
async fn ws_handler(ws: WebSocketUpgrade, State(state): State<Arc<AppState>>) -> impl IntoResponse {
ws.on_upgrade(move |socket| ws_on_upgraded(socket, state))
}
async fn ws_on_upgraded(mut socket: WebSocket, state: Arc<AppState>) {
// Send a small textual metadata message first.
if let Err(e) = socket
.send(Message::Text(build_meta_text(&state.meta)))
.await
{
eprintln!("WS send meta failed: {e}");
return;
}
let mut rx = state.tx.subscribe();
loop {
match rx.recv().await {
Ok(block) => {
if socket.send(Message::Binary(block)).await.is_err() {
break;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
eprintln!("WS receiver lagged by {n} frames, dropping old frames");
continue;
}
Err(_) => break,
}
}
}
fn build_meta_text(m: &StreamMeta) -> String {
let mut s = String::new();
let _ = write!(
s,
"{{\"type\":\"meta\",\"codec\":\"{}\",\"const_prec\":{},\"frame_ms\":{},\"frame_samples\":{},\"frame_bytes\":{},\"sample_rate\":{},\"channels_source\":{},\"channels_encoded\":{}}}",
m.codec, m.const_prec, m.frame_ms, m.frame_samples, m.frame_bytes,
m.sample_rate, m.channels_source, m.channels_encoded
);
s
}
fn list_input_devices(host: &cpal::Host) {
println!("Input devices:");
match host.input_devices() {
Ok(devs) => {
for (i, d) in devs.enumerate() {
let name = d.name().unwrap_or_else(|_| "<unknown>".into());
println!(" [{i}] {name}");
}
}
Err(e) => eprintln!(" Failed to enumerate input devices: {e}"),
}
}
/// Pick an input device, matching by substring if provided; else default input device.
fn pick_input_device(host: &cpal::Host, name_substr: Option<&str>) -> Result<Device, String> {
if let Some(sub) = name_substr {
let sub_l = sub.to_lowercase();
let mut found: Option<Device> = None;
for d in host.input_devices().map_err(|e| e.to_string())? {
let name = d.name().unwrap_or_else(|_| "<unknown>".into());
if name.to_lowercase().contains(&sub_l) {
found = Some(d);
break;
}
}
if let Some(d) = found {
Ok(d)
} else {
Err(format!("No input device matching substring: {sub}"))
}
} else {
host.default_input_device().ok_or_else(|| "No default input device".into())
}
}
/// Pick a usable stream config, preferring the requested sample rate if possible.
fn pick_stream_config(
device: &Device,
target_sample_rate: Option<u32>,
) -> Result<(StreamConfig, SampleFormat), String> {
// Try supported configs first so we can request a specific sample rate.
if let Ok(mut supported) = device.supported_input_configs() {
let mut candidates = Vec::new();
for cfg in supported {
candidates.push(cfg);
}
// Try to find exact sample rate match
if let Some(sr) = target_sample_rate {
// Most devices expose ranges; pick one that contains the sample rate.
for supp in &candidates {
if supp.min_sample_rate().0 <= sr && sr <= supp.max_sample_rate().0 {
let sf = supp.sample_format();
let cfg = StreamConfig {
channels: supp.channels(),
sample_rate: SampleRate(sr),
buffer_size: cpal::BufferSize::Default,
};
return Ok((cfg, sf));
}
}
eprintln!("Target sample rate {sr} not in any supported range; falling back to default input config");
}
}
// Fall back to device default config
let default_cfg = device.default_input_config().map_err(|e| e.to_string())?;
let sf = default_cfg.sample_format();
Ok((default_cfg.config(), sf))
}
/// Build and return a CPAL input stream that performs:
/// - downmix to mono
/// - convert to i8 PCM [-128,127]
/// - DFPWM encode in frames
/// - broadcast frames over a channel
fn build_cpal_stream(
device: &Device,
cfg: &StreamConfig,
sample_format: SampleFormat,
frame_samples: usize,
tx: broadcast::Sender<Vec<u8>>,
enc_ch: u16,
) -> Result<Stream, String> {
match sample_format {
SampleFormat::F32 => build_stream_f32(device, cfg, frame_samples, tx),
SampleFormat::I16 => build_stream_i16(device, cfg, frame_samples, tx),
SampleFormat::U16 => build_stream_u16(device, cfg, frame_samples, tx),
SampleFormat::U8 => build_stream_u8(device, cfg, frame_samples, tx, enc_ch),
other => Err(format!("Unsupported sample format: {other:?}")),
}
}
fn build_stream_f32(
device: &Device,
cfg: &StreamConfig,
frame_samples: usize,
tx: broadcast::Sender<Vec<u8>>,
) -> Result<Stream, String> {
let channels = cfg.channels as usize;
let mut encoder = DfpwmEncoder::new();
let mut acc: Vec<i8> = Vec::with_capacity(frame_samples * 2);
let mut process_buffer = move |data: &[f32]| {
// Downmix to mono and convert to i8
for frame in data.chunks(channels) {
let mut sum = 0.0f32;
for &s in frame {
sum += s;
}
let f = sum / channels as f32;
let i = f32_to_i8(f);
acc.push(i);
}
// Encode full frames
while acc.len() >= frame_samples {
let frame: Vec<i8> = acc.drain(..frame_samples).collect();
let encoded = encoder.encode(&frame);
let _ = tx.send(encoded);
}
};
let err_fn = |err: cpal::StreamError| {
eprintln!("Stream error: {err}");
};
device
.build_input_stream(cfg, move |data: &[f32], _| process_buffer(data), err_fn, None)
.map_err(|e| e.to_string())
}
fn build_stream_i16(
device: &Device,
cfg: &StreamConfig,
frame_samples: usize,
tx: broadcast::Sender<Vec<u8>>,
) -> Result<Stream, String> {
let channels = cfg.channels as usize;
let mut encoder = DfpwmEncoder::new();
let mut acc: Vec<i8> = Vec::with_capacity(frame_samples * 2);
let mut process_buffer = move |data: &[i16]| {
for frame in data.chunks(channels) {
let mut sum = 0.0f32;
for &s in frame {
// Map i16 [-32768, 32767] -> f32 [-1.0, 1.0)
sum += (s as f32) / 32768.0;
}
let f = sum / channels as f32;
let i = f32_to_i8(f);
acc.push(i);
}
while acc.len() >= frame_samples {
let frame: Vec<i8> = acc.drain(..frame_samples).collect();
let encoded = encoder.encode(&frame);
let _ = tx.send(encoded);
}
};
let err_fn = |err: cpal::StreamError| {
eprintln!("Stream error: {err}");
};
device
.build_input_stream(cfg, move |data: &[i16], _| process_buffer(data), err_fn, None)
.map_err(|e| e.to_string())
}
fn build_stream_u8(
device: &Device,
cfg: &StreamConfig,
frame_samples: usize,
tx: broadcast::Sender<Vec<u8>>,
enc_ch: u16, // 1 or 2
) -> Result<Stream, String> {
let channels = cfg.channels as usize;
let mut accL: Vec<i8> = Vec::with_capacity(frame_samples * 2);
let mut accR: Vec<i8> = Vec::with_capacity(frame_samples * 2);
let mut enc = DfpwmEncoder::new();
let mut process_buffer = move |data: &[u8]| {
for frame in data.chunks(channels.max(1)) {
// Convert to f32 [-1,1)
let s0 = ((frame.get(0).copied().unwrap_or(128) as f32) - 128.0) / 128.0;
let left_i8 = f32_to_i8(s0);
let right_i8 = if enc_ch == 2 {
let s1 = if channels >= 2 {
((frame[1] as f32) - 128.0) / 128.0
} else {
s0 // mono dup to right
};
f32_to_i8(s1)
} else {
left_i8 // mono encoder
};
accL.push(left_i8);
if enc_ch == 2 { accR.push(right_i8); }
}
while accL.len() >= frame_samples && (enc_ch == 1 || accR.len() >= frame_samples) {
let frameL: Vec<i8> = accL.drain(..frame_samples).collect();
if enc_ch == 2 {
let frameR: Vec<i8> = accR.drain(..frame_samples).collect();
let L = enc.encode(&frameL);
let R = enc.encode(&frameR);
// LR concatenation: [left][right]
let mut packet = Vec::with_capacity(L.len() * 3 + R.len() * 3);
packet.extend_from_slice(&L);
packet.extend_from_slice(&L);
packet.extend_from_slice(&L);
packet.extend_from_slice(&R);
packet.extend_from_slice(&R);
packet.extend_from_slice(&R);
let _ = tx.send(packet);
} else {
let outL = enc.encode(&frameL);
let _ = tx.send(outL);
}
}
};
let err_fn = |err: cpal::StreamError| eprintln!("Stream error: {err}");
device
.build_input_stream(cfg, move |data: &[u8], _| process_buffer(data), err_fn, None)
.map_err(|e| e.to_string())
}
fn build_stream_u16(
device: &Device,
cfg: &StreamConfig,
frame_samples: usize,
tx: broadcast::Sender<Vec<u8>>,
) -> Result<Stream, String> {
let channels = cfg.channels as usize;
let mut encoder = DfpwmEncoder::new();
let mut acc: Vec<i8> = Vec::with_capacity(frame_samples * 2);
let mut process_buffer = move |data: &[u16]| {
for frame in data.chunks(channels) {
let mut sum = 0.0f32;
for &s in frame {
// Map u16 [0, 65535] -> f32 [-1.0, 1.0)
sum += ((s as f32) - 32768.0) / 32768.0;
}
let f = sum / channels as f32;
let i = f32_to_i8(f);
acc.push(i);
}
while acc.len() >= frame_samples {
let frame: Vec<i8> = acc.drain(..frame_samples).collect();
let encoded = encoder.encode(&frame);
let _ = tx.send(encoded);
}
};
let err_fn = |err: cpal::StreamError| {
eprintln!("Stream error: {err}");
};
device
.build_input_stream(cfg, move |data: &[u16], _| process_buffer(data), err_fn, None)
.map_err(|e| e.to_string())
}
/// Convert f32 [-1.0, 1.0] to i8 [-128, 127] with clamping and rounding.
fn f32_to_i8(x: f32) -> i8 {
// Scale so -1.0 -> -128, 1.0 -> 127, then clamp.
let v = (x * 128.0).round() as i32;
v.clamp(-128, 127) as i8
}
struct DfpwmEncoder {
q: i32,
s: i32,
lt: i32,
}
impl DfpwmEncoder {
const CONST_PREC: i32 = 10;
fn new() -> Self {
Self { q: 0, s: 0, lt: -128 }
}
fn encode(&mut self, input: &[i8]) -> Vec<u8> {
assert!(
input.len() % 8 == 0,
"DFPWM encode expects input length multiple of 8"
);
let mut out = Vec::with_capacity(input.len() / 8);
for chunk in input.chunks(8) {
let mut d: u8 = 0;
for (bit, &v_i8) in chunk.iter().enumerate() {
let v = v_i8 as i32;
let t = if v < self.q || v == -128 { -128 } else { 127 };
if t > 0 {
d |= 1 << bit;
}
let mut nq = self.q + ((self.s * (t - self.q) + (1 << (Self::CONST_PREC - 1))) >> Self::CONST_PREC);
if nq == self.q && nq != t {
nq += if t == 127 { 1 } else { -1 };
}
self.q = nq;
let st = if t != self.lt { 0 } else { (1 << Self::CONST_PREC) - 1 };
let mut ns = self.s;
if ns != st {
ns += if st != 0 { 1 } else { -1 };
}
if Self::CONST_PREC > 8 {
let min = 1 + (1 << (Self::CONST_PREC - 8));
if ns < min { ns = min; }
}
self.s = ns;
self.lt = t;
}
out.push(d);
}
out
}
}