feat: add an example with a very large number of parallel processes

The library is not intended to handle this many tasks created this quickly, and as such it is not optimized for it. Generally we only allow a single modifier to the process manager at once; even then, it can start and run a lot of tasks concurrently. Note that these processes are comparable to pods / wasm containers / etc., so it wouldn't be realistic to run this many in parallel.
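As a minimal sketch of that serialization point (only `ProcessManager::new`, `clone`, and `list_processes` from the diff below are real API; the scenario itself is illustrative):

    use noprocess::ProcessManager;

    #[tokio::main]
    async fn main() {
        let pm = ProcessManager::new();
        // ProcessManager is Clone; both handles share the same Arc<Mutex<Inner>>.
        let pm2 = pm.clone();
        let task = tokio::spawn(async move {
            // Every call locks the single inner mutex for its duration...
            let _ = pm2.list_processes().await;
        });
        // ...so this call is serialized against the spawned one: at most one
        // modifier touches the manager at a time.
        let _ = pm.list_processes().await;
        task.await.unwrap();
    }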

Signed-off-by: kjuulh <contact@kjuulh.io>
2026-01-07 16:10:05 +01:00
parent 24503ab258
commit 5cae3ba93d
5 changed files with 375 additions and 4 deletions

Cargo.lock (generated)

@@ -149,6 +149,19 @@ version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
[[package]]
name = "parallel"
version = "0.1.0"
dependencies = [
 "anyhow",
 "noprocess",
 "rand",
 "tokio",
 "tokio-util",
 "tracing",
 "tracing-subscriber",
]
[[package]]
name = "parking_lot"
version = "0.12.1"


@@ -14,10 +14,14 @@ use tracing::{debug, info, warn};
pub use error::{BoxError, Error, ProcessFailure, ProcessFailureKind, ProcessResult};
pub use handle_id::HandleID;
-pub use process::{Process, ProcessBuilder};
+pub use process::{Process, ProcessBuilder, ProcessState};
pub use process_handler::ProcessHandler;
pub use shutdown_config::ShutdownConfig;
/// Re-export tokio watch receiver for state subscriptions
pub use tokio::sync::watch::Receiver as StateReceiver;
#[derive(Clone)]
pub struct ProcessManager {
    inner: Arc<Mutex<Inner>>,
}
@@ -58,6 +62,57 @@ impl ProcessManager {
    pub async fn kill_process(&self, handle_id: &HandleID) -> Result<Option<()>, Error> {
        self.inner.lock().await.kill_process(handle_id).await
    }

    /// Returns the current state of a process, or None if the process doesn't exist
    pub async fn get_process_state(&self, handle_id: &HandleID) -> Option<ProcessState> {
        self.inner.lock().await.get_process_state(handle_id)
    }

    /// Returns a receiver that can be used to watch for state changes of a process.
    ///
    /// Returns None if the process doesn't exist.
    ///
    /// This is useful for building reconcilers that need to react to state changes.
    pub async fn subscribe_process_state(
        &self,
        handle_id: &HandleID,
    ) -> Option<StateReceiver<ProcessState>> {
        self.inner.lock().await.subscribe_process_state(handle_id)
    }

    /// Returns the state of all processes managed by this manager
    pub async fn get_all_process_states(&self) -> Vec<(HandleID, ProcessState)> {
        self.inner.lock().await.get_all_process_states()
    }

    /// Returns a list of all process handle IDs
    pub async fn list_processes(&self) -> Vec<HandleID> {
        self.inner.lock().await.list_processes()
    }

    /// Returns a list of processes that are in the errored state
    pub async fn get_errored_processes(&self) -> Vec<HandleID> {
        self.inner
            .lock()
            .await
            .get_processes_in_state(ProcessState::Errored)
    }

    /// Returns a list of processes that are stopped
    pub async fn get_stopped_processes(&self) -> Vec<HandleID> {
        self.inner
            .lock()
            .await
            .get_processes_in_state(ProcessState::Stopped)
    }

    /// Returns a list of processes that are currently running
    pub async fn get_running_processes(&self) -> Vec<HandleID> {
        self.inner
            .lock()
            .await
            .get_processes_in_state(ProcessState::Running)
    }
}
#[derive(Default)]
@@ -113,4 +168,34 @@ impl Inner {
        Ok(None)
    }

    fn get_process_state(&self, handle_id: &HandleID) -> Option<ProcessState> {
        self.handles.get(handle_id).map(|p| p.state())
    }

    fn subscribe_process_state(
        &self,
        handle_id: &HandleID,
    ) -> Option<tokio::sync::watch::Receiver<ProcessState>> {
        self.handles.get(handle_id).map(|p| p.subscribe_state())
    }

    fn get_all_process_states(&self) -> Vec<(HandleID, ProcessState)> {
        self.handles
            .iter()
            .map(|(id, p)| (id.clone(), p.state()))
            .collect()
    }

    fn list_processes(&self) -> Vec<HandleID> {
        self.handles.keys().cloned().collect()
    }

    fn get_processes_in_state(&self, state: ProcessState) -> Vec<HandleID> {
        self.handles
            .iter()
            .filter(|(_, p)| p.state() == state)
            .map(|(id, _)| id.clone())
            .collect()
    }
}
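A minimal usage sketch of the new inspection and subscription API added above (a fragment for inside an async context; `process` is assumed to have been built with `Process::builder` as in the examples):

    let manager = ProcessManager::new();
    let id = manager.add_process(process).await;

    // One-shot snapshots:
    let _state = manager.get_process_state(&id).await;    // Option<ProcessState>
    let _errored = manager.get_errored_processes().await; // Vec<HandleID>

    // Subscription, e.g. for a reconciler restarting errored processes:
    if let Some(mut rx) = manager.subscribe_process_state(&id).await {
        while rx.changed().await.is_ok() {
            if *rx.borrow_and_update() == ProcessState::Errored {
                let _ = manager.start_process(&id).await;
            }
        }
    }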


@@ -251,7 +251,38 @@ impl Process {
            .map_err(|_| Error::RunnerTerminated)
    }

-    pub(crate) async fn wait_for(&self, desired: ProcessState) {

    /// Returns the current state of the process
    pub fn state(&self) -> ProcessState {
        *self.state.borrow()
    }

    /// Returns a receiver that can be used to watch for state changes.
    ///
    /// This is useful for building reconcilers that need to react to state changes.
    ///
    /// # Example
    /// ```ignore
    /// let mut state_rx = process.subscribe_state();
    /// loop {
    ///     let state = *state_rx.borrow_and_update();
    ///     match state {
    ///         ProcessState::Errored => {
    ///             // Restart the process
    ///             process.start().await?;
    ///         }
    ///         _ => {}
    ///     }
    ///     if state_rx.changed().await.is_err() {
    ///         break; // Process was dropped
    ///     }
    /// }
    /// ```
    pub fn subscribe_state(&self) -> watch::Receiver<ProcessState> {
        self.state.clone()
    }

    /// Wait until the process reaches the desired state
    pub async fn wait_for(&self, desired: ProcessState) {
        let mut rx = self.state.clone();
        while *rx.borrow_and_update() != desired {
            if rx.changed().await.is_err() {
@@ -260,6 +291,21 @@ impl Process {
        }
    }

    /// Returns true if the process is currently running
    pub fn is_running(&self) -> bool {
        self.state() == ProcessState::Running
    }

    /// Returns true if the process has errored
    pub fn is_errored(&self) -> bool {
        self.state() == ProcessState::Errored
    }

    /// Returns true if the process is stopped
    pub fn is_stopped(&self) -> bool {
        self.state() == ProcessState::Stopped
    }

    pub fn handle_id(&self) -> &HandleID {
        &self.handle_id
    }
@@ -414,10 +460,15 @@ enum ProcessCommand {
    Kill { response: oneshot::Sender<()> },
}

-#[derive(PartialEq)]
-pub(crate) enum ProcessState {

/// The current state of a process
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProcessState {
    /// Process has been created but not yet started
    Pending,
    /// Process is currently running
    Running,
    /// Process encountered an error and stopped
    Errored,
    /// Process was stopped (either manually or completed successfully)
    Stopped,
}
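A short sketch of the `Process`-level additions (a fragment; `MyHandler` is a hypothetical handler, and the standalone `start()` call is carried over from the `subscribe_state` doc example rather than this diff):

    let process = Process::builder(MyHandler)
        .handle_id("my-worker")
        .build();

    process.start().await?;                        // as in the subscribe_state doc example
    process.wait_for(ProcessState::Running).await; // now pub: blocks until the state matches
    assert!(process.is_running());                 // new convenience helper

    // Or react to transitions without polling:
    let mut rx = process.subscribe_state();
    if rx.changed().await.is_ok() {
        println!("state is now {:?}", *rx.borrow_and_update());
    }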


@@ -0,0 +1,14 @@
[package]
name = "parallel"
version = "0.1.0"
edition = "2024"

[dependencies]
noprocess.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
anyhow.workspace = true
rand.workspace = true
tracing-subscriber = "0.3.22"


@@ -0,0 +1,208 @@
//! Example: Running 100,000 parallel processes with a reconciler
//!
//! This example demonstrates running many concurrent processes that:
//! - Randomly wait for varying durations
//! - Randomly complete successfully or encounter errors
//! - Get restarted by a reconciler when they complete or error
//!
//! Run with: cargo run -p parallel
use std::{
    future::Future,
    sync::atomic::{AtomicUsize, Ordering},
    time::Duration,
};

use noprocess::{
    Process, ProcessHandler, ProcessManager, ProcessResult, ProcessState, ShutdownConfig,
};
use rand::{Rng, SeedableRng};
use tokio_util::sync::CancellationToken;

/// Number of parallel processes to spawn
const NUM_PROCESSES: usize = 100_000;

/// Shared counters for tracking process activity
static RUNNING: AtomicUsize = AtomicUsize::new(0);
static COMPLETED: AtomicUsize = AtomicUsize::new(0);
static ERRORS: AtomicUsize = AtomicUsize::new(0);
static TOTAL_RUNS: AtomicUsize = AtomicUsize::new(0);
static RECONCILER_RESTARTS: AtomicUsize = AtomicUsize::new(0);
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();

    println!("Starting {} parallel processes...", NUM_PROCESSES);

    let process_manager = ProcessManager::new();
    let mut handle_ids = Vec::with_capacity(NUM_PROCESSES);

    // Create and add all processes
    // Note: restart_on_success is false so processes don't auto-restart.
    // The reconciler handles restarting stopped and errored processes instead.
    for i in 0..NUM_PROCESSES {
        let process = Process::builder(RandomWorker { id: i })
            .handle_id(format!("worker-{i}"))
            .shutdown_config(ShutdownConfig {
                graceful_timeout: Duration::from_secs(1),
                restart_on_success: false,
                restart_delay: Duration::from_millis(100),
            })
            .build();

        let handle_id = process_manager.add_process(process).await;
        handle_ids.push(handle_id);
    }

    println!("All processes added. Starting them...");

    // Start all processes (serialized due to the inner mutex)
    for id in &handle_ids {
        process_manager.start_process(id).await?;
    }

    println!("All processes started!");
    println!();

    // Monitor loop - print stats every second
    let monitor_handle = tokio::spawn(async {
        loop {
            tokio::time::sleep(Duration::from_secs(1)).await;
            let total_runs = TOTAL_RUNS.load(Ordering::Relaxed);
            let restarts = total_runs.saturating_sub(NUM_PROCESSES);
            println!(
                "Stats: running={}, completed={}, errors={}, restarts={}, reconciler_restarts={}",
                RUNNING.load(Ordering::Relaxed),
                COMPLETED.load(Ordering::Relaxed),
                ERRORS.load(Ordering::Relaxed),
                restarts,
                RECONCILER_RESTARTS.load(Ordering::Relaxed),
            );
        }
    });

    // Reconciler loop - check for stopped or errored processes and restart them
    let reconciler_manager = process_manager.clone();
    let reconciler_handle = tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_millis(500)).await;

            let pending = reconciler_manager
                .get_all_process_states()
                .await
                .into_iter()
                .filter(|(_, p)| p == &ProcessState::Stopped || p == &ProcessState::Errored)
                .collect::<Vec<_>>();

            for (handle_id, _) in &pending {
                // Check state again to avoid race conditions
                if let Some(ProcessState::Stopped) =
                    reconciler_manager.get_process_state(handle_id).await
                    && reconciler_manager.start_process(handle_id).await.is_ok()
                {
                    RECONCILER_RESTARTS.fetch_add(1, Ordering::Relaxed);
                }
                if let Some(ProcessState::Errored) =
                    reconciler_manager.get_process_state(handle_id).await
                    && reconciler_manager.start_process(handle_id).await.is_ok()
                {
                    RECONCILER_RESTARTS.fetch_add(1, Ordering::Relaxed);
                }
            }
        }
    });

    // Let it run for 10 seconds
    tokio::time::sleep(Duration::from_secs(10)).await;

    // Stop the reconciler first to prevent it from restarting processes during shutdown
    reconciler_handle.abort();

    println!();
    println!("Stopping all processes...");

    // Stop all processes (serialized due to the inner mutex)
    for id in &handle_ids {
        let _ = process_manager.stop_process(id).await;
    }

    monitor_handle.abort();

    println!();
    println!("Final stats:");
    println!(" Total completed: {}", COMPLETED.load(Ordering::Relaxed));
    println!(" Total errors: {}", ERRORS.load(Ordering::Relaxed));
    let total_runs = TOTAL_RUNS.load(Ordering::Relaxed);
    println!(
        " Total restarts: {}",
        total_runs.saturating_sub(NUM_PROCESSES)
    );
    println!(
        " Reconciler restarts: {}",
        RECONCILER_RESTARTS.load(Ordering::Relaxed)
    );

    // Demonstrate state inspection API
    println!();
    println!("Final process states:");
    let running = process_manager.get_running_processes().await;
    let stopped = process_manager.get_stopped_processes().await;
    let errored = process_manager.get_errored_processes().await;
    println!(" Running: {}", running.len());
    println!(" Stopped: {}", stopped.len());
    println!(" Errored: {}", errored.len());

    Ok(())
}
/// A worker process that randomly waits, completes, or errors
pub struct RandomWorker {
    id: usize,
}

impl ProcessHandler for RandomWorker {
    fn call(&self, cancellation: CancellationToken) -> impl Future<Output = ProcessResult> + Send {
        let id = self.id;

        async move {
            RUNNING.fetch_add(1, Ordering::Relaxed);
            TOTAL_RUNS.fetch_add(1, Ordering::Relaxed);

            // Use StdRng which is Send-safe
            let mut rng = rand::rngs::StdRng::from_entropy();

            loop {
                // Random wait between 10ms and 500ms
                let wait_ms = rng.gen_range(10..500);

                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_millis(wait_ms)) => {
                        // Decide what to do next
                        let action: u8 = rng.gen_range(0..100);

                        if action < 5 {
                            // 5% chance: error out
                            RUNNING.fetch_sub(1, Ordering::Relaxed);
                            ERRORS.fetch_add(1, Ordering::Relaxed);
                            return Err(format!("Worker {id} encountered an error").into());
                        } else if action < 15 {
                            // 10% chance: complete successfully (the reconciler restarts it)
                            RUNNING.fetch_sub(1, Ordering::Relaxed);
                            COMPLETED.fetch_add(1, Ordering::Relaxed);
                            return Ok(());
                        }
                        // 85% chance: continue working
                    }
                    _ = cancellation.cancelled() => {
                        RUNNING.fetch_sub(1, Ordering::Relaxed);
                        COMPLETED.fetch_add(1, Ordering::Relaxed);
                        return Ok(());
                    }
                }
            }
        }
    }
}