From 1fc3127b84d0378cdc412c88bbb1d2eb5b6eec57 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Wed, 7 Jan 2026 14:38:17 +0100 Subject: [PATCH] refactor: into more reasonable project structure Signed-off-by: kjuulh --- Cargo.lock | 176 ++++------- Cargo.toml | 1 + crates/noprocess/Cargo.toml | 3 +- crates/noprocess/src/handle_id.rs | 35 +++ crates/noprocess/src/lib.rs | 393 ++++-------------------- crates/noprocess/src/process.rs | 351 +++++++++++++++++++++ crates/noprocess/src/process_handler.rs | 44 +++ crates/noprocess/src/shutdown_config.rs | 17 + 8 files changed, 571 insertions(+), 449 deletions(-) create mode 100644 crates/noprocess/src/handle_id.rs create mode 100644 crates/noprocess/src/process.rs create mode 100644 crates/noprocess/src/process_handler.rs create mode 100644 crates/noprocess/src/shutdown_config.rs diff --git a/Cargo.lock b/Cargo.lock index 29e24d3..8874e37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,12 +20,6 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" -[[package]] -name = "bumpalo" -version = "3.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" - [[package]] name = "bytes" version = "1.11.0" @@ -52,24 +46,13 @@ checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "getrandom" -version = "0.3.4" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" dependencies = [ "cfg-if", "libc", - "r-efi", - "wasip2", -] - -[[package]] -name = "js-sys" -version = "0.3.83" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "464a3709c7f55f1f721e5389aa6ea4e3bc6aba669353300af094b29ffbdde1d8" -dependencies = [ - "once_cell", - "wasm-bindgen", + "wasi", ] [[package]] @@ -116,11 +99,10 @@ name = "noprocess" version = "0.1.0" dependencies = [ "anyhow", - "thiserror", + "rand", "tokio", "tokio-util", "tracing", - "uuid", ] [[package]] @@ -167,6 +149,15 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + [[package]] name = "proc-macro2" version = "1.0.105" @@ -186,10 +177,34 @@ dependencies = [ ] [[package]] -name = "r-efi" -version = "5.3.0" +name = "rand" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] [[package]] name = "redox_syscall" @@ -200,12 +215,6 @@ dependencies = [ "bitflags", ] -[[package]] -name = "rustversion" -version = "1.0.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" - [[package]] name = "scopeguard" version = "1.1.0" @@ -269,26 +278,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "thiserror" -version = "2.0.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" -dependencies = [ - "thiserror-impl", -] - -[[package]] -name = "thiserror-impl" -version = "2.0.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "thread_local" version = "1.1.9" @@ -404,17 +393,6 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0" -[[package]] -name = "uuid" -version = "1.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" -dependencies = [ - "getrandom", - "js-sys", - "wasm-bindgen", -] - [[package]] name = "valuable" version = "0.1.1" @@ -427,60 +405,6 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" -[[package]] -name = "wasip2" -version = "1.0.1+wasi-0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" -dependencies = [ - "wit-bindgen", -] - -[[package]] -name = "wasm-bindgen" -version = "0.2.106" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d759f433fa64a2d763d1340820e46e111a7a5ab75f993d1852d70b03dbb80fd" -dependencies = [ - "cfg-if", - "once_cell", - "rustversion", - "wasm-bindgen-macro", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-macro" -version = "0.2.106" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48cb0d2638f8baedbc542ed444afc0644a29166f1595371af4fecf8ce1e7eeb3" -dependencies = [ - "quote", - "wasm-bindgen-macro-support", -] - -[[package]] -name = "wasm-bindgen-macro-support" -version = "0.2.106" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cefb59d5cd5f92d9dcf80e4683949f15ca4b511f4ac0a6e14d4e1ac60c6ecd40" -dependencies = [ - "bumpalo", - "proc-macro2", - "quote", - "syn", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-shared" -version = "0.2.106" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbc538057e648b67f72a982e708d485b2efa771e1ac05fec311f9f63e5800db4" -dependencies = [ - "unicode-ident", -] - [[package]] name = "windows-link" version = "0.2.1" @@ -628,7 +552,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" [[package]] -name = "wit-bindgen" -version = "0.46.0" +name = "zerocopy" +version = "0.8.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" +checksum = "1fabae64378cb18147bb18bca364e63bdbe72a0ffe4adf0addfec8aa166b2c56" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9c2d862265a8bb4471d87e033e730f536e2a285cc7cb05dbce09a2a97075f90" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml index a9b813e..9ada945 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ resolver = "2" noprocess = { path = "crates/noprocess" } anyhow = { version = "1.0.71" } +rand = "0.8" tokio = { version = "1", features = ["full"] } tracing = { version = "0.1", features = ["log"] } tokio-util = "0.7.18" diff --git a/crates/noprocess/Cargo.toml b/crates/noprocess/Cargo.toml index c9596b2..04dd945 100644 --- a/crates/noprocess/Cargo.toml +++ b/crates/noprocess/Cargo.toml @@ -5,8 +5,7 @@ edition = "2024" [dependencies] anyhow.workspace = true -thiserror = "2.0.17" +rand.workspace = true tokio.workspace = true tokio-util.workspace = true tracing.workspace = true -uuid = { version = "1.19.0", features = ["v4", "v7"] } diff --git a/crates/noprocess/src/handle_id.rs b/crates/noprocess/src/handle_id.rs new file mode 100644 index 0000000..2edd579 --- /dev/null +++ b/crates/noprocess/src/handle_id.rs @@ -0,0 +1,35 @@ +use std::fmt::Display; + +const ID_BYTES: usize = 6; + +#[derive(Debug, Clone, Ord, Eq, PartialEq, PartialOrd)] +pub struct HandleID(String); + +impl HandleID { + pub(crate) fn new_random() -> Self { + Self(generate_short_id()) + } +} + +impl Display for HandleID { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } +} + +impl From<&str> for HandleID { + fn from(value: &str) -> Self { + Self(value.to_string()) + } +} + +impl From for HandleID { + fn from(value: String) -> Self { + Self(value) + } +} + +pub(crate) fn generate_short_id() -> String { + let bytes: [u8; ID_BYTES] = rand::random(); + bytes.iter().map(|b| format!("{b:02x}")).collect() +} diff --git a/crates/noprocess/src/lib.rs b/crates/noprocess/src/lib.rs index 5f71a62..6a10a48 100644 --- a/crates/noprocess/src/lib.rs +++ b/crates/noprocess/src/lib.rs @@ -1,35 +1,72 @@ -use std::{collections::BTreeMap, fmt::Display, pin::Pin, sync::Arc, time::Duration}; +mod handle_id; +mod process; +mod process_handler; +mod shutdown_config; -use tokio::{ - sync::{Mutex, mpsc, oneshot, watch}, - task::JoinHandle, -}; -use tokio_util::sync::CancellationToken; +use std::{collections::BTreeMap, sync::Arc}; + +use tokio::sync::Mutex; +use tracing::{debug, info, warn}; + +pub use handle_id::HandleID; +pub use process::Process; +pub use process_handler::ProcessHandler; +pub use shutdown_config::ShutdownConfig; pub struct ProcessManager { - inner: Arc>, + inner: Arc>, } -#[derive(Default)] -pub struct InnerProcessManager { - handles: BTreeMap, +impl Default for ProcessManager { + fn default() -> Self { + Self::new() + } } -impl InnerProcessManager { +impl ProcessManager { pub fn new() -> Self { + debug!("creating process manager"); Self { - handles: BTreeMap::default(), + inner: Arc::new(Mutex::new(Inner::default())), } } - pub fn add_process(&mut self, process: Process) -> &mut Self { - self.handles.insert(process.handle_id.clone(), process); - - self + pub async fn add_process(&self, process: Process) { + let handle_id = process.handle_id().clone(); + info!(%handle_id, "adding process to manager"); + self.inner.lock().await.add_process(process); } - pub async fn start_process(&mut self, handle_id: &HandleID) -> anyhow::Result> { + pub async fn start_process(&self, handle_id: &HandleID) -> anyhow::Result> { + self.inner.lock().await.start_process(handle_id).await + } + + pub async fn restart_process(&self, handle_id: &HandleID) -> anyhow::Result> { + self.inner.lock().await.restart_process(handle_id).await + } + + pub async fn stop_process(&self, handle_id: &HandleID) -> anyhow::Result> { + self.inner.lock().await.stop_process(handle_id).await + } + + pub async fn kill_process(&self, handle_id: &HandleID) -> anyhow::Result> { + self.inner.lock().await.kill_process(handle_id).await + } +} + +#[derive(Default)] +struct Inner { + handles: BTreeMap, +} + +impl Inner { + fn add_process(&mut self, process: Process) { + self.handles.insert(process.handle_id().clone(), process); + } + + async fn start_process(&mut self, handle_id: &HandleID) -> anyhow::Result> { let Some(handle) = self.handles.get_mut(handle_id) else { + warn!(%handle_id, "process not found"); return Ok(None); }; @@ -38,8 +75,9 @@ impl InnerProcessManager { Ok(None) } - pub async fn restart_process(&mut self, handle_id: &HandleID) -> anyhow::Result> { + async fn restart_process(&mut self, handle_id: &HandleID) -> anyhow::Result> { let Some(handle) = self.handles.get_mut(handle_id) else { + warn!(%handle_id, "process not found"); return Ok(None); }; @@ -48,8 +86,9 @@ impl InnerProcessManager { Ok(None) } - pub async fn stop_process(&mut self, handle_id: &HandleID) -> anyhow::Result> { + async fn stop_process(&mut self, handle_id: &HandleID) -> anyhow::Result> { let Some(handle) = self.handles.get_mut(handle_id) else { + warn!(%handle_id, "process not found"); return Ok(None); }; @@ -57,317 +96,15 @@ impl InnerProcessManager { Ok(None) } -} -pub struct Process { - handle_id: HandleID, - command_tx: mpsc::Sender, - state: watch::Receiver, - task: tokio::task::JoinHandle<()>, -} + async fn kill_process(&mut self, handle_id: &HandleID) -> anyhow::Result> { + let Some(handle) = self.handles.get_mut(handle_id) else { + warn!(%handle_id, "process not found"); + return Ok(None); + }; -impl Process { - pub fn new(process: impl IntoProcess) -> Self { - Self::new_with_handle(HandleID::new_random(), process) - } + handle.kill().await?; - pub fn new_with_handle(handle_id: impl Into, process: impl IntoProcess) -> Self { - let handle_id = handle_id.into(); - let process = process.into_process(); - let shutdown_config = ShutdownConfig::default(); - let (command_tx, command_rx) = mpsc::channel(1); - let (event_tx, event_rx) = watch::channel(ProcessState::Pending); - - let handle = tokio::spawn(Self::run_loop( - handle_id.clone(), - process, - command_rx, - shutdown_config, - event_tx, - )); - - Self { - handle_id, - state: event_rx, - command_tx, - - task: handle, - } - } - - async fn run_loop( - handle_id: HandleID, - process: SharedProcess, - mut commands: mpsc::Receiver, - shutdown_config: ShutdownConfig, - state: watch::Sender, - ) { - let mut current_task: Option = None; - - loop { - match current_task.take() { - Some(mut task) => { - tokio::select! { - biased; - Some(cmd) = commands.recv() => { - current_task = Self::handle_command_while_running( - cmd, - task, - &state, - &shutdown_config - ).await; - } - - result = &mut task.handle => { - Self::handle_task_completion(result, process.clone(), &state, &shutdown_config, &mut current_task).await; - } - } - } - None => match commands.recv().await { - Some(ProcessCommand::Start) => { - if matches!( - *state.borrow(), - ProcessState::Pending | ProcessState::Stopped | ProcessState::Errored - ) { - current_task = Some(RunningTask::spawn(process.clone())); - let _ = state.send(ProcessState::Running); - } - } - Some(ProcessCommand::Stop { response } | ProcessCommand::Kill { response }) => { - let _ = response.send(()); - } - None => break, - }, - } - } - } - - pub async fn start(&mut self) -> anyhow::Result<()> { - self.command_tx - .send(ProcessCommand::Start) - .await - .map_err(|_| anyhow::anyhow!("runner terminated")) - } - - pub async fn stop(&mut self) -> anyhow::Result<()> { - let (resp_tx, resp_rx) = oneshot::channel(); - - self.command_tx - .send(ProcessCommand::Stop { response: resp_tx }) - .await - .map_err(|_| anyhow::anyhow!("runner terminated"))?; - - resp_rx - .await - .map_err(|_| anyhow::anyhow!("runner terminated")) - } - - pub async fn kill(&mut self) -> anyhow::Result<()> { - let (resp_tx, resp_rx) = oneshot::channel(); - - self.command_tx - .send(ProcessCommand::Kill { response: resp_tx }) - .await - .map_err(|_| anyhow::anyhow!("runner terminated"))?; - - resp_rx - .await - .map_err(|_| anyhow::anyhow!("runner terminated")) - } - - pub async fn restart(&mut self) -> anyhow::Result<()> { - self.stop().await?; - self.start().await - } - - pub(crate) async fn wait_for(&self, desired: ProcessState) { - let mut rx = self.state.clone(); - while *rx.borrow_and_update() != desired { - if rx.changed().await.is_err() { - break; - } - } - } - - pub fn handle_id(&self) -> &HandleID { - &self.handle_id - } - - async fn handle_command_while_running( - cmd: ProcessCommand, - task: RunningTask, - state: &watch::Sender, - config: &ShutdownConfig, - ) -> Option { - match cmd { - ProcessCommand::Start => Some(task), - ProcessCommand::Stop { response } => { - task.shutdown_gracefully(config.graceful_timeout).await; - let _ = state.send(ProcessState::Stopped); - let _ = response.send(()); - None - } - ProcessCommand::Kill { response } => { - task.handle.abort(); - let _ = task.handle.await; - let _ = state.send(ProcessState::Stopped); - let _ = response.send(()); - None - } - } - } - - async fn handle_task_completion( - result: Result, tokio::task::JoinError>, - process: SharedProcess, - state: &watch::Sender, - config: &ShutdownConfig, - current_task: &mut Option, - ) { - match result { - Ok(Ok(())) => { - if config.restart_on_success { - tokio::time::sleep(config.restart_delay).await; - *current_task = Some(RunningTask::spawn(process)); - } else { - let _ = state.send(ProcessState::Stopped); - } - } - Ok(Err(e)) => { - tracing::error!("process error: {e:#}"); - let _ = state.send(ProcessState::Errored); - } - Err(e) if e.is_cancelled() => { - // Aborted - state already set by kill handler - } - Err(e) => { - tracing::error!("process panicked: {e}"); - let _ = state.send(ProcessState::Errored); - } - } - } -} - -impl Drop for Process { - fn drop(&mut self) { - self.task.abort(); - } -} - -struct RunningTask { - handle: JoinHandle>, - cancellation: CancellationToken, -} - -impl RunningTask { - pub fn spawn(process: SharedProcess) -> Self { - let cancellation = CancellationToken::new(); - let handle = tokio::spawn({ - let cancellation = cancellation.child_token(); - async move { process.process.call_async(cancellation).await } - }); - - Self { - handle, - cancellation, - } - } - - async fn shutdown_gracefully(mut self, timeout: Duration) { - self.cancellation.cancel(); - match tokio::time::timeout(timeout, &mut self.handle).await { - Ok(_) => { - tracing::debug!("process stopped") - } - Err(_) => { - tracing::error!("graceful shutdown timed out, process will be dropped"); - self.handle.abort(); - } - } - } -} - -enum ProcessCommand { - Start, - Stop { response: oneshot::Sender<()> }, - Kill { response: oneshot::Sender<()> }, -} - -pub struct ShutdownConfig { - pub graceful_timeout: Duration, - pub restart_on_success: bool, - pub restart_delay: Duration, -} - -impl Default for ShutdownConfig { - fn default() -> Self { - Self { - graceful_timeout: Duration::from_secs(5), - restart_on_success: true, - restart_delay: Duration::ZERO, - } - } -} - -#[derive(PartialEq)] -pub(crate) enum ProcessState { - Pending, - Running, - Errored, - Stopped, -} - -#[derive(Debug, Clone, Ord, Eq, PartialEq, PartialOrd)] -pub struct HandleID(String); -impl HandleID { - fn new_random() -> Self { - Self(uuid::Uuid::now_v7().to_string()) - } -} - -impl Display for HandleID { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(&self.0) - } -} - -#[allow(async_fn_in_trait)] -pub trait ProcessHandler: Send + Sync + 'static { - fn call( - &self, - cancellation: CancellationToken, - ) -> impl Future> + Send; -} - -trait AsyncProcess: Send + Sync { - fn call_async( - &self, - cancellation: CancellationToken, - ) -> Pin> + Send + '_>>; -} - -impl AsyncProcess for E { - fn call_async( - &self, - cancellation: CancellationToken, - ) -> Pin> + Send + '_>> { - Box::pin(self.call(cancellation)) - } -} - -#[derive(Clone)] -pub struct SharedProcess { - process: Arc, -} - -pub trait IntoProcess { - fn into_process(self) -> SharedProcess; -} - -impl IntoProcess for E { - fn into_process(self) -> SharedProcess { - SharedProcess { - process: Arc::new(self), - } + Ok(None) } } diff --git a/crates/noprocess/src/process.rs b/crates/noprocess/src/process.rs new file mode 100644 index 0000000..83a99b9 --- /dev/null +++ b/crates/noprocess/src/process.rs @@ -0,0 +1,351 @@ +use std::time::Duration; + +use tokio::{ + sync::{mpsc, oneshot, watch}, + task::JoinHandle, +}; +use tokio_util::sync::CancellationToken; +use tracing::{Instrument, debug, debug_span, info, trace, warn}; + +use crate::{ + handle_id::{generate_short_id, HandleID}, + process_handler::{IntoProcess, ProcessHandler, SharedProcess}, + shutdown_config::ShutdownConfig, +}; + +/// Unique identifier for each process invocation (generated each time a process starts) +#[derive(Debug, Clone)] +struct InvocationID(String); + +impl InvocationID { + fn new() -> Self { + Self(generate_short_id()) + } +} + +impl std::fmt::Display for InvocationID { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } +} + +pub struct Process { + handle_id: HandleID, + command_tx: mpsc::Sender, + state: watch::Receiver, + task: JoinHandle<()>, +} + +impl Process { + pub fn new(process: impl ProcessHandler) -> Self { + Self::new_with_handle(HandleID::new_random(), process) + } + + pub fn new_with_handle(handle_id: impl Into, process: impl ProcessHandler) -> Self { + let handle_id = handle_id.into(); + let process = process.into_process(); + let shutdown_config = ShutdownConfig::default(); + let (command_tx, command_rx) = mpsc::channel(1); + let (event_tx, event_rx) = watch::channel(ProcessState::Pending); + + debug!(handle_id = %handle_id, "creating process"); + + let span = debug_span!("process_loop", handle_id = %handle_id); + let handle = tokio::spawn( + Self::run_loop(handle_id.clone(), process, command_rx, shutdown_config, event_tx) + .instrument(span), + ); + + Self { + handle_id, + state: event_rx, + command_tx, + task: handle, + } + } + + async fn run_loop( + handle_id: HandleID, + process: SharedProcess, + mut commands: mpsc::Receiver, + shutdown_config: ShutdownConfig, + state: watch::Sender, + ) { + let mut current_task: Option = None; + + trace!("entering run loop"); + + loop { + match current_task.take() { + Some(mut task) => { + tokio::select! { + biased; + Some(cmd) = commands.recv() => { + trace!(command = ?cmd, "received command while running"); + current_task = Self::handle_command_while_running( + cmd, + task, + &state, + &shutdown_config + ).await; + } + + result = &mut task.handle => { + let invocation_id = task.invocation_id.clone(); + trace!(%invocation_id, "task completed, handling result"); + Self::handle_task_completion( + result, + process.clone(), + &handle_id, + &state, + &shutdown_config, + &mut current_task + ).await; + } + } + } + None => match commands.recv().await { + Some(ProcessCommand::Start) => { + if matches!( + *state.borrow(), + ProcessState::Pending | ProcessState::Stopped | ProcessState::Errored + ) { + let task = RunningTask::spawn(process.clone(), &handle_id); + debug!( + invocation_id = %task.invocation_id, + "process started" + ); + current_task = Some(task); + let _ = state.send(ProcessState::Running); + } else { + debug!("ignoring start command, process already running"); + } + } + Some(ProcessCommand::Stop { response } | ProcessCommand::Kill { response }) => { + trace!("received stop/kill command while not running"); + let _ = response.send(()); + } + None => { + trace!("command channel closed, exiting run loop"); + break; + } + }, + } + } + + trace!("exiting run loop"); + } + + pub async fn start(&mut self) -> anyhow::Result<()> { + info!(handle_id = %self.handle_id, "starting process"); + self.command_tx + .send(ProcessCommand::Start) + .await + .map_err(|_| anyhow::anyhow!("runner terminated")) + } + + pub async fn stop(&mut self) -> anyhow::Result<()> { + info!(handle_id = %self.handle_id, "stopping process"); + let (resp_tx, resp_rx) = oneshot::channel(); + + self.command_tx + .send(ProcessCommand::Stop { response: resp_tx }) + .await + .map_err(|_| anyhow::anyhow!("runner terminated"))?; + + resp_rx + .await + .map_err(|_| anyhow::anyhow!("runner terminated")) + } + + pub async fn kill(&mut self) -> anyhow::Result<()> { + info!(handle_id = %self.handle_id, "killing process"); + let (resp_tx, resp_rx) = oneshot::channel(); + + self.command_tx + .send(ProcessCommand::Kill { response: resp_tx }) + .await + .map_err(|_| anyhow::anyhow!("runner terminated"))?; + + resp_rx + .await + .map_err(|_| anyhow::anyhow!("runner terminated")) + } + + pub async fn restart(&mut self) -> anyhow::Result<()> { + info!(handle_id = %self.handle_id, "restarting process"); + + // Stop without the info log + let (resp_tx, resp_rx) = oneshot::channel(); + self.command_tx + .send(ProcessCommand::Stop { response: resp_tx }) + .await + .map_err(|_| anyhow::anyhow!("runner terminated"))?; + resp_rx + .await + .map_err(|_| anyhow::anyhow!("runner terminated"))?; + + // Start without the info log + self.command_tx + .send(ProcessCommand::Start) + .await + .map_err(|_| anyhow::anyhow!("runner terminated")) + } + + pub(crate) async fn wait_for(&self, desired: ProcessState) { + let mut rx = self.state.clone(); + while *rx.borrow_and_update() != desired { + if rx.changed().await.is_err() { + break; + } + } + } + + pub fn handle_id(&self) -> &HandleID { + &self.handle_id + } + + async fn handle_command_while_running( + cmd: ProcessCommand, + task: RunningTask, + state: &watch::Sender, + config: &ShutdownConfig, + ) -> Option { + let invocation_id = task.invocation_id.clone(); + + match cmd { + ProcessCommand::Start => { + debug!(%invocation_id, "ignoring start command, process already running"); + Some(task) + } + ProcessCommand::Stop { response } => { + debug!(%invocation_id, timeout_secs = config.graceful_timeout.as_secs(), "initiating graceful shutdown"); + task.shutdown_gracefully(config.graceful_timeout).await; + let _ = state.send(ProcessState::Stopped); + let _ = response.send(()); + debug!(%invocation_id, "process stopped"); + None + } + ProcessCommand::Kill { response } => { + debug!(%invocation_id, "aborting process"); + task.handle.abort(); + let _ = task.handle.await; + let _ = state.send(ProcessState::Stopped); + let _ = response.send(()); + debug!(%invocation_id, "process killed"); + None + } + } + } + + async fn handle_task_completion( + result: Result, tokio::task::JoinError>, + process: SharedProcess, + handle_id: &HandleID, + state: &watch::Sender, + config: &ShutdownConfig, + current_task: &mut Option, + ) { + match result { + Ok(Ok(())) => { + debug!("process completed successfully"); + if config.restart_on_success { + if !config.restart_delay.is_zero() { + debug!(delay_ms = config.restart_delay.as_millis(), "waiting before restart"); + } + tokio::time::sleep(config.restart_delay).await; + let task = RunningTask::spawn(process, handle_id); + debug!(invocation_id = %task.invocation_id, "process auto-restarted"); + *current_task = Some(task); + } else { + debug!("restart_on_success disabled, not restarting"); + let _ = state.send(ProcessState::Stopped); + } + } + Ok(Err(e)) => { + warn!(error = %e, "process returned error"); + let _ = state.send(ProcessState::Errored); + } + Err(e) if e.is_cancelled() => { + trace!("task was cancelled/aborted"); + } + Err(e) => { + warn!(error = %e, "process panicked"); + let _ = state.send(ProcessState::Errored); + } + } + } +} + +impl Drop for Process { + fn drop(&mut self) { + trace!(handle_id = %self.handle_id, "dropping process, aborting task"); + self.task.abort(); + } +} + +struct RunningTask { + handle: JoinHandle>, + cancellation: CancellationToken, + invocation_id: InvocationID, +} + +impl RunningTask { + pub fn spawn(process: SharedProcess, handle_id: &HandleID) -> Self { + let cancellation = CancellationToken::new(); + let invocation_id = InvocationID::new(); + + trace!(%handle_id, invocation_id = %invocation_id, "spawning process task"); + + let span = debug_span!( + "process_invocation", + %handle_id, + invocation_id = %invocation_id + ); + + let handle = tokio::spawn({ + let cancellation = cancellation.child_token(); + async move { process.process.call_async(cancellation).await }.instrument(span) + }); + + Self { + handle, + cancellation, + invocation_id, + } + } + + async fn shutdown_gracefully(mut self, timeout: Duration) { + trace!(invocation_id = %self.invocation_id, "cancelling process token"); + self.cancellation.cancel(); + + match tokio::time::timeout(timeout, &mut self.handle).await { + Ok(_) => { + debug!(invocation_id = %self.invocation_id, "process stopped gracefully"); + } + Err(_) => { + warn!( + invocation_id = %self.invocation_id, + timeout_secs = timeout.as_secs(), + "graceful shutdown timed out, aborting" + ); + self.handle.abort(); + } + } + } +} + +#[derive(Debug)] +enum ProcessCommand { + Start, + Stop { response: oneshot::Sender<()> }, + Kill { response: oneshot::Sender<()> }, +} + +#[derive(PartialEq)] +pub(crate) enum ProcessState { + Pending, + Running, + Errored, + Stopped, +} diff --git a/crates/noprocess/src/process_handler.rs b/crates/noprocess/src/process_handler.rs new file mode 100644 index 0000000..b6f4412 --- /dev/null +++ b/crates/noprocess/src/process_handler.rs @@ -0,0 +1,44 @@ +use std::{future::Future, pin::Pin, sync::Arc}; + +use tokio_util::sync::CancellationToken; + +#[allow(async_fn_in_trait)] +pub trait ProcessHandler: Send + Sync + 'static { + fn call( + &self, + cancellation: CancellationToken, + ) -> impl Future> + Send; +} + +pub(crate) trait AsyncProcess: Send + Sync { + fn call_async( + &self, + cancellation: CancellationToken, + ) -> Pin> + Send + '_>>; +} + +impl AsyncProcess for E { + fn call_async( + &self, + cancellation: CancellationToken, + ) -> Pin> + Send + '_>> { + Box::pin(self.call(cancellation)) + } +} + +#[derive(Clone)] +pub(crate) struct SharedProcess { + pub(crate) process: Arc, +} + +pub(crate) trait IntoProcess { + fn into_process(self) -> SharedProcess; +} + +impl IntoProcess for E { + fn into_process(self) -> SharedProcess { + SharedProcess { + process: Arc::new(self), + } + } +} diff --git a/crates/noprocess/src/shutdown_config.rs b/crates/noprocess/src/shutdown_config.rs new file mode 100644 index 0000000..1adb93e --- /dev/null +++ b/crates/noprocess/src/shutdown_config.rs @@ -0,0 +1,17 @@ +use std::time::Duration; + +pub struct ShutdownConfig { + pub graceful_timeout: Duration, + pub restart_on_success: bool, + pub restart_delay: Duration, +} + +impl Default for ShutdownConfig { + fn default() -> Self { + Self { + graceful_timeout: Duration::from_secs(5), + restart_on_success: true, + restart_delay: Duration::ZERO, + } + } +}