diff --git a/Cargo.lock b/Cargo.lock
index 22f6a01..ffa10ec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -149,6 +149,19 @@ version = "1.18.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
 
+[[package]]
+name = "parallel"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "noprocess",
+ "rand",
+ "tokio",
+ "tokio-util",
+ "tracing",
+ "tracing-subscriber",
+]
+
 [[package]]
 name = "parking_lot"
 version = "0.12.1"
diff --git a/crates/noprocess/src/lib.rs b/crates/noprocess/src/lib.rs
index e075509..f8c0ccb 100644
--- a/crates/noprocess/src/lib.rs
+++ b/crates/noprocess/src/lib.rs
@@ -14,10 +14,14 @@ use tracing::{debug, info, warn};
 
 pub use error::{BoxError, Error, ProcessFailure, ProcessFailureKind, ProcessResult};
 pub use handle_id::HandleID;
-pub use process::{Process, ProcessBuilder};
+pub use process::{Process, ProcessBuilder, ProcessState};
 pub use process_handler::ProcessHandler;
 pub use shutdown_config::ShutdownConfig;
 
+/// Re-export tokio watch receiver for state subscriptions
+pub use tokio::sync::watch::Receiver as StateReceiver;
+
+#[derive(Clone)]
 pub struct ProcessManager {
     inner: Arc<Mutex<Inner>>,
 }
@@ -58,6 +62,57 @@ impl ProcessManager {
     pub async fn kill_process(&self, handle_id: &HandleID) -> Result, Error> {
         self.inner.lock().await.kill_process(handle_id).await
     }
+
+    /// Returns the current state of a process, or None if the process doesn't exist
+    pub async fn get_process_state(&self, handle_id: &HandleID) -> Option<ProcessState> {
+        self.inner.lock().await.get_process_state(handle_id)
+    }
+
+    /// Returns a receiver that can be used to watch for state changes of a process.
+    ///
+    /// Returns None if the process doesn't exist.
+    ///
+    /// This is useful for building reconcilers that need to react to state changes.
+    pub async fn subscribe_process_state(
+        &self,
+        handle_id: &HandleID,
+    ) -> Option<StateReceiver<ProcessState>> {
+        self.inner.lock().await.subscribe_process_state(handle_id)
+    }
+
+    /// Returns the state of all processes managed by this manager
+    pub async fn get_all_process_states(&self) -> Vec<(HandleID, ProcessState)> {
+        self.inner.lock().await.get_all_process_states()
+    }
+
+    /// Returns a list of all process handle IDs
+    pub async fn list_processes(&self) -> Vec<HandleID> {
+        self.inner.lock().await.list_processes()
+    }
+
+    /// Returns a list of processes that are in the errored state
+    pub async fn get_errored_processes(&self) -> Vec<HandleID> {
+        self.inner
+            .lock()
+            .await
+            .get_processes_in_state(ProcessState::Errored)
+    }
+
+    /// Returns a list of processes that are stopped
+    pub async fn get_stopped_processes(&self) -> Vec<HandleID> {
+        self.inner
+            .lock()
+            .await
+            .get_processes_in_state(ProcessState::Stopped)
+    }
+
+    /// Returns a list of processes that are currently running
+    pub async fn get_running_processes(&self) -> Vec<HandleID> {
+        self.inner
+            .lock()
+            .await
+            .get_processes_in_state(ProcessState::Running)
+    }
 }
 
 #[derive(Default)]
@@ -113,4 +168,34 @@ impl Inner {
 
         Ok(None)
     }
+
+    fn get_process_state(&self, handle_id: &HandleID) -> Option<ProcessState> {
+        self.handles.get(handle_id).map(|p| p.state())
+    }
+
+    fn subscribe_process_state(
+        &self,
+        handle_id: &HandleID,
+    ) -> Option<StateReceiver<ProcessState>> {
+        self.handles.get(handle_id).map(|p| p.subscribe_state())
+    }
+
+    fn get_all_process_states(&self) -> Vec<(HandleID, ProcessState)> {
+        self.handles
+            .iter()
+            .map(|(id, p)| (id.clone(), p.state()))
+            .collect()
+    }
+
+    fn list_processes(&self) -> Vec<HandleID> {
+        self.handles.keys().cloned().collect()
+    }
+
+    fn get_processes_in_state(&self, state: ProcessState) -> Vec<HandleID> {
+        self.handles
+            .iter()
+            .filter(|(_, p)| p.state() == state)
+            .map(|(id, _)| id.clone())
+            .collect()
+    }
 }
diff --git a/crates/noprocess/src/process.rs b/crates/noprocess/src/process.rs
index a8ee253..3d3381e 100644
--- a/crates/noprocess/src/process.rs
+++ b/crates/noprocess/src/process.rs
@@ -251,7 +251,38 @@ impl Process {
             .map_err(|_| Error::RunnerTerminated)
     }
 
-    pub(crate) async fn wait_for(&self, desired: ProcessState) {
+    /// Returns the current state of the process
+    pub fn state(&self) -> ProcessState {
+        *self.state.borrow()
+    }
+
+    /// Returns a receiver that can be used to watch for state changes.
+    ///
+    /// This is useful for building reconcilers that need to react to state changes.
+    ///
+    /// # Example
+    /// ```ignore
+    /// let mut state_rx = process.subscribe_state();
+    /// loop {
+    ///     let state = *state_rx.borrow_and_update();
+    ///     match state {
+    ///         ProcessState::Errored => {
+    ///             // Restart the process
+    ///             process.start().await?;
+    ///         }
+    ///         _ => {}
+    ///     }
+    ///     if state_rx.changed().await.is_err() {
+    ///         break; // Process was dropped
+    ///     }
+    /// }
+    /// ```
+    pub fn subscribe_state(&self) -> watch::Receiver<ProcessState> {
+        self.state.clone()
+    }
+
+    /// Wait until the process reaches the desired state
+    pub async fn wait_for(&self, desired: ProcessState) {
         let mut rx = self.state.clone();
         while *rx.borrow_and_update() != desired {
             if rx.changed().await.is_err() {
@@ -260,6 +291,21 @@ impl Process {
         }
     }
 
+    /// Returns true if the process is currently running
+    pub fn is_running(&self) -> bool {
+        self.state() == ProcessState::Running
+    }
+
+    /// Returns true if the process has errored
+    pub fn is_errored(&self) -> bool {
+        self.state() == ProcessState::Errored
+    }
+
+    /// Returns true if the process is stopped
+    pub fn is_stopped(&self) -> bool {
+        self.state() == ProcessState::Stopped
+    }
+
     pub fn handle_id(&self) -> &HandleID {
         &self.handle_id
     }
@@ -414,10 +460,15 @@ enum ProcessCommand {
     Kill { response: oneshot::Sender<()> },
 }
 
-#[derive(PartialEq)]
-pub(crate) enum ProcessState {
+/// The current state of a process
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum ProcessState {
+    /// Process has been created but not yet started
     Pending,
+    /// Process is currently running
     Running,
+    /// Process encountered an error and stopped
     Errored,
+    /// Process was stopped (either manually or completed successfully)
     Stopped,
 }
diff --git a/examples/parallel/Cargo.toml b/examples/parallel/Cargo.toml
new file mode 100644
index 0000000..5066fc6
--- /dev/null
+++ b/examples/parallel/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "parallel"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+noprocess.workspace = true
+
+tokio.workspace = true
+tokio-util.workspace = true
+tracing.workspace = true
+anyhow.workspace = true
+rand.workspace = true
+tracing-subscriber = "0.3.22"
diff --git a/examples/parallel/src/main.rs b/examples/parallel/src/main.rs
new file mode 100644
index 0000000..6c984ed
--- /dev/null
+++ b/examples/parallel/src/main.rs
@@ -0,0 +1,208 @@
+//! Example: Running 100,000 parallel processes with a reconciler
+//!
+//! This example demonstrates running many concurrent processes that:
+//! - Randomly wait for varying durations
+//! - Randomly complete successfully or encounter errors
+//! - Get restarted by a reconciler when they complete successfully
+//! - Get restarted by a reconciler when they error
+//!
+//! Run with: cargo run -p parallel
+
+use std::{
+    future::Future,
+    sync::atomic::{AtomicUsize, Ordering},
+    time::Duration,
+};
+
+use noprocess::{
+    Process, ProcessHandler, ProcessManager, ProcessResult, ProcessState, ShutdownConfig,
+};
+use rand::{Rng, SeedableRng};
+use tokio_util::sync::CancellationToken;
+
+/// Number of parallel processes to spawn
+const NUM_PROCESSES: usize = 100_000;
+
+/// Shared counters for tracking process activity
+static RUNNING: AtomicUsize = AtomicUsize::new(0);
+static COMPLETED: AtomicUsize = AtomicUsize::new(0);
+static ERRORS: AtomicUsize = AtomicUsize::new(0);
+static TOTAL_RUNS: AtomicUsize = AtomicUsize::new(0);
+static RECONCILER_RESTARTS: AtomicUsize = AtomicUsize::new(0);
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    tracing_subscriber::fmt::init();
+
+    println!("Starting {} parallel processes...", NUM_PROCESSES);
+
+    let process_manager = ProcessManager::new();
+    let mut handle_ids = Vec::with_capacity(NUM_PROCESSES);
+
+    // Create and add all processes
+    // Note: restart_on_success is false so processes don't auto-restart.
+    // The reconciler handles restarting stopped and errored processes instead.
+    for i in 0..NUM_PROCESSES {
+        let process = Process::builder(RandomWorker { id: i })
+            .handle_id(format!("worker-{i}"))
+            .shutdown_config(ShutdownConfig {
+                graceful_timeout: Duration::from_secs(1),
+                restart_on_success: false,
+                restart_delay: Duration::from_millis(100),
+            })
+            .build();
+
+        let handle_id = process_manager.add_process(process).await;
+        handle_ids.push(handle_id);
+    }
+
+    println!("All processes added. Starting them...");
+
+    // Start all processes (serialized due to inner mutex)
+    for id in &handle_ids {
+        process_manager.start_process(id).await?;
+    }
+
+    println!("All processes started!");
+    println!();
+
+    // Monitor loop - print stats every second
+    let monitor_handle = tokio::spawn(async {
+        loop {
+            tokio::time::sleep(Duration::from_secs(1)).await;
+            let total_runs = TOTAL_RUNS.load(Ordering::Relaxed);
+            let restarts = total_runs.saturating_sub(NUM_PROCESSES);
+            println!(
+                "Stats: running={}, completed={}, errors={}, restarts={}, reconciler_restarts={}",
+                RUNNING.load(Ordering::Relaxed),
+                COMPLETED.load(Ordering::Relaxed),
+                ERRORS.load(Ordering::Relaxed),
+                restarts,
+                RECONCILER_RESTARTS.load(Ordering::Relaxed),
+            );
+        }
+    });
+
+    // Reconciler loop - restart processes that have stopped or errored
+    let reconciler_manager = process_manager.clone();
+    let reconciler_handle = tokio::spawn(async move {
+        loop {
+            tokio::time::sleep(Duration::from_millis(500)).await;
+
+            let pending = reconciler_manager
+                .get_all_process_states()
+                .await
+                .into_iter()
+                .filter(|(_, p)| p == &ProcessState::Stopped || p == &ProcessState::Errored)
+                .collect::<Vec<_>>();
+
+            for (handle_id, _) in &pending {
+                // Check state again to avoid race conditions
+                if let Some(ProcessState::Stopped) =
+                    reconciler_manager.get_process_state(handle_id).await
+                    && reconciler_manager.start_process(handle_id).await.is_ok()
+                {
+                    RECONCILER_RESTARTS.fetch_add(1, Ordering::Relaxed);
+                }
+
+                if let Some(ProcessState::Errored) =
+                    reconciler_manager.get_process_state(handle_id).await
+                    && reconciler_manager.start_process(handle_id).await.is_ok()
+                {
+                    RECONCILER_RESTARTS.fetch_add(1, Ordering::Relaxed);
+                }
+            }
+        }
+    });
+
+    // Let it run for 10 seconds
+    tokio::time::sleep(Duration::from_secs(10)).await;
+
+    // Stop reconciler first to prevent it from restarting processes during shutdown
+    reconciler_handle.abort();
+
+    println!();
+    println!("Stopping all processes...");
processes..."); + + // Stop all processes (serialized due to inner mutex) + for id in &handle_ids { + let _ = process_manager.stop_process(id).await; + } + + monitor_handle.abort(); + + println!(); + println!("Final stats:"); + println!(" Total completed: {}", COMPLETED.load(Ordering::Relaxed)); + println!(" Total errors: {}", ERRORS.load(Ordering::Relaxed)); + let total_runs = TOTAL_RUNS.load(Ordering::Relaxed); + println!( + " Total restarts (auto): {}", + total_runs.saturating_sub(NUM_PROCESSES) + ); + println!( + " Reconciler restarts: {}", + RECONCILER_RESTARTS.load(Ordering::Relaxed) + ); + + // Demonstrate state inspection API + println!(); + println!("Final process states:"); + let running = process_manager.get_running_processes().await; + let stopped = process_manager.get_stopped_processes().await; + let errored = process_manager.get_errored_processes().await; + println!(" Running: {}", running.len()); + println!(" Stopped: {}", stopped.len()); + println!(" Errored: {}", errored.len()); + + Ok(()) +} + +/// A worker process that randomly waits, completes, or errors +pub struct RandomWorker { + id: usize, +} + +impl ProcessHandler for RandomWorker { + fn call(&self, cancellation: CancellationToken) -> impl Future + Send { + let id = self.id; + + async move { + RUNNING.fetch_add(1, Ordering::Relaxed); + TOTAL_RUNS.fetch_add(1, Ordering::Relaxed); + + // Use StdRng which is Send-safe + let mut rng = rand::rngs::StdRng::from_entropy(); + + loop { + // Random wait between 10ms and 500ms + let wait_ms = rng.gen_range(10..500); + + tokio::select! { + _ = tokio::time::sleep(Duration::from_millis(wait_ms)) => { + // Decide what to do next + let action: u8 = rng.gen_range(0..100); + + if action < 5 { + // 5% chance: error out + RUNNING.fetch_sub(1, Ordering::Relaxed); + ERRORS.fetch_add(1, Ordering::Relaxed); + return Err(format!("Worker {id} encountered an error").into()); + } else if action < 15 { + // 10% chance: complete successfully (will restart due to config) + RUNNING.fetch_sub(1, Ordering::Relaxed); + COMPLETED.fetch_add(1, Ordering::Relaxed); + return Ok(()); + } + // 85% chance: continue working + } + _ = cancellation.cancelled() => { + RUNNING.fetch_sub(1, Ordering::Relaxed); + COMPLETED.fetch_add(1, Ordering::Relaxed); + return Ok(()); + } + } + } + } + } +}