diff --git a/Cargo.lock b/Cargo.lock index b43ee7a..0d3cd6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -178,6 +178,17 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pipeline" +version = "0.1.0" +dependencies = [ + "noprocess", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", +] + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -261,6 +272,17 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +[[package]] +name = "scheduled" +version = "0.1.0" +dependencies = [ + "noprocess", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -696,6 +718,17 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" +[[package]] +name = "worker" +version = "0.1.0" +dependencies = [ + "noprocess", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", +] + [[package]] name = "zerocopy" version = "0.8.32" diff --git a/README.md b/README.md new file mode 100644 index 0000000..9b1557d --- /dev/null +++ b/README.md @@ -0,0 +1,62 @@ +# noprocess + +A lightweight Rust library for managing long-running processes with graceful shutdown, restart capabilities, and error handling. + +![demo](assets/demo.gif) + +Designed to work with [nocontrol](https://git.kjuulh.io/kjuulh/nocontrol) for distributed orchestration of Rust workloads — think Kubernetes pods, but for native Rust code. + +## Usage + +```rust +use noprocess::{Process, ProcessHandler, ProcessManager, ProcessResult}; +use tokio_util::sync::CancellationToken; + +struct MyPipeline; + +impl ProcessHandler for MyPipeline { + fn call(&self, cancel: CancellationToken) -> impl Future + Send { + async move { + loop { + tokio::select! { + _ = cancel.cancelled() => break, + _ = do_work() => {} + } + } + Ok(()) + } + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let manager = ProcessManager::new(); + + let id = manager.add_process(Process::new(MyPipeline)).await; + manager.start_process(&id).await?; + + // Later: stop, restart, or kill + manager.stop_process(&id).await?; + Ok(()) +} +``` + +## Features + +- **Graceful shutdown** — processes receive a cancellation token and can clean up +- **Configurable timeouts** — force-kill stubborn processes after a deadline +- **Auto-restart** — optionally restart processes that complete successfully +- **Error callbacks** — handle failures and panics with custom logic +- **Process lifecycle** — start, stop, restart, kill individual processes + +## Examples + +```sh +cargo run --bin simple # Basic start/stop/restart +cargo run --bin pipeline # Data pipeline with backpressure +cargo run --bin worker # Worker pool pattern +``` + +## License + +MIT diff --git a/assets/demo.gif b/assets/demo.gif new file mode 100644 index 0000000..9753a42 Binary files /dev/null and b/assets/demo.gif differ diff --git a/assets/demo.tape b/assets/demo.tape new file mode 100644 index 0000000..2194936 --- /dev/null +++ b/assets/demo.tape @@ -0,0 +1,33 @@ +# VHS script for noprocess demo +# https://github.com/charmbracelet/vhs + +Output assets/demo.gif + +Set Shell "bash" +Set FontSize 14 +Set Width 800 +Set Height 400 +Set Theme "Catppuccin Mocha" + +Type "# noprocess - process lifecycle management for Rust" +Enter +Sleep 1s + +Type "# Let's run the pipeline example" +Enter +Sleep 500ms + +Type "cargo run --bin pipeline" +Enter + +Sleep 8s + +Type "" +Enter +Sleep 500ms + +Type "# Producer sends data, consumer processes it" +Enter +Type "# Graceful shutdown drains the queue" +Enter +Sleep 2s diff --git a/examples/pipeline/Cargo.toml b/examples/pipeline/Cargo.toml new file mode 100644 index 0000000..03493d8 --- /dev/null +++ b/examples/pipeline/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "pipeline" +version = "0.1.0" +edition = "2024" + +[dependencies] +noprocess.workspace = true +tokio.workspace = true +tokio-util.workspace = true +tracing.workspace = true +tracing-subscriber = "0.3" diff --git a/examples/pipeline/src/main.rs b/examples/pipeline/src/main.rs new file mode 100644 index 0000000..019d523 --- /dev/null +++ b/examples/pipeline/src/main.rs @@ -0,0 +1,127 @@ +use std::{future::Future, sync::Arc, time::Duration}; + +use noprocess::{Process, ProcessHandler, ProcessManager, ProcessResult, ShutdownConfig}; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; + +#[tokio::main] +async fn main() -> Result<(), noprocess::Error> { + tracing_subscriber::fmt::init(); + + let manager = ProcessManager::new(); + + // Create a channel for the pipeline + let (tx, rx) = mpsc::channel::(100); + + // Producer: generates data + let producer = Process::builder(Producer { tx }) + .handle_id("producer") + .shutdown_config(ShutdownConfig { + graceful_timeout: Duration::from_secs(2), + restart_on_success: true, + restart_delay: Duration::from_millis(100), + }) + .build(); + + // Consumer: processes data + let consumer = Process::builder(Consumer { rx: Arc::new(tokio::sync::Mutex::new(rx)) }) + .handle_id("consumer") + .shutdown_config(ShutdownConfig { + graceful_timeout: Duration::from_secs(5), + restart_on_success: false, + restart_delay: Duration::ZERO, + }) + .on_error(|e| eprintln!("Consumer error: {e}")) + .build(); + + let producer_id = manager.add_process(producer).await; + let consumer_id = manager.add_process(consumer).await; + + // Start consumer first, then producer + manager.start_process(&consumer_id).await?; + manager.start_process(&producer_id).await?; + + // Let it run for a bit + tokio::time::sleep(Duration::from_secs(3)).await; + + // Graceful shutdown: stop producer first, let consumer drain + println!("\n--- Initiating graceful shutdown ---"); + manager.stop_process(&producer_id).await?; + tokio::time::sleep(Duration::from_millis(500)).await; + manager.stop_process(&consumer_id).await?; + + println!("Pipeline stopped cleanly"); + Ok(()) +} + +struct Producer { + tx: mpsc::Sender, +} + +impl ProcessHandler for Producer { + fn call(&self, cancel: CancellationToken) -> impl Future + Send { + let tx = self.tx.clone(); + async move { + let mut counter = 0u64; + let mut interval = tokio::time::interval(Duration::from_millis(200)); + + loop { + tokio::select! { + _ = cancel.cancelled() => { + println!("[producer] shutting down at count {counter}"); + break; + } + _ = interval.tick() => { + if tx.send(counter).await.is_err() { + println!("[producer] consumer gone, stopping"); + break; + } + println!("[producer] sent {counter}"); + counter += 1; + } + } + } + Ok(()) + } + } +} + +struct Consumer { + rx: Arc>>, +} + +impl ProcessHandler for Consumer { + fn call(&self, cancel: CancellationToken) -> impl Future + Send { + let rx = self.rx.clone(); + async move { + let mut rx = rx.lock().await; + loop { + tokio::select! { + _ = cancel.cancelled() => { + // Drain remaining items + let mut drained = 0; + while rx.try_recv().is_ok() { + drained += 1; + } + println!("[consumer] shutting down, drained {drained} remaining items"); + break; + } + msg = rx.recv() => { + match msg { + Some(n) => { + // Simulate processing + tokio::time::sleep(Duration::from_millis(50)).await; + println!("[consumer] processed {n}"); + } + None => { + println!("[consumer] channel closed"); + break; + } + } + } + } + } + Ok(()) + } + } +} diff --git a/examples/scheduled/Cargo.toml b/examples/scheduled/Cargo.toml new file mode 100644 index 0000000..b579fd5 --- /dev/null +++ b/examples/scheduled/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "scheduled" +version = "0.1.0" +edition = "2024" + +[dependencies] +noprocess.workspace = true +tokio.workspace = true +tokio-util.workspace = true +tracing.workspace = true +tracing-subscriber = "0.3" diff --git a/examples/scheduled/src/main.rs b/examples/scheduled/src/main.rs new file mode 100644 index 0000000..ed20464 --- /dev/null +++ b/examples/scheduled/src/main.rs @@ -0,0 +1,67 @@ +//! Scheduled batch job example. +//! +//! Demonstrates running a data sync job on a schedule. The job runs to completion, +//! then auto-restarts after a delay. Use `restart_on_success` for this pattern. + +use std::{future::Future, time::Duration}; + +use noprocess::{Process, ProcessHandler, ProcessManager, ProcessResult, ShutdownConfig}; +use tokio_util::sync::CancellationToken; + +#[tokio::main] +async fn main() -> Result<(), noprocess::Error> { + tracing_subscriber::fmt::init(); + + let manager = ProcessManager::new(); + + // A batch job that runs every 2 seconds + let sync_job = Process::builder(DataSyncJob) + .handle_id("data-sync") + .shutdown_config(ShutdownConfig { + graceful_timeout: Duration::from_secs(10), + restart_on_success: true, // Auto-restart after completion + restart_delay: Duration::from_secs(2), // Wait 2s between runs + }) + .on_error(|e| eprintln!("Sync job failed: {e}")) + .build(); + + let job_id = manager.add_process(sync_job).await; + manager.start_process(&job_id).await?; + + println!("Batch job running (Ctrl+C to stop)...\n"); + + // Let it run a few cycles + tokio::time::sleep(Duration::from_secs(10)).await; + + // Graceful shutdown - will wait for current batch to finish + println!("\n--- Requesting graceful shutdown ---"); + manager.stop_process(&job_id).await?; + + println!("Job stopped cleanly"); + Ok(()) +} + +struct DataSyncJob; + +impl ProcessHandler for DataSyncJob { + fn call(&self, cancel: CancellationToken) -> impl Future + Send { + async move { + println!("[sync] Starting batch..."); + + // Simulate a batch job with multiple steps + for step in 1..=3 { + // Check for cancellation between steps + if cancel.is_cancelled() { + println!("[sync] Cancelled at step {step}, cleaning up..."); + return Ok(()); + } + + println!("[sync] Step {step}/3: processing..."); + tokio::time::sleep(Duration::from_millis(300)).await; + } + + println!("[sync] Batch complete!"); + Ok(()) // Will auto-restart due to restart_on_success + } + } +} diff --git a/examples/worker/Cargo.toml b/examples/worker/Cargo.toml new file mode 100644 index 0000000..f9583f0 --- /dev/null +++ b/examples/worker/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "worker" +version = "0.1.0" +edition = "2024" + +[dependencies] +noprocess.workspace = true +tokio.workspace = true +tokio-util.workspace = true +tracing.workspace = true +tracing-subscriber = "0.3" diff --git a/examples/worker/src/main.rs b/examples/worker/src/main.rs new file mode 100644 index 0000000..ae13c8f --- /dev/null +++ b/examples/worker/src/main.rs @@ -0,0 +1,87 @@ +use std::{future::Future, time::Duration}; + +use noprocess::{HandleID, Process, ProcessHandler, ProcessManager, ProcessResult}; +use tokio_util::sync::CancellationToken; + +#[tokio::main] +async fn main() -> Result<(), noprocess::Error> { + tracing_subscriber::fmt::init(); + + let manager = ProcessManager::new(); + + // Start with 2 workers + println!("--- Starting 2 workers ---"); + let mut workers = vec![]; + for i in 0..2 { + let id = spawn_worker(&manager, i).await; + workers.push(id); + } + + tokio::time::sleep(Duration::from_secs(2)).await; + + // Scale up to 4 workers + println!("\n--- Scaling up to 4 workers ---"); + for i in 2..4 { + let id = spawn_worker(&manager, i).await; + workers.push(id); + } + + tokio::time::sleep(Duration::from_secs(2)).await; + + // Scale down to 2 workers + println!("\n--- Scaling down to 2 workers ---"); + for id in workers.drain(2..) { + manager.stop_process(&id).await?; + } + + tokio::time::sleep(Duration::from_secs(2)).await; + + // Stop all remaining workers + println!("\n--- Stopping all workers ---"); + for id in workers { + manager.stop_process(&id).await?; + } + + println!("All workers stopped"); + Ok(()) +} + +async fn spawn_worker(manager: &ProcessManager, id: usize) -> HandleID { + let process = Process::builder(Worker { id }) + .handle_id(format!("worker-{id}")) + .build(); + + let handle_id = manager.add_process(process).await; + manager.start_process(&handle_id).await.unwrap(); + handle_id +} + +struct Worker { + id: usize, +} + +impl ProcessHandler for Worker { + fn call(&self, cancel: CancellationToken) -> impl Future + Send { + let id = self.id; + async move { + println!("[worker-{id}] started"); + let mut interval = tokio::time::interval(Duration::from_millis(500)); + let mut jobs = 0; + + loop { + tokio::select! { + _ = cancel.cancelled() => { + println!("[worker-{id}] stopping after {jobs} jobs"); + break; + } + _ = interval.tick() => { + // Simulate doing work + jobs += 1; + println!("[worker-{id}] completed job #{jobs}"); + } + } + } + Ok(()) + } + } +}