From 71c9d5139bcfac79c2e76c97a6ba99ee2dab0283 Mon Sep 17 00:00:00 2001 From: Matthew Gregan Date: Thu, 23 Apr 2026 16:14:57 +1200 Subject: [PATCH] Promote callback threads only while streams are active. Previously the client and server callback threads were promoted to real-time priority for their entire lifetime. Promote on the 0->1 active-stream transition and demote on 1->0 so idle contexts don't hold RT priority. Adds ipccore::EventLoopHandle::run_task to dispatch closures onto the event loop thread, since promote/demote must run on the target thread itself. Linux client demote is performed locally (SCHED_OTHER needs no privilege); the promote path still routes through the server via RPC. --- audioipc/src/ipccore.rs | 22 +++++++ client/src/context.rs | 55 +++++------------ client/src/lib.rs | 1 + client/src/stream.rs | 61 ++++++++++++++++++- client/src/thread_priority.rs | 109 ++++++++++++++++++++++++++++++++++ server/src/lib.rs | 19 ++++-- server/src/server.rs | 73 ++++++++++++++++++++--- server/src/thread_priority.rs | 56 +++++++++++++++++ 8 files changed, 340 insertions(+), 56 deletions(-) create mode 100644 client/src/thread_priority.rs create mode 100644 server/src/thread_priority.rs diff --git a/audioipc/src/ipccore.rs b/audioipc/src/ipccore.rs index 21c334d..b2fd36f 100644 --- a/audioipc/src/ipccore.rs +++ b/audioipc/src/ipccore.rs @@ -46,6 +46,10 @@ enum Request { Shutdown, // See EventLoop::wake_connection WakeConnection(Token), + // Run an arbitrary closure on the EventLoop's thread. Used to perform + // work that must execute on a specific thread (e.g. thread priority + // changes that can only be made by the target thread). + RunTask(Box), } // EventLoopHandle is a cloneable external reference @@ -144,6 +148,20 @@ impl EventLoopHandle { self.waker.wake() } + // Queue a closure to run on the EventLoop's thread. The closure is + // executed after any currently-pending requests and before the next + // poll iteration. Returns immediately without waiting for the closure + // to complete. + pub fn run_task(&self, f: F) -> Result<()> { + self.requests + .push(Request::RunTask(Box::new(f))) + .map_err(|_| { + debug!("EventLoopHandle::run_task send failed"); + io::ErrorKind::ConnectionAborted + })?; + self.waker.wake() + } + // Signal EventLoop to wake connection specified by `token` for processing. pub(crate) fn wake_connection(&self, token: Token) { if self.requests.push(Request::WakeConnection(token)).is_ok() { @@ -284,6 +302,10 @@ impl EventLoop { debug!("{}: EventLoop: handling shutdown", self.name); return Ok(false); } + Request::RunTask(f) => { + trace!("{}: EventLoop: handling run_task", self.name); + f(); + } Request::WakeConnection(token) => { debug!( "{}: EventLoop: handling wake_connection {:?}", diff --git a/client/src/context.rs b/client/src/context.rs index 7025ee2..814f8e9 100644 --- a/client/src/context.rs +++ b/client/src/context.rs @@ -6,10 +6,6 @@ use crate::stream; use crate::{assert_not_in_callback, run_in_callback}; use crate::{ClientStream, AUDIOIPC_INIT_PARAMS}; -#[cfg(target_os = "linux")] -use audio_thread_priority::get_current_thread_info; -#[cfg(not(target_os = "linux"))] -use audio_thread_priority::promote_current_thread_to_real_time; use audioipc::ipccore::EventLoopHandle; use audioipc::{ipccore, rpccore, sys, PlatformHandle}; use audioipc::{ @@ -22,6 +18,7 @@ use cubeb_backend::{ }; use std::ffi::{CStr, CString}; use std::os::raw::c_void; +use std::sync::atomic::AtomicUsize; use std::sync::{Arc, Mutex}; use std::thread; use std::{fmt, ptr}; @@ -46,6 +43,10 @@ pub struct ClientContext { backend_id: CString, device_collection_rpc: bool, device_collection_callbacks: Arc>, + // Number of ClientStreams on this context that are currently started. + // Used to gate callback-thread promotion: the thread is promoted on the + // 0->1 transition and demoted on 1->0. + pub(crate) active_streams: Arc, } impl ClientContext { @@ -65,31 +66,6 @@ impl ClientContext { } } -#[cfg(target_os = "linux")] -fn promote_thread(rpc: &rpccore::Proxy) { - match get_current_thread_info() { - Ok(info) => { - let bytes = info.serialize(); - let _ = rpc.call(ServerMessage::PromoteThreadToRealTime(bytes)); - } - Err(_) => { - warn!("Could not remotely promote thread to RT."); - } - } -} - -#[cfg(not(target_os = "linux"))] -fn promote_thread(_rpc: &rpccore::Proxy) { - match promote_current_thread_to_real_time(0, 48000) { - Ok(_) => { - info!("Audio thread promoted to real-time."); - } - Err(_) => { - warn!("Could not promote thread to real-time."); - } - } -} - fn register_thread(callback: Option) { if let Some(func) = callback { let thr = thread::current(); @@ -104,14 +80,6 @@ fn unregister_thread(callback: Option) { } } -fn promote_and_register_thread( - rpc: &rpccore::Proxy, - callback: Option, -) { - promote_thread(rpc); - register_thread(callback); -} - #[derive(Default)] struct DeviceCollectionCallbacks { input_cb: ffi::cubeb_device_collection_changed_callback, @@ -181,7 +149,6 @@ impl ContextOps for ClientContext { .handle() .bind_client::(server_connection) .map_err(|_| Error::Error)?; - let rpc2 = rpc.clone(); // Don't let errors bubble from here. Later calls against this context // will return errors the caller expects to handle. @@ -192,11 +159,18 @@ impl ContextOps for ClientContext { let backend_id = CString::new(backend_id).expect("backend_id query failed"); // TODO: remove params.pool_size from init params. + // The callback thread starts at normal priority; it is promoted on demand + // when the first stream on this context is started, and demoted when the + // last stream stops. let callback_thread = ipccore::EventLoopThread::new( "AudioIPC Client Callback".to_string(), Some(params.stack_size), - move || promote_and_register_thread(&rpc2, thread_create_callback), - move || unregister_thread(thread_destroy_callback), + move || register_thread(thread_create_callback), + move || { + // Best-effort: if still promoted at shutdown, demote first. + crate::thread_priority::demote(); + unregister_thread(thread_destroy_callback); + }, ) .map_err(|_| Error::Error)?; @@ -208,6 +182,7 @@ impl ContextOps for ClientContext { backend_id, device_collection_rpc: false, device_collection_callbacks: Arc::new(Mutex::new(Default::default())), + active_streams: Arc::new(AtomicUsize::new(0)), }); Ok(ctx) } diff --git a/client/src/lib.rs b/client/src/lib.rs index 99b3420..57cc6d1 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -11,6 +11,7 @@ extern crate log; mod send_recv; mod context; mod stream; +mod thread_priority; use crate::context::ClientContext; use crate::stream::ClientStream; diff --git a/client/src/stream.rs b/client/src/stream.rs index bfa9c96..1136733 100644 --- a/client/src/stream.rs +++ b/client/src/stream.rs @@ -3,6 +3,7 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details +use crate::thread_priority; use crate::ClientContext; use crate::{assert_not_in_callback, run_in_callback}; use audioipc::messages::StreamCreateParams; @@ -14,6 +15,7 @@ use std::convert::TryFrom; use std::ffi::{CStr, CString}; use std::os::raw::c_void; use std::ptr; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; use std::sync::{Arc, Mutex}; @@ -45,6 +47,10 @@ pub struct ClientStream<'ctx> { device_change_cb: Arc>, // Signals ClientStream that CallbackServer has dropped. shutdown_rx: mpsc::Receiver<()>, + // Whether this stream is currently contributing to the context's + // active-stream count. Transitioned atomically on successful start/stop + // so concurrent calls can't over- or under-count. + active: AtomicBool, } struct CallbackServer { @@ -257,14 +263,61 @@ impl<'ctx> ClientStream<'ctx> { token: data.token, device_change_cb, shutdown_rx, + active: AtomicBool::new(false), })); Ok(unsafe { Stream::from_ptr(stream as *mut _) }) } + + // Flip `active` false->true and bump the context's active-stream count, + // dispatching `enter_active` to the callback thread on the 0->1 transition. + fn mark_active(&self) { + if self + .active + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + let prev = self.context.active_streams.fetch_add(1, Ordering::AcqRel); + if prev == 0 { + let rpc = self.context.rpc(); + if let Err(e) = self + .context + .callback_handle() + .run_task(move || thread_priority::promote(&rpc)) + { + warn!("failed to dispatch thread promotion: {e:?}"); + } + } + } + } + + // Flip `active` true->false and decrement the context's active-stream count, + // dispatching `leave_active` to the callback thread on the 1->0 transition. + fn mark_inactive(&self) { + if self + .active + .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + let prev = self.context.active_streams.fetch_sub(1, Ordering::AcqRel); + if prev == 1 { + if let Err(e) = self + .context + .callback_handle() + .run_task(thread_priority::demote) + { + warn!("failed to dispatch thread demotion: {e:?}"); + } + } + } + } } impl Drop for ClientStream<'_> { fn drop(&mut self) { debug!("ClientStream drop"); + // Release any contribution this stream was making to the context's + // active-stream count before tearing down the remote stream. + self.mark_inactive(); let _ = send_recv!(self.context.rpc(), StreamDestroy(self.token) => StreamDestroyed); debug!("ClientStream drop - stream destroyed"); // Wait for CallbackServer to shutdown. The remote server drops the RPC @@ -281,13 +334,17 @@ impl StreamOps for ClientStream<'_> { fn start(&mut self) -> Result<()> { assert_not_in_callback(); let rpc = self.context.rpc(); - send_recv!(rpc, StreamStart(self.token) => StreamStarted) + send_recv!(rpc, StreamStart(self.token) => StreamStarted)?; + self.mark_active(); + Ok(()) } fn stop(&mut self) -> Result<()> { assert_not_in_callback(); let rpc = self.context.rpc(); - send_recv!(rpc, StreamStop(self.token) => StreamStopped) + send_recv!(rpc, StreamStop(self.token) => StreamStopped)?; + self.mark_inactive(); + Ok(()) } fn position(&mut self) -> Result { diff --git a/client/src/thread_priority.rs b/client/src/thread_priority.rs new file mode 100644 index 0000000..9c44c8a --- /dev/null +++ b/client/src/thread_priority.rs @@ -0,0 +1,109 @@ +// Copyright © 2026 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +//! Per-thread real-time priority management for the client's callback thread. +//! +//! On Linux the promote path goes through the server via RPC (since sandboxed +//! content processes cannot call rtkit directly); demote is performed locally +//! via `pthread_setschedparam`, which does not require elevated privilege. +//! On macOS/Windows/Linux-no-dbus both promote and demote are performed +//! directly on the calling thread. +//! +//! All functions here are intended to run on the callback thread itself, +//! dispatched via `ipccore::EventLoopHandle::run_task`. + +#[cfg(not(target_os = "linux"))] +use audio_thread_priority::{ + demote_current_thread_from_real_time, promote_current_thread_to_real_time, RtPriorityHandle, +}; +#[cfg(target_os = "linux")] +use audio_thread_priority::{ + demote_thread_from_real_time, get_current_thread_info, RtPriorityThreadInfo, +}; + +use audioipc::rpccore::Proxy; +use audioipc::{ClientMessage, ServerMessage}; +use std::cell::RefCell; + +#[cfg(target_os = "linux")] +thread_local! { + // Thread info captured at promote time. Kept so that `demote` + // can demote locally without another round-trip to the server. + static THREAD_INFO: RefCell> = const { RefCell::new(None) }; +} + +#[cfg(not(target_os = "linux"))] +thread_local! { + static RT_HANDLE: RefCell> = const { RefCell::new(None) }; +} + +#[cfg(target_os = "linux")] +pub(crate) fn promote(rpc: &Proxy) { + THREAD_INFO.with(|slot| { + let mut slot = slot.borrow_mut(); + if slot.is_some() { + return; + } + match get_current_thread_info() { + Ok(info) => { + let bytes = info.serialize(); + if rpc + .call(ServerMessage::PromoteThreadToRealTime(bytes)) + .is_ok() + { + *slot = Some(info); + debug!("callback thread promoted to real-time via server"); + } else { + warn!("callback thread promotion RPC failed"); + } + } + Err(e) => warn!("get_current_thread_info failed: {e:?}"), + } + }); +} + +#[cfg(not(target_os = "linux"))] +pub(crate) fn promote(_rpc: &Proxy) { + RT_HANDLE.with(|slot| { + let mut slot = slot.borrow_mut(); + if slot.is_some() { + return; + } + match promote_current_thread_to_real_time(0, 48000) { + Ok(handle) => { + *slot = Some(handle); + debug!("callback thread promoted to real-time"); + } + Err(e) => warn!("failed to promote callback thread: {e:?}"), + } + }); +} + +#[cfg(target_os = "linux")] +pub(crate) fn demote() { + THREAD_INFO.with(|slot| { + if let Some(info) = slot.borrow_mut().take() { + // Demotion to SCHED_OTHER is always permitted; no RPC needed. + if let Err(e) = demote_thread_from_real_time(info) { + warn!("failed to demote callback thread: {e:?}"); + } else { + debug!("callback thread demoted from real-time"); + } + } + }); +} + +#[cfg(not(target_os = "linux"))] +pub(crate) fn demote() { + RT_HANDLE.with(|slot| { + if let Some(handle) = slot.borrow_mut().take() { + if let Err(e) = demote_current_thread_from_real_time(handle) { + warn!("failed to demote callback thread: {e:?}"); + } else { + debug!("callback thread demoted from real-time"); + } + } + }); +} diff --git a/server/src/lib.rs b/server/src/lib.rs index 9622c33..195864f 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -7,7 +7,6 @@ #[macro_use] extern crate log; -use audio_thread_priority::promote_current_thread_to_real_time; use audioipc::ipccore; use audioipc::sys; use audioipc::PlatformHandleType; @@ -15,10 +14,12 @@ use once_cell::sync::Lazy; use std::ffi::{CStr, CString}; use std::os::raw::c_void; use std::ptr; -use std::sync::Mutex; +use std::sync::atomic::AtomicUsize; +use std::sync::{Arc, Mutex}; use std::thread; mod server; +pub(crate) mod thread_priority; struct CubebContextParams { context_name: CString, @@ -38,6 +39,10 @@ struct ServerWrapper { rpc_thread: ipccore::EventLoopThread, callback_thread: ipccore::EventLoopThread, device_collection_thread: ipccore::EventLoopThread, + // Shared across all CubebServer instances hosted by this process. Counts + // the number of streams that are currently started; on 0->1/1->0 transitions + // the callback thread is promoted/demoted via ipccore::run_task. + active_streams: Arc, } fn register_thread(callback: Option) { @@ -82,12 +87,14 @@ fn init_threads( None, move || { trace!("Starting {callback_name} thread"); - if let Err(e) = promote_current_thread_to_real_time(0, 48000) { - debug!("Failed to promote {callback_name} thread to real-time: {e:?}"); - } + // Thread starts at normal priority; it is promoted on demand when + // the first stream is started, and demoted when the last stream stops. register_thread(thread_create_callback); }, move || { + // Best-effort: if the thread is still promoted at shutdown, drop it + // back to normal priority before unregistering. + thread_priority::demote(); unregister_thread(thread_destroy_callback); trace!("Stopping {callback_name} thread"); }, @@ -119,6 +126,7 @@ fn init_threads( rpc_thread, callback_thread, device_collection_thread, + active_streams: Arc::new(AtomicUsize::new(0)), }) } @@ -183,6 +191,7 @@ pub extern "C" fn audioipc2_server_new_client( let server = server::CubebServer::new( callback_thread.clone(), device_collection_thread.clone(), + wrapper.active_streams.clone(), remote_pid, shm_area_size, ); diff --git a/server/src/server.rs b/server/src/server.rs index 9033741..eba3b96 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -22,9 +22,12 @@ use std::mem::size_of; use std::os::raw::{c_int, c_long, c_void}; use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; use std::{cell::RefCell, sync::Mutex}; use std::{panic, slice}; +use crate::thread_priority; + use audioipc::errors::Result; fn error(error: cubeb::Error) -> ClientMessage { @@ -360,12 +363,31 @@ struct ServerStream { stream: Option, cbs: Box, client_pipe: Option, + // Whether this stream is currently contributing to the server's active-stream + // count. Flipped on successful cubeb start/stop transitions; false for a + // freshly-created stream that hasn't been started yet. + active: bool, + // Shared counter and callback-thread handle used to drive promote/demote + // transitions when this stream's active state changes. + active_streams: Arc, + callback_thread: ipccore::EventLoopHandle, } impl Drop for ServerStream { fn drop(&mut self) { // `stream` *must* be dropped before `cbs`. drop(self.stream.take()); + // An active stream being dropped (StreamDestroy or client disconnect) + // must release its contribution to the active-stream count. + if self.active { + self.active = false; + let prev = self.active_streams.fetch_sub(1, Ordering::AcqRel); + if prev == 1 { + if let Err(e) = self.callback_thread.run_task(thread_priority::demote) { + warn!("failed to dispatch thread demotion: {e:?}"); + } + } + } } } @@ -387,6 +409,9 @@ pub struct CubebServer { callback_thread: ipccore::EventLoopHandle, device_collection_thread: ipccore::EventLoopHandle, streams: slab::Slab, + // Server-wide active-stream counter, shared with every ServerStream so + // their Drop/start/stop can participate in the promote/demote refcount. + active_streams: Arc, remote_pid: Option, device_collection_change_callbacks: Option>, devidmap: DevIdMap, @@ -462,6 +487,7 @@ impl CubebServer { pub fn new( callback_thread: ipccore::EventLoopHandle, device_collection_thread: ipccore::EventLoopHandle, + active_streams: Arc, remote_pid: u32, shm_area_size: usize, ) -> Self { @@ -469,6 +495,7 @@ impl CubebServer { callback_thread, device_collection_thread, streams: slab::Slab::::new(), + active_streams, remote_pid: Some(remote_pid), device_collection_change_callbacks: None, devidmap: DevIdMap::new(), @@ -567,15 +594,40 @@ impl CubebServer { ClientMessage::StreamDestroyed } - ServerMessage::StreamStart(stm_tok) => try_stream!(self, stm_tok) - .start() - .map(|_| ClientMessage::StreamStarted) - .unwrap_or_else(error), - - ServerMessage::StreamStop(stm_tok) => try_stream!(self, stm_tok) - .stop() - .map(|_| ClientMessage::StreamStopped) - .unwrap_or_else(error), + ServerMessage::StreamStart(stm_tok) => match try_stream!(self, stm_tok).start() { + Ok(()) => { + let stream = &mut self.streams[stm_tok]; + if !stream.active { + stream.active = true; + let prev = self.active_streams.fetch_add(1, Ordering::AcqRel); + if prev == 0 { + if let Err(e) = self.callback_thread.run_task(thread_priority::promote) + { + warn!("failed to dispatch thread promotion: {e:?}"); + } + } + } + ClientMessage::StreamStarted + } + Err(e) => error(e), + }, + + ServerMessage::StreamStop(stm_tok) => match try_stream!(self, stm_tok).stop() { + Ok(()) => { + let stream = &mut self.streams[stm_tok]; + if stream.active { + stream.active = false; + let prev = self.active_streams.fetch_sub(1, Ordering::AcqRel); + if prev == 1 { + if let Err(e) = self.callback_thread.run_task(thread_priority::demote) { + warn!("failed to dispatch thread demotion: {e:?}"); + } + } + } + ClientMessage::StreamStopped + } + Err(e) => error(e), + }, ServerMessage::StreamGetPosition(stm_tok) => try_stream!(self, stm_tok) .position() @@ -783,6 +835,9 @@ impl CubebServer { stream: None, cbs, client_pipe: Some(client_pipe), + active: false, + active_streams: self.active_streams.clone(), + callback_thread: self.callback_thread.clone(), }); Ok(ClientMessage::StreamCreated(StreamCreate { diff --git a/server/src/thread_priority.rs b/server/src/thread_priority.rs new file mode 100644 index 0000000..d42a803 --- /dev/null +++ b/server/src/thread_priority.rs @@ -0,0 +1,56 @@ +// Copyright © 2026 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +//! Per-thread real-time priority management for the server's callback thread. +//! +//! Promote/demote operations must be performed on the target thread on every +//! supported platform (macOS/Windows use per-thread state that isn't safely +//! modifiable from other threads, and Linux's self-demote asserts +//! `pthread_self()` matches the saved handle). Callers arrange for these +//! functions to run on the callback thread via `ipccore::EventLoopHandle::run_task`. + +use audio_thread_priority::{ + demote_current_thread_from_real_time, promote_current_thread_to_real_time, RtPriorityHandle, +}; +use std::cell::RefCell; + +thread_local! { + // The RT priority handle for this thread, if currently promoted. + static RT_HANDLE: RefCell> = const { RefCell::new(None) }; +} + +/// Promote the current thread to real-time audio priority. Idempotent: a +/// second call on an already-promoted thread is a no-op. +pub(crate) fn promote() { + RT_HANDLE.with(|slot| { + let mut slot = slot.borrow_mut(); + if slot.is_some() { + return; + } + match promote_current_thread_to_real_time(0, 48000) { + Ok(handle) => { + *slot = Some(handle); + debug!("callback thread promoted to real-time"); + } + Err(e) => { + warn!("failed to promote callback thread to real-time: {e:?}"); + } + } + }); +} + +/// Demote the current thread from real-time audio priority. Idempotent: a +/// call on an un-promoted thread is a no-op. +pub(crate) fn demote() { + RT_HANDLE.with(|slot| { + if let Some(handle) = slot.borrow_mut().take() { + if let Err(e) = demote_current_thread_from_real_time(handle) { + warn!("failed to demote callback thread from real-time: {e:?}"); + } else { + debug!("callback thread demoted from real-time"); + } + } + }); +}