feat: add tests

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2026-01-07 15:14:36 +01:00
parent 1fc3127b84
commit 1de13de61b
7 changed files with 1285 additions and 45 deletions

155
Cargo.lock generated
View File

@@ -2,6 +2,15 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "aho-corasick"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301"
dependencies = [
"memchr",
]
[[package]]
name = "anyhow"
version = "1.0.71"
@@ -83,6 +92,21 @@ version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
[[package]]
name = "matchers"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
dependencies = [
"regex-automata",
]
[[package]]
name = "memchr"
version = "2.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
[[package]]
name = "mio"
version = "1.1.1"
@@ -100,9 +124,14 @@ version = "0.1.0"
dependencies = [
"anyhow",
"rand",
"test-case",
"thiserror",
"tokio",
"tokio-test",
"tokio-util",
"tracing",
"tracing-subscriber",
"tracing-test",
]
[[package]]
@@ -215,6 +244,23 @@ dependencies = [
"bitflags",
]
[[package]]
name = "regex-automata"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
[[package]]
name = "scopeguard"
version = "1.1.0"
@@ -278,6 +324,59 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "test-case"
version = "3.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb2550dd13afcd286853192af8601920d959b14c401fcece38071d53bf0768a8"
dependencies = [
"test-case-macros",
]
[[package]]
name = "test-case-core"
version = "3.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adcb7fd841cd518e279be3d5a3eb0636409487998a4aff22f3de87b81e88384f"
dependencies = [
"cfg-if",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "test-case-macros"
version = "3.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb"
dependencies = [
"proc-macro2",
"quote",
"syn",
"test-case-core",
]
[[package]]
name = "thiserror"
version = "2.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "2.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.9"
@@ -315,6 +414,28 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-stream"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-test"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f6d24790a10a7af737693a3e8f1d03faef7e6ca0cc99aae5066f533766de545"
dependencies = [
"futures-core",
"tokio",
"tokio-stream",
]
[[package]]
name = "tokio-util"
version = "0.7.18"
@@ -330,11 +451,10 @@ dependencies = [
[[package]]
name = "tracing"
version = "0.1.37"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
dependencies = [
"cfg-if",
"log",
"pin-project-lite",
"tracing-attributes",
@@ -343,9 +463,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.24"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74"
checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da"
dependencies = [
"proc-macro2",
"quote",
@@ -379,14 +499,39 @@ version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex-automata",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
name = "tracing-test"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
dependencies = [
"tracing-core",
"tracing-subscriber",
"tracing-test-macro",
]
[[package]]
name = "tracing-test-macro"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "unicode-ident"
version = "1.0.9"

View File

@@ -4,8 +4,15 @@ version = "0.1.0"
edition = "2024"
[dependencies]
anyhow.workspace = true
rand.workspace = true
thiserror = "2"
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
[dev-dependencies]
anyhow.workspace = true
test-case = "3"
tokio-test = "0.4"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-test = { version = "0.2", features = ["no-env-filter"] }

View File

@@ -0,0 +1,53 @@
use std::sync::Arc;
use crate::HandleID;
/// A boxed error type for process handler return values.
/// This allows users to return any error type from their process handlers.
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Result type for process handler functions.
pub type ProcessResult = Result<(), BoxError>;
/// Error type for noprocess library operations.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// The process runner has terminated and is no longer accepting commands.
#[error("runner terminated")]
RunnerTerminated,
}
/// Information about a process execution failure.
/// Used in the on_error callback to provide details about what went wrong.
#[derive(Debug, Clone)]
pub struct ProcessFailure {
pub handle_id: HandleID,
pub kind: ProcessFailureKind,
}
/// The kind of process execution failure.
#[derive(Debug, Clone)]
pub enum ProcessFailureKind {
/// The process returned an error.
Returned(String),
/// The process panicked.
Panicked(String),
}
impl std::fmt::Display for ProcessFailure {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.kind {
ProcessFailureKind::Returned(e) => {
write!(f, "process {} returned error: {}", self.handle_id, e)
}
ProcessFailureKind::Panicked(e) => {
write!(f, "process {} panicked: {}", self.handle_id, e)
}
}
}
}
impl std::error::Error for ProcessFailure {}
/// Type alias for error handler callback.
pub type ErrorHandler = Arc<dyn Fn(ProcessFailure) + Send + Sync + 'static>;

View File

@@ -1,15 +1,20 @@
mod error;
mod handle_id;
mod process;
mod process_handler;
mod shutdown_config;
#[cfg(test)]
mod tests;
use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
pub use error::{BoxError, Error, ProcessFailure, ProcessFailureKind, ProcessResult};
pub use handle_id::HandleID;
pub use process::Process;
pub use process::{Process, ProcessBuilder};
pub use process_handler::ProcessHandler;
pub use shutdown_config::ShutdownConfig;
@@ -31,25 +36,26 @@ impl ProcessManager {
}
}
pub async fn add_process(&self, process: Process) {
pub async fn add_process(&self, process: Process) -> HandleID {
let handle_id = process.handle_id().clone();
info!(%handle_id, "adding process to manager");
self.inner.lock().await.add_process(process);
handle_id
}
pub async fn start_process(&self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
pub async fn start_process(&self, handle_id: &HandleID) -> Result<Option<()>, Error> {
self.inner.lock().await.start_process(handle_id).await
}
pub async fn restart_process(&self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
pub async fn restart_process(&self, handle_id: &HandleID) -> Result<Option<()>, Error> {
self.inner.lock().await.restart_process(handle_id).await
}
pub async fn stop_process(&self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
pub async fn stop_process(&self, handle_id: &HandleID) -> Result<Option<()>, Error> {
self.inner.lock().await.stop_process(handle_id).await
}
pub async fn kill_process(&self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
pub async fn kill_process(&self, handle_id: &HandleID) -> Result<Option<()>, Error> {
self.inner.lock().await.kill_process(handle_id).await
}
}
@@ -64,7 +70,7 @@ impl Inner {
self.handles.insert(process.handle_id().clone(), process);
}
async fn start_process(&mut self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
async fn start_process(&mut self, handle_id: &HandleID) -> Result<Option<()>, Error> {
let Some(handle) = self.handles.get_mut(handle_id) else {
warn!(%handle_id, "process not found");
return Ok(None);
@@ -75,7 +81,7 @@ impl Inner {
Ok(None)
}
async fn restart_process(&mut self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
async fn restart_process(&mut self, handle_id: &HandleID) -> Result<Option<()>, Error> {
let Some(handle) = self.handles.get_mut(handle_id) else {
warn!(%handle_id, "process not found");
return Ok(None);
@@ -86,7 +92,7 @@ impl Inner {
Ok(None)
}
async fn stop_process(&mut self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
async fn stop_process(&mut self, handle_id: &HandleID) -> Result<Option<()>, Error> {
let Some(handle) = self.handles.get_mut(handle_id) else {
warn!(%handle_id, "process not found");
return Ok(None);
@@ -97,7 +103,7 @@ impl Inner {
Ok(None)
}
async fn kill_process(&mut self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
async fn kill_process(&mut self, handle_id: &HandleID) -> Result<Option<()>, Error> {
let Some(handle) = self.handles.get_mut(handle_id) else {
warn!(%handle_id, "process not found");
return Ok(None);

View File

@@ -8,6 +8,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, debug_span, info, trace, warn};
use crate::{
error::{Error, ErrorHandler, ProcessFailure, ProcessFailureKind, ProcessResult},
handle_id::{generate_short_id, HandleID},
process_handler::{IntoProcess, ProcessHandler, SharedProcess},
shutdown_config::ShutdownConfig,
@@ -38,13 +39,23 @@ pub struct Process {
impl Process {
pub fn new(process: impl ProcessHandler) -> Self {
Self::new_with_handle(HandleID::new_random(), process)
ProcessBuilder::new(process).build()
}
pub fn new_with_handle(handle_id: impl Into<HandleID>, process: impl ProcessHandler) -> Self {
let handle_id = handle_id.into();
let process = process.into_process();
let shutdown_config = ShutdownConfig::default();
ProcessBuilder::new(process).handle_id(handle_id).build()
}
pub fn builder(process: impl ProcessHandler) -> ProcessBuilder {
ProcessBuilder::new(process)
}
fn create(
handle_id: HandleID,
process: SharedProcess,
shutdown_config: ShutdownConfig,
error_handler: Option<ErrorHandler>,
) -> Self {
let (command_tx, command_rx) = mpsc::channel(1);
let (event_tx, event_rx) = watch::channel(ProcessState::Pending);
@@ -52,8 +63,15 @@ impl Process {
let span = debug_span!("process_loop", handle_id = %handle_id);
let handle = tokio::spawn(
Self::run_loop(handle_id.clone(), process, command_rx, shutdown_config, event_tx)
.instrument(span),
Self::run_loop(
handle_id.clone(),
process,
command_rx,
shutdown_config,
event_tx,
error_handler,
)
.instrument(span),
);
Self {
@@ -63,13 +81,59 @@ impl Process {
task: handle,
}
}
}
pub struct ProcessBuilder {
process: SharedProcess,
handle_id: Option<HandleID>,
shutdown_config: Option<ShutdownConfig>,
error_handler: Option<ErrorHandler>,
}
impl ProcessBuilder {
pub fn new(process: impl ProcessHandler) -> Self {
Self {
process: process.into_process(),
handle_id: None,
shutdown_config: None,
error_handler: None,
}
}
pub fn handle_id(mut self, handle_id: impl Into<HandleID>) -> Self {
self.handle_id = Some(handle_id.into());
self
}
pub fn shutdown_config(mut self, config: ShutdownConfig) -> Self {
self.shutdown_config = Some(config);
self
}
/// Set an error handler that will be called when the process errors or panics
pub fn on_error<F>(mut self, handler: F) -> Self
where
F: Fn(ProcessFailure) + Send + Sync + 'static,
{
self.error_handler = Some(std::sync::Arc::new(handler));
self
}
pub fn build(self) -> Process {
let handle_id = self.handle_id.unwrap_or_else(HandleID::new_random);
let shutdown_config = self.shutdown_config.unwrap_or_default();
Process::create(handle_id, self.process, shutdown_config, self.error_handler)
}
}
impl Process {
async fn run_loop(
handle_id: HandleID,
process: SharedProcess,
mut commands: mpsc::Receiver<ProcessCommand>,
shutdown_config: ShutdownConfig,
state: watch::Sender<ProcessState>,
error_handler: Option<ErrorHandler>,
) {
let mut current_task: Option<RunningTask> = None;
@@ -99,6 +163,7 @@ impl Process {
&handle_id,
&state,
&shutdown_config,
&error_handler,
&mut current_task
).await;
}
@@ -136,43 +201,39 @@ impl Process {
trace!("exiting run loop");
}
pub async fn start(&mut self) -> anyhow::Result<()> {
pub async fn start(&mut self) -> Result<(), Error> {
info!(handle_id = %self.handle_id, "starting process");
self.command_tx
.send(ProcessCommand::Start)
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))
.map_err(|_| Error::RunnerTerminated)
}
pub async fn stop(&mut self) -> anyhow::Result<()> {
pub async fn stop(&mut self) -> Result<(), Error> {
info!(handle_id = %self.handle_id, "stopping process");
let (resp_tx, resp_rx) = oneshot::channel();
self.command_tx
.send(ProcessCommand::Stop { response: resp_tx })
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))?;
.map_err(|_| Error::RunnerTerminated)?;
resp_rx
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))
resp_rx.await.map_err(|_| Error::RunnerTerminated)
}
pub async fn kill(&mut self) -> anyhow::Result<()> {
pub async fn kill(&mut self) -> Result<(), Error> {
info!(handle_id = %self.handle_id, "killing process");
let (resp_tx, resp_rx) = oneshot::channel();
self.command_tx
.send(ProcessCommand::Kill { response: resp_tx })
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))?;
.map_err(|_| Error::RunnerTerminated)?;
resp_rx
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))
resp_rx.await.map_err(|_| Error::RunnerTerminated)
}
pub async fn restart(&mut self) -> anyhow::Result<()> {
pub async fn restart(&mut self) -> Result<(), Error> {
info!(handle_id = %self.handle_id, "restarting process");
// Stop without the info log
@@ -180,16 +241,14 @@ impl Process {
self.command_tx
.send(ProcessCommand::Stop { response: resp_tx })
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))?;
resp_rx
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))?;
.map_err(|_| Error::RunnerTerminated)?;
resp_rx.await.map_err(|_| Error::RunnerTerminated)?;
// Start without the info log
self.command_tx
.send(ProcessCommand::Start)
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))
.map_err(|_| Error::RunnerTerminated)
}
pub(crate) async fn wait_for(&self, desired: ProcessState) {
@@ -239,11 +298,12 @@ impl Process {
}
async fn handle_task_completion(
result: Result<anyhow::Result<()>, tokio::task::JoinError>,
result: Result<ProcessResult, tokio::task::JoinError>,
process: SharedProcess,
handle_id: &HandleID,
state: &watch::Sender<ProcessState>,
config: &ShutdownConfig,
error_handler: &Option<ErrorHandler>,
current_task: &mut Option<RunningTask>,
) {
match result {
@@ -264,6 +324,12 @@ impl Process {
}
Ok(Err(e)) => {
warn!(error = %e, "process returned error");
if let Some(handler) = error_handler {
handler(ProcessFailure {
handle_id: handle_id.clone(),
kind: ProcessFailureKind::Returned(e.to_string()),
});
}
let _ = state.send(ProcessState::Errored);
}
Err(e) if e.is_cancelled() => {
@@ -271,6 +337,12 @@ impl Process {
}
Err(e) => {
warn!(error = %e, "process panicked");
if let Some(handler) = error_handler {
handler(ProcessFailure {
handle_id: handle_id.clone(),
kind: ProcessFailureKind::Panicked(e.to_string()),
});
}
let _ = state.send(ProcessState::Errored);
}
}
@@ -285,7 +357,7 @@ impl Drop for Process {
}
struct RunningTask {
handle: JoinHandle<anyhow::Result<()>>,
handle: JoinHandle<ProcessResult>,
cancellation: CancellationToken,
invocation_id: InvocationID,
}

View File

@@ -2,26 +2,28 @@ use std::{future::Future, pin::Pin, sync::Arc};
use tokio_util::sync::CancellationToken;
use crate::error::ProcessResult;
#[allow(async_fn_in_trait)]
pub trait ProcessHandler: Send + Sync + 'static {
fn call(
&self,
cancellation: CancellationToken,
) -> impl Future<Output = anyhow::Result<()>> + Send;
) -> impl Future<Output = ProcessResult> + Send;
}
pub(crate) trait AsyncProcess: Send + Sync {
fn call_async(
&self,
cancellation: CancellationToken,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
) -> Pin<Box<dyn Future<Output = ProcessResult> + Send + '_>>;
}
impl<E: ProcessHandler> AsyncProcess for E {
fn call_async(
&self,
cancellation: CancellationToken,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
) -> Pin<Box<dyn Future<Output = ProcessResult> + Send + '_>> {
Box::pin(self.call(cancellation))
}
}

View File

@@ -0,0 +1,955 @@
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use tokio::sync::Barrier;
use tokio_util::sync::CancellationToken;
use crate::{process::ProcessState, BoxError, Process, ProcessHandler, ProcessManager, ProcessResult};
// ============================================================================
// Test Helpers
// ============================================================================
/// A process that runs until cancelled
struct LongRunningProcess {
started: Arc<AtomicUsize>,
stopped: Arc<AtomicUsize>,
}
impl LongRunningProcess {
fn new() -> (Self, Arc<AtomicUsize>, Arc<AtomicUsize>) {
let started = Arc::new(AtomicUsize::new(0));
let stopped = Arc::new(AtomicUsize::new(0));
(
Self {
started: started.clone(),
stopped: stopped.clone(),
},
started,
stopped,
)
}
}
impl ProcessHandler for LongRunningProcess {
fn call(
&self,
cancellation: CancellationToken,
) -> impl std::future::Future<Output = ProcessResult> + Send {
let started = self.started.clone();
let stopped = self.stopped.clone();
async move {
started.fetch_add(1, Ordering::SeqCst);
cancellation.cancelled().await;
stopped.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
}
/// A process that returns an error
struct ErroringProcess;
/// Simple error type for testing
#[derive(Debug)]
struct TestError(String);
impl std::fmt::Display for TestError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl std::error::Error for TestError {}
impl ProcessHandler for ErroringProcess {
fn call(
&self,
_cancellation: CancellationToken,
) -> impl std::future::Future<Output = ProcessResult> + Send {
async move { Err(Box::new(TestError("intentional error".to_string())) as BoxError) }
}
}
/// A process that panics
struct PanickingProcess;
impl ProcessHandler for PanickingProcess {
fn call(
&self,
_cancellation: CancellationToken,
) -> impl std::future::Future<Output = ProcessResult> + Send {
async move { panic!("intentional panic") }
}
}
/// A process that ignores cancellation (stubborn)
struct StubbornProcess {
started: Arc<AtomicUsize>,
}
impl StubbornProcess {
fn new() -> (Self, Arc<AtomicUsize>) {
let started = Arc::new(AtomicUsize::new(0));
(
Self {
started: started.clone(),
},
started,
)
}
}
impl ProcessHandler for StubbornProcess {
fn call(
&self,
_cancellation: CancellationToken,
) -> impl std::future::Future<Output = ProcessResult> + Send {
let started = self.started.clone();
async move {
started.fetch_add(1, Ordering::SeqCst);
// Never exits, ignores cancellation
std::future::pending::<()>().await;
Ok(())
}
}
}
/// A process that waits for a barrier before proceeding
struct BarrierProcess {
barrier: Arc<Barrier>,
completed: Arc<AtomicUsize>,
}
impl BarrierProcess {
fn new(barrier: Arc<Barrier>) -> (Self, Arc<AtomicUsize>) {
let completed = Arc::new(AtomicUsize::new(0));
(
Self {
barrier,
completed: completed.clone(),
},
completed,
)
}
}
impl ProcessHandler for BarrierProcess {
fn call(
&self,
cancellation: CancellationToken,
) -> impl std::future::Future<Output = ProcessResult> + Send {
let barrier = self.barrier.clone();
let completed = self.completed.clone();
async move {
barrier.wait().await;
tokio::select! {
_ = cancellation.cancelled() => {}
_ = tokio::time::sleep(Duration::from_secs(60)) => {}
}
completed.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
}
// ============================================================================
// Basic Operation Tests
// ============================================================================
mod basic_operations {
use super::*;
#[tokio::test]
async fn test_start_process() {
let (process, started, _stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
assert_eq!(started.load(Ordering::SeqCst), 0);
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
assert_eq!(started.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_stop_process() {
let (process, started, stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
assert_eq!(started.load(Ordering::SeqCst), 1);
assert_eq!(stopped.load(Ordering::SeqCst), 0);
proc.stop().await.unwrap();
proc.wait_for(ProcessState::Stopped).await;
assert_eq!(stopped.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_kill_process() {
let (process, started, _stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
assert_eq!(started.load(Ordering::SeqCst), 1);
proc.kill().await.unwrap();
proc.wait_for(ProcessState::Stopped).await;
}
#[tokio::test]
async fn test_restart_process() {
let (process, started, stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
assert_eq!(started.load(Ordering::SeqCst), 1);
proc.restart().await.unwrap();
proc.wait_for(ProcessState::Running).await;
// Give a moment for the new process to start
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(started.load(Ordering::SeqCst), 2);
assert_eq!(stopped.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_process_with_custom_handle_id() {
let (process, _started, _stopped) = LongRunningProcess::new();
let proc = Process::new_with_handle("my-custom-id", process);
assert_eq!(proc.handle_id().to_string(), "my-custom-id");
}
#[tokio::test]
async fn test_process_builder() {
let (process, started, stopped) = LongRunningProcess::new();
let mut proc = Process::builder(process)
.handle_id("builder-test")
.build();
assert_eq!(proc.handle_id().to_string(), "builder-test");
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
assert_eq!(started.load(Ordering::SeqCst), 1);
proc.stop().await.unwrap();
proc.wait_for(ProcessState::Stopped).await;
assert_eq!(stopped.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_process_builder_with_shutdown_config() {
use crate::ShutdownConfig;
let (process, started) = StubbornProcess::new();
let mut proc = Process::builder(process)
.shutdown_config(ShutdownConfig {
graceful_timeout: Duration::from_millis(100),
restart_on_success: false,
restart_delay: Duration::ZERO,
})
.build();
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
assert_eq!(started.load(Ordering::SeqCst), 1);
// Stop should timeout quickly (100ms) and force kill
let start_time = std::time::Instant::now();
proc.stop().await.unwrap();
proc.wait_for(ProcessState::Stopped).await;
let elapsed = start_time.elapsed();
// Should be around 100ms, not the default 5s
assert!(elapsed < Duration::from_secs(1));
}
#[tokio::test]
async fn test_add_process_returns_handle_id() {
let manager = ProcessManager::new();
let (process, _started, _stopped) = LongRunningProcess::new();
let handle_id = manager.add_process(Process::new(process)).await;
// Can immediately use the returned handle_id
manager.start_process(&handle_id).await.unwrap();
}
}
// ============================================================================
// Edge Case Tests
// ============================================================================
mod edge_cases {
use super::*;
#[tokio::test]
async fn test_start_already_running_process() {
let (process, started, _stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
// Second start should be ignored
proc.start().await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(started.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_stop_not_running_process() {
let (process, _started, _stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
// Stop without starting should succeed
proc.stop().await.unwrap();
}
#[tokio::test]
async fn test_kill_not_running_process() {
let (process, _started, _stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
// Kill without starting should succeed
proc.kill().await.unwrap();
}
#[tokio::test]
async fn test_stop_already_stopped_process() {
let (process, _started, stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
proc.stop().await.unwrap();
proc.wait_for(ProcessState::Stopped).await;
// Second stop should succeed
proc.stop().await.unwrap();
assert_eq!(stopped.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_restart_not_running_process() {
let (process, started, _stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
// Restart without starting should work (stop is no-op, start succeeds)
proc.restart().await.unwrap();
proc.wait_for(ProcessState::Running).await;
assert_eq!(started.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_process_that_errors() {
let mut proc = Process::new(ErroringProcess);
proc.start().await.unwrap();
proc.wait_for(ProcessState::Errored).await;
}
#[tokio::test]
async fn test_process_that_panics() {
let mut proc = Process::new(PanickingProcess);
proc.start().await.unwrap();
proc.wait_for(ProcessState::Errored).await;
}
#[tokio::test]
async fn test_start_after_error() {
let (process, started, _stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
// We can't easily make it error then recover, so test start after error state
// by using the process manager approach instead
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
assert_eq!(started.load(Ordering::SeqCst), 1);
}
}
// ============================================================================
// Graceful Shutdown Tests
// ============================================================================
mod graceful_shutdown {
use super::*;
#[tokio::test]
async fn test_stubborn_process_is_killed_after_timeout() {
let (process, started) = StubbornProcess::new();
let mut proc = Process::new(process);
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
assert_eq!(started.load(Ordering::SeqCst), 1);
// Stop will timeout and force kill
let start = std::time::Instant::now();
proc.stop().await.unwrap();
proc.wait_for(ProcessState::Stopped).await;
// Should have waited for graceful timeout (default 5s)
// but we can't easily test that without changing the config
// Just verify it completed
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_secs(4)); // Allow some tolerance
}
#[tokio::test]
async fn test_kill_is_immediate_for_stubborn_process() {
let (process, started) = StubbornProcess::new();
let mut proc = Process::new(process);
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
assert_eq!(started.load(Ordering::SeqCst), 1);
// Kill should be immediate
let start = std::time::Instant::now();
proc.kill().await.unwrap();
proc.wait_for(ProcessState::Stopped).await;
let elapsed = start.elapsed();
assert!(elapsed < Duration::from_secs(1));
}
}
// ============================================================================
// Process Manager Tests
// ============================================================================
mod process_manager {
use super::*;
#[tokio::test]
async fn test_manager_add_and_start_process() {
let manager = ProcessManager::new();
let (process, started, _stopped) = LongRunningProcess::new();
let proc = Process::new(process);
let handle_id = proc.handle_id().clone();
manager.add_process(proc).await;
manager.start_process(&handle_id).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(started.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_manager_stop_process() {
let manager = ProcessManager::new();
let (process, _started, stopped) = LongRunningProcess::new();
let proc = Process::new(process);
let handle_id = proc.handle_id().clone();
manager.add_process(proc).await;
manager.start_process(&handle_id).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
manager.stop_process(&handle_id).await.unwrap();
assert_eq!(stopped.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_manager_multiple_processes() {
let manager = ProcessManager::new();
let (process1, started1, _) = LongRunningProcess::new();
let (process2, started2, _) = LongRunningProcess::new();
let (process3, started3, _) = LongRunningProcess::new();
let proc1 = Process::new(process1);
let proc2 = Process::new(process2);
let proc3 = Process::new(process3);
let id1 = proc1.handle_id().clone();
let id2 = proc2.handle_id().clone();
let id3 = proc3.handle_id().clone();
manager.add_process(proc1).await;
manager.add_process(proc2).await;
manager.add_process(proc3).await;
manager.start_process(&id1).await.unwrap();
manager.start_process(&id2).await.unwrap();
manager.start_process(&id3).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(started1.load(Ordering::SeqCst), 1);
assert_eq!(started2.load(Ordering::SeqCst), 1);
assert_eq!(started3.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_manager_process_not_found() {
let manager = ProcessManager::new();
let nonexistent_id = "nonexistent".into();
let result = manager.start_process(&nonexistent_id).await.unwrap();
assert!(result.is_none());
}
#[tokio::test]
async fn test_manager_restart_process() {
let manager = ProcessManager::new();
let (process, started, stopped) = LongRunningProcess::new();
let proc = Process::new(process);
let handle_id = proc.handle_id().clone();
manager.add_process(proc).await;
manager.start_process(&handle_id).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
manager.restart_process(&handle_id).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(started.load(Ordering::SeqCst), 2);
assert_eq!(stopped.load(Ordering::SeqCst), 1);
}
}
// ============================================================================
// Concurrency Tests
// ============================================================================
mod concurrency {
use super::*;
#[tokio::test]
async fn test_sequential_start_commands() {
let (process, started, _stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
// Send multiple start commands sequentially
proc.start().await.unwrap();
proc.start().await.unwrap();
proc.start().await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
// Only one should have started
assert_eq!(started.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_sequential_stop_commands() {
let (process, _started, stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
// Send multiple stop commands sequentially
proc.stop().await.unwrap();
proc.stop().await.unwrap();
proc.stop().await.unwrap();
// Only one should have stopped
assert_eq!(stopped.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_rapid_start_stop_cycles() {
let (process, started, stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
for _ in 0..5 {
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
proc.stop().await.unwrap();
proc.wait_for(ProcessState::Stopped).await;
}
assert_eq!(started.load(Ordering::SeqCst), 5);
assert_eq!(stopped.load(Ordering::SeqCst), 5);
}
#[tokio::test]
async fn test_manager_concurrent_operations_different_processes() {
let manager = Arc::new(ProcessManager::new());
let (process1, started1, _) = LongRunningProcess::new();
let (process2, started2, _) = LongRunningProcess::new();
let proc1 = Process::new(process1);
let proc2 = Process::new(process2);
let id1 = proc1.handle_id().clone();
let id2 = proc2.handle_id().clone();
manager.add_process(proc1).await;
manager.add_process(proc2).await;
// Start both processes concurrently
let manager1 = manager.clone();
let manager2 = manager.clone();
let id1_clone = id1.clone();
let id2_clone = id2.clone();
let (r1, r2) = tokio::join!(
async move { manager1.start_process(&id1_clone).await },
async move { manager2.start_process(&id2_clone).await },
);
r1.unwrap();
r2.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(started1.load(Ordering::SeqCst), 1);
assert_eq!(started2.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_barrier_synchronized_processes() {
let barrier = Arc::new(Barrier::new(3));
let (process1, completed1) = BarrierProcess::new(barrier.clone());
let (process2, completed2) = BarrierProcess::new(barrier.clone());
let mut proc1 = Process::new(process1);
let mut proc2 = Process::new(process2);
proc1.start().await.unwrap();
proc2.start().await.unwrap();
// Wait at the barrier (the third waiter)
barrier.wait().await;
// Both processes should now be past the barrier
tokio::time::sleep(Duration::from_millis(50)).await;
proc1.stop().await.unwrap();
proc2.stop().await.unwrap();
assert_eq!(completed1.load(Ordering::SeqCst), 1);
assert_eq!(completed2.load(Ordering::SeqCst), 1);
}
}
// ============================================================================
// Parametric Tests
// ============================================================================
mod parametric {
use super::*;
use test_case::test_case;
#[test_case(1; "single start")]
#[test_case(5; "five starts")]
#[test_case(10; "ten starts")]
#[tokio::test]
async fn test_multiple_start_attempts(attempts: usize) {
let (process, started, _stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
for _ in 0..attempts {
proc.start().await.unwrap();
}
proc.wait_for(ProcessState::Running).await;
tokio::time::sleep(Duration::from_millis(50)).await;
// Only one should have actually started
assert_eq!(started.load(Ordering::SeqCst), 1);
}
#[test_case(1; "single cycle")]
#[test_case(3; "three cycles")]
#[test_case(5; "five cycles")]
#[tokio::test]
async fn test_start_stop_cycles(cycles: usize) {
let (process, started, stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
for _ in 0..cycles {
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
proc.stop().await.unwrap();
proc.wait_for(ProcessState::Stopped).await;
}
assert_eq!(started.load(Ordering::SeqCst), cycles);
assert_eq!(stopped.load(Ordering::SeqCst), cycles);
}
#[test_case(Duration::from_millis(10); "10ms delay")]
#[test_case(Duration::from_millis(50); "50ms delay")]
#[test_case(Duration::from_millis(100); "100ms delay")]
#[tokio::test]
async fn test_delayed_stop(delay: Duration) {
let (process, started, stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
tokio::time::sleep(delay).await;
proc.stop().await.unwrap();
proc.wait_for(ProcessState::Stopped).await;
assert_eq!(started.load(Ordering::SeqCst), 1);
assert_eq!(stopped.load(Ordering::SeqCst), 1);
}
}
// ============================================================================
// Drop Behavior Tests
// ============================================================================
mod drop_behavior {
use super::*;
#[tokio::test]
async fn test_process_dropped_while_running() {
let (process, started, _stopped) = LongRunningProcess::new();
let mut proc = Process::new(process);
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
assert_eq!(started.load(Ordering::SeqCst), 1);
// Drop the process
drop(proc);
// The task should be aborted
tokio::time::sleep(Duration::from_millis(50)).await;
}
#[tokio::test]
async fn test_process_dropped_before_start() {
let (process, started, _stopped) = LongRunningProcess::new();
let proc = Process::new(process);
// Drop without starting
drop(proc);
assert_eq!(started.load(Ordering::SeqCst), 0);
}
}
// ============================================================================
// Error Handler Tests
// ============================================================================
mod error_handler {
use super::*;
use crate::{ProcessFailure, ProcessFailureKind, ShutdownConfig};
use std::sync::Mutex;
#[tokio::test]
async fn test_on_error_callback_for_returned_error() {
let errors: Arc<Mutex<Vec<ProcessFailure>>> = Arc::new(Mutex::new(Vec::new()));
let errors_clone = errors.clone();
let mut proc = Process::builder(ErroringProcess)
.handle_id("error-test")
.shutdown_config(ShutdownConfig {
graceful_timeout: Duration::from_secs(1),
restart_on_success: false,
restart_delay: Duration::ZERO,
})
.on_error(move |e| {
errors_clone.lock().unwrap().push(e);
})
.build();
proc.start().await.unwrap();
proc.wait_for(ProcessState::Errored).await;
let errors = errors.lock().unwrap();
assert_eq!(errors.len(), 1);
assert_eq!(errors[0].handle_id.to_string(), "error-test");
assert!(matches!(errors[0].kind, ProcessFailureKind::Returned(_)));
}
#[tokio::test]
async fn test_on_error_callback_for_panic() {
let errors: Arc<Mutex<Vec<ProcessFailure>>> = Arc::new(Mutex::new(Vec::new()));
let errors_clone = errors.clone();
let mut proc = Process::builder(PanickingProcess)
.handle_id("panic-test")
.shutdown_config(ShutdownConfig {
graceful_timeout: Duration::from_secs(1),
restart_on_success: false,
restart_delay: Duration::ZERO,
})
.on_error(move |e| {
errors_clone.lock().unwrap().push(e);
})
.build();
proc.start().await.unwrap();
proc.wait_for(ProcessState::Errored).await;
let errors = errors.lock().unwrap();
assert_eq!(errors.len(), 1);
assert_eq!(errors[0].handle_id.to_string(), "panic-test");
assert!(matches!(errors[0].kind, ProcessFailureKind::Panicked(_)));
}
#[tokio::test]
async fn test_on_error_not_called_for_successful_process() {
let errors: Arc<Mutex<Vec<ProcessFailure>>> = Arc::new(Mutex::new(Vec::new()));
let errors_clone = errors.clone();
let (process, _started, _stopped) = LongRunningProcess::new();
let mut proc = Process::builder(process)
.shutdown_config(ShutdownConfig {
graceful_timeout: Duration::from_secs(1),
restart_on_success: false,
restart_delay: Duration::ZERO,
})
.on_error(move |e| {
errors_clone.lock().unwrap().push(e);
})
.build();
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
proc.stop().await.unwrap();
proc.wait_for(ProcessState::Stopped).await;
let errors = errors.lock().unwrap();
assert_eq!(errors.len(), 0);
}
#[tokio::test]
async fn test_process_failure_display() {
let error = ProcessFailure {
handle_id: "test-id".into(),
kind: ProcessFailureKind::Returned("something went wrong".to_string()),
};
assert_eq!(
error.to_string(),
"process test-id returned error: something went wrong"
);
let error = ProcessFailure {
handle_id: "test-id".into(),
kind: ProcessFailureKind::Panicked("panic message".to_string()),
};
assert_eq!(error.to_string(), "process test-id panicked: panic message");
}
}
// ============================================================================
// Tracing Tests
// ============================================================================
mod tracing_tests {
use super::*;
use crate::ShutdownConfig;
use tracing_test::traced_test;
#[tokio::test]
#[traced_test]
async fn test_error_is_logged_when_process_returns_error() {
let mut proc = Process::builder(ErroringProcess)
.handle_id("traced-error-test")
.shutdown_config(ShutdownConfig {
graceful_timeout: Duration::from_secs(1),
restart_on_success: false,
restart_delay: Duration::ZERO,
})
.build();
proc.start().await.unwrap();
proc.wait_for(ProcessState::Errored).await;
assert!(logs_contain("process returned error"));
assert!(logs_contain("intentional error"));
}
#[tokio::test]
#[traced_test]
async fn test_panic_is_logged_when_process_panics() {
let mut proc = Process::builder(PanickingProcess)
.handle_id("traced-panic-test")
.shutdown_config(ShutdownConfig {
graceful_timeout: Duration::from_secs(1),
restart_on_success: false,
restart_delay: Duration::ZERO,
})
.build();
proc.start().await.unwrap();
proc.wait_for(ProcessState::Errored).await;
assert!(logs_contain("process panicked"));
}
#[tokio::test]
#[traced_test]
async fn test_graceful_shutdown_timeout_is_logged() {
let (process, _started) = StubbornProcess::new();
let mut proc = Process::builder(process)
.handle_id("traced-timeout-test")
.shutdown_config(ShutdownConfig {
graceful_timeout: Duration::from_millis(100),
restart_on_success: false,
restart_delay: Duration::ZERO,
})
.build();
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
proc.stop().await.unwrap();
proc.wait_for(ProcessState::Stopped).await;
assert!(logs_contain("graceful shutdown timed out"));
}
#[tokio::test]
#[traced_test]
async fn test_start_stop_logging() {
let (process, _started, _stopped) = LongRunningProcess::new();
let mut proc = Process::builder(process)
.handle_id("traced-lifecycle-test")
.shutdown_config(ShutdownConfig {
graceful_timeout: Duration::from_secs(1),
restart_on_success: false,
restart_delay: Duration::ZERO,
})
.build();
proc.start().await.unwrap();
proc.wait_for(ProcessState::Running).await;
assert!(logs_contain("starting process"));
assert!(logs_contain("traced-lifecycle-test"));
proc.stop().await.unwrap();
proc.wait_for(ProcessState::Stopped).await;
assert!(logs_contain("stopping process"));
}
}