diff --git a/crates/mad/examples/comprehensive/Cargo.toml b/crates/mad/examples/comprehensive/Cargo.toml new file mode 100644 index 0000000..eb2e9c8 --- /dev/null +++ b/crates/mad/examples/comprehensive/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "mad-comprehensive-example" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "comprehensive" +path = "main.rs" + +[dependencies] +notmad = { path = "../.." } +tokio = { version = "1", features = ["full"] } +tokio-util = "0.7" +async-trait = "0.1" +anyhow = "1" +tracing = "0.1" +tracing-subscriber = "0.3" +rand = "0.8" \ No newline at end of file diff --git a/crates/mad/examples/comprehensive/main.rs b/crates/mad/examples/comprehensive/main.rs new file mode 100644 index 0000000..0fa40cb --- /dev/null +++ b/crates/mad/examples/comprehensive/main.rs @@ -0,0 +1,332 @@ +//! Comprehensive example demonstrating MAD's full capabilities. +//! +//! This example shows: +//! - Multiple component types (struct, closure, conditional) +//! - Component lifecycle (setup, run, close) +//! - Error handling and propagation +//! - Graceful shutdown with cancellation tokens +//! - Concurrent component execution + +use async_trait::async_trait; +use notmad::{Component, Mad, MadError}; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio::time::{Duration, interval}; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, warn}; + +/// A web server component that simulates handling HTTP requests. +struct WebServer { + port: u16, + request_count: Arc, +} + +#[async_trait] +impl Component for WebServer { + fn name(&self) -> Option { + Some(format!("web-server-{}", self.port)) + } + + async fn setup(&self) -> Result<(), MadError> { + info!("Setting up web server on port {}", self.port); + // In a real application, you might: + // - Bind to the port + // - Set up TLS certificates + // - Initialize middleware + tokio::time::sleep(Duration::from_millis(100)).await; + info!("Web server on port {} is ready", self.port); + Ok(()) + } + + async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> { + info!("Web server on port {} started", self.port); + let mut interval = interval(Duration::from_secs(1)); + + while !cancellation.is_cancelled() { + tokio::select! { + _ = cancellation.cancelled() => { + info!("Web server on port {} received shutdown signal", self.port); + break; + } + _ = interval.tick() => { + // Simulate handling requests + let count = self.request_count.fetch_add(1, Ordering::Relaxed); + info!("Server on port {} handled request #{}", self.port, count + 1); + } + } + } + + Ok(()) + } + + async fn close(&self) -> Result<(), MadError> { + info!("Shutting down web server on port {}", self.port); + // In a real application, you might: + // - Drain existing connections + // - Save server state + // - Close database connections + tokio::time::sleep(Duration::from_millis(200)).await; + let total = self.request_count.load(Ordering::Relaxed); + info!( + "Web server on port {} shut down. Total requests handled: {}", + self.port, total + ); + Ok(()) + } +} + +/// A background job processor that simulates processing tasks from a queue. +struct JobProcessor { + queue_name: String, + processing_interval: Duration, +} + +#[async_trait] +impl Component for JobProcessor { + fn name(&self) -> Option { + Some(format!("job-processor-{}", self.queue_name)) + } + + async fn setup(&self) -> Result<(), MadError> { + info!("Connecting to queue: {}", self.queue_name); + // Simulate connecting to a message queue + tokio::time::sleep(Duration::from_millis(150)).await; + Ok(()) + } + + async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> { + info!("Job processor for {} started", self.queue_name); + let mut interval = interval(self.processing_interval); + let mut job_count = 0; + + loop { + tokio::select! { + _ = cancellation.cancelled() => { + info!("Job processor for {} stopping...", self.queue_name); + break; + } + _ = interval.tick() => { + job_count += 1; + info!("Processing job #{} from {}", job_count, self.queue_name); + + // Simulate job processing + tokio::time::sleep(Duration::from_millis(100)).await; + + // Simulate occasional errors (but don't fail the component) + if job_count % 10 == 0 { + warn!("Job #{} from {} required retry", job_count, self.queue_name); + } + } + } + } + + info!( + "Job processor for {} processed {} jobs", + self.queue_name, job_count + ); + Ok(()) + } + + async fn close(&self) -> Result<(), MadError> { + info!("Disconnecting from queue: {}", self.queue_name); + tokio::time::sleep(Duration::from_millis(100)).await; + Ok(()) + } +} + +/// A health check component that monitors system health. +struct HealthChecker { + check_interval: Duration, +} + +#[async_trait] +impl Component for HealthChecker { + fn name(&self) -> Option { + Some("health-checker".to_string()) + } + + async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> { + info!("Health checker started"); + let mut interval = interval(self.check_interval); + + while !cancellation.is_cancelled() { + tokio::select! { + _ = cancellation.cancelled() => { + info!("Health checker stopping..."); + break; + } + _ = interval.tick() => { + // Simulate health checks + let cpu_usage = rand::random::() * 100.0; + let memory_usage = rand::random::() * 100.0; + + info!("System health: CPU={:.1}%, Memory={:.1}%", cpu_usage, memory_usage); + + if cpu_usage > 90.0 { + warn!("High CPU usage detected: {:.1}%", cpu_usage); + } + if memory_usage > 90.0 { + warn!("High memory usage detected: {:.1}%", memory_usage); + } + } + } + } + + Ok(()) + } +} + +/// A component that will fail after some time to demonstrate error handling. +struct FailingComponent { + fail_after: Duration, +} + +#[async_trait] +impl Component for FailingComponent { + fn name(&self) -> Option { + Some("failing-component".to_string()) + } + + async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> { + info!( + "Failing component started (will fail after {:?})", + self.fail_after + ); + + tokio::select! { + _ = cancellation.cancelled() => { + info!("Failing component cancelled before failure"); + Ok(()) + } + _ = tokio::time::sleep(self.fail_after) => { + error!("Failing component encountered an error!"); + Err(anyhow::anyhow!("Simulated component failure").into()) + } + } + } +} + +/// Debug component that logs system status periodically. +struct DebugComponent; + +#[async_trait] +impl Component for DebugComponent { + fn name(&self) -> Option { + Some("debug-component".to_string()) + } + + async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> { + info!("Debug mode enabled - verbose logging active"); + let mut interval = interval(Duration::from_secs(5)); + + while !cancel.is_cancelled() { + tokio::select! { + _ = cancel.cancelled() => break, + _ = interval.tick() => { + info!("DEBUG: System running normally"); + } + } + } + + info!("Debug component shutting down"); + Ok(()) + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // Initialize tracing for logging + tracing_subscriber::fmt() + .with_target(false) + .without_time() + .init(); + + info!("Starting comprehensive MAD example application"); + + // Check if we should enable the failing component + let enable_failure_demo = std::env::var("ENABLE_FAILURE").is_ok(); + + // Check if we should enable debug mode + let debug_mode = std::env::var("DEBUG").is_ok(); + + // Shared state for demonstration + let request_count = Arc::new(AtomicUsize::new(0)); + + // Build and run the application + let result = Mad::builder() + // Add web servers + .add(WebServer { + port: 8080, + request_count: request_count.clone(), + }) + .add(WebServer { + port: 8081, + request_count: request_count.clone(), + }) + // Add job processors + .add(JobProcessor { + queue_name: "high-priority".to_string(), + processing_interval: Duration::from_secs(2), + }) + .add(JobProcessor { + queue_name: "low-priority".to_string(), + processing_interval: Duration::from_secs(5), + }) + // Add health checker + .add(HealthChecker { + check_interval: Duration::from_secs(3), + }) + // Conditionally add a debug component using add_fn + .add_conditional(debug_mode, DebugComponent) + // Conditionally add failing component to demonstrate error handling + .add_conditional( + enable_failure_demo, + FailingComponent { + fail_after: Duration::from_secs(10), + }, + ) + // Add a simple metrics reporter using add_fn + .add_fn(|cancel: CancellationToken| async move { + info!("Metrics reporter started"); + let mut interval = interval(Duration::from_secs(10)); + let start = std::time::Instant::now(); + + while !cancel.is_cancelled() { + tokio::select! { + _ = cancel.cancelled() => break, + _ = interval.tick() => { + let uptime = start.elapsed(); + info!("Application uptime: {:?}", uptime); + } + } + } + + info!("Metrics reporter stopped"); + Ok(()) + }) + // Configure graceful shutdown timeout + .cancellation(Some(Duration::from_secs(5))) + // Run the application + .run() + .await; + + match result { + Ok(()) => { + info!("Application shut down successfully"); + Ok(()) + } + Err(e) => { + error!("Application failed: {}", e); + + // Check if it's an aggregate error with multiple failures + if let MadError::AggregateError(ref agg) = e { + error!("Multiple component failures detected:"); + for (i, err) in agg.get_errors().iter().enumerate() { + error!(" {}. {}", i + 1, err); + } + } + + Err(e.into()) + } + } +} diff --git a/crates/mad/examples/multi_service/Cargo.toml b/crates/mad/examples/multi_service/Cargo.toml new file mode 100644 index 0000000..1276d7b --- /dev/null +++ b/crates/mad/examples/multi_service/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "mad-multi-service-example" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "multi_service" +path = "main.rs" + +[dependencies] +notmad = { path = "../.." } +tokio = { version = "1", features = ["full"] } +tokio-util = "0.7" +async-trait = "0.1" +anyhow = "1" \ No newline at end of file diff --git a/crates/mad/examples/multi_service/main.rs b/crates/mad/examples/multi_service/main.rs new file mode 100644 index 0000000..b9b1222 --- /dev/null +++ b/crates/mad/examples/multi_service/main.rs @@ -0,0 +1,214 @@ +//! Example demonstrating running multiple services with MAD. +//! +//! This example shows how to run a web server, queue processor, and +//! scheduled task together, with graceful shutdown on Ctrl+C. + +use async_trait::async_trait; +use notmad::{Component, Mad, MadError}; +use tokio::time::{Duration, interval}; +use tokio_util::sync::CancellationToken; + +/// A simple web server component. +/// +/// In a real application, this would bind to a port and handle HTTP requests. +/// Here we simulate it by periodically logging that we're "handling" requests. +struct WebServer { + port: u16, +} + +#[async_trait] +impl Component for WebServer { + fn name(&self) -> Option { + Some(format!("WebServer:{}", self.port)) + } + + async fn setup(&self) -> Result<(), MadError> { + println!("[{}] Binding to port...", self.name().unwrap()); + // Simulate server setup time + tokio::time::sleep(Duration::from_millis(100)).await; + println!("[{}] Ready to accept connections", self.name().unwrap()); + Ok(()) + } + + async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> { + println!("[{}] Server started", self.name().unwrap()); + + // Simulate handling requests until shutdown + let mut request_id = 0; + let mut interval = interval(Duration::from_secs(2)); + + while !cancellation.is_cancelled() { + tokio::select! { + _ = cancellation.cancelled() => { + println!("[{}] Shutdown signal received", self.name().unwrap()); + break; + } + _ = interval.tick() => { + request_id += 1; + println!("[{}] Handling request #{}", self.name().unwrap(), request_id); + } + } + } + + Ok(()) + } + + async fn close(&self) -> Result<(), MadError> { + println!("[{}] Closing connections...", self.name().unwrap()); + // Simulate graceful connection drain + tokio::time::sleep(Duration::from_millis(200)).await; + println!("[{}] Server stopped", self.name().unwrap()); + Ok(()) + } +} + +/// A queue processor that consumes messages from a queue. +/// +/// This simulates processing messages from a message queue like +/// RabbitMQ, Kafka, or AWS SQS. +struct QueueProcessor { + queue_name: String, +} + +#[async_trait] +impl Component for QueueProcessor { + fn name(&self) -> Option { + Some(format!("QueueProcessor:{}", self.queue_name)) + } + + async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> { + println!("[{}] Started processing", self.name().unwrap()); + + let mut message_count = 0; + + // Process messages until shutdown + while !cancellation.is_cancelled() { + // Simulate waiting for and processing a message + tokio::select! { + _ = cancellation.cancelled() => { + println!("[{}] Stopping message processing", self.name().unwrap()); + break; + } + _ = tokio::time::sleep(Duration::from_secs(1)) => { + message_count += 1; + println!("[{}] Processed message #{}", self.name().unwrap(), message_count); + } + } + } + + println!( + "[{}] Processed {} messages total", + self.name().unwrap(), + message_count + ); + Ok(()) + } +} + +/// A scheduled task that runs periodically. +/// +/// This could be used for tasks like: +/// - Cleaning up old data +/// - Generating reports +/// - Syncing with external systems +struct ScheduledTask { + task_name: String, + interval_secs: u64, +} + +#[async_trait] +impl Component for ScheduledTask { + fn name(&self) -> Option { + Some(format!("ScheduledTask:{}", self.task_name)) + } + + async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> { + println!( + "[{}] Scheduled to run every {} seconds", + self.name().unwrap(), + self.interval_secs + ); + + let mut interval = interval(Duration::from_secs(self.interval_secs)); + let mut run_count = 0; + + while !cancellation.is_cancelled() { + tokio::select! { + _ = cancellation.cancelled() => { + println!("[{}] Scheduler stopping", self.name().unwrap()); + break; + } + _ = interval.tick() => { + run_count += 1; + println!("[{}] Executing run #{}", self.name().unwrap(), run_count); + + // Simulate task execution + tokio::time::sleep(Duration::from_millis(500)).await; + + println!("[{}] Run #{} completed", self.name().unwrap(), run_count); + } + } + } + + Ok(()) + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + println!("Starting multi-service application"); + println!("Press Ctrl+C to trigger graceful shutdown"); + println!("----------------------------------------"); + + // Build the application with multiple services + Mad::builder() + // Add a web server on port 8080 + .add(WebServer { port: 8080 }) + // Add another web server on port 8081 (e.g., admin interface) + .add(WebServer { port: 8081 }) + // Add queue processors for different queues + .add(QueueProcessor { + queue_name: "orders".to_string(), + }) + .add(QueueProcessor { + queue_name: "notifications".to_string(), + }) + // Add scheduled tasks + .add(ScheduledTask { + task_name: "cleanup".to_string(), + interval_secs: 5, + }) + .add(ScheduledTask { + task_name: "report_generator".to_string(), + interval_secs: 10, + }) + // Add a monitoring component using a closure + .add_fn(|cancel| async move { + println!("[Monitor] Starting system monitor"); + let mut interval = interval(Duration::from_secs(15)); + + while !cancel.is_cancelled() { + tokio::select! { + _ = cancel.cancelled() => { + println!("[Monitor] Stopping monitor"); + break; + } + _ = interval.tick() => { + println!("[Monitor] All systems operational"); + } + } + } + + Ok(()) + }) + // Set graceful shutdown timeout to 3 seconds + .cancellation(Some(Duration::from_secs(3))) + // Run all components + .run() + .await?; + + println!("----------------------------------------"); + println!("All services shut down successfully"); + + Ok(()) +} diff --git a/crates/mad/src/lib.rs b/crates/mad/src/lib.rs index 5ec2801..1a16feb 100644 --- a/crates/mad/src/lib.rs +++ b/crates/mad/src/lib.rs @@ -1,3 +1,80 @@ +//! # MAD - Lifecycle Manager for Rust Applications +//! +//! MAD is a robust lifecycle manager designed for long-running Rust operations. It provides +//! a simple, composable way to manage multiple concurrent services within your application, +//! handling graceful startup and shutdown automatically. +//! +//! ## Overview +//! +//! MAD helps you build applications composed of multiple long-running components that need +//! to be orchestrated together. It handles: +//! +//! - **Concurrent execution** of multiple components +//! - **Graceful shutdown** with cancellation tokens +//! - **Error aggregation** from multiple components +//! - **Lifecycle management** with setup, run, and close phases +//! +//! ## Quick Start +//! +//! ```rust,no_run +//! use notmad::{Component, Mad}; +//! use async_trait::async_trait; +//! use tokio_util::sync::CancellationToken; +//! +//! struct MyService { +//! name: String, +//! } +//! +//! #[async_trait] +//! impl Component for MyService { +//! fn name(&self) -> Option { +//! Some(self.name.clone()) +//! } +//! +//! async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> { +//! // Your service logic here +//! while !cancellation.is_cancelled() { +//! // Do work... +//! tokio::time::sleep(std::time::Duration::from_secs(1)).await; +//! } +//! Ok(()) +//! } +//! } +//! +//! #[tokio::main] +//! async fn main() -> anyhow::Result<()> { +//! Mad::builder() +//! .add(MyService { name: "service-1".into() }) +//! .add(MyService { name: "service-2".into() }) +//! .run() +//! .await?; +//! Ok(()) +//! } +//! ``` +//! +//! ## Component Lifecycle +//! +//! Components go through three phases: +//! +//! 1. **Setup**: Optional initialization phase before components start running +//! 2. **Run**: Main execution phase where components perform their work +//! 3. **Close**: Optional cleanup phase after components stop +//! +//! ## Error Handling +//! +//! MAD provides comprehensive error handling through [`MadError`], which can: +//! - Wrap errors from individual components +//! - Aggregate multiple errors when several components fail +//! - Automatically convert from `anyhow::Error` +//! +//! ## Shutdown Behavior +//! +//! MAD handles shutdown gracefully: +//! - Responds to SIGTERM and Ctrl+C signals +//! - Propagates cancellation tokens to all components +//! - Waits for components to finish cleanup +//! - Configurable cancellation timeout + use futures::stream::FuturesUnordered; use futures_util::StreamExt; use std::{fmt::Display, sync::Arc}; @@ -9,23 +86,43 @@ use crate::waiter::Waiter; mod waiter; +/// Error type for MAD operations. +/// +/// This enum represents all possible errors that can occur during +/// the lifecycle of MAD components. #[derive(thiserror::Error, Debug)] pub enum MadError { + /// Generic error wrapper for anyhow errors. + /// + /// This variant is used when components return errors via the `?` operator + /// or when converting from `anyhow::Error`. #[error("component: {0:#?}")] Inner(#[source] anyhow::Error), + /// Error that occurred during the run phase of a component. #[error("component: {run:#?}")] RunError { run: anyhow::Error }, + /// Error that occurred during the close phase of a component. #[error("component(s) failed: {close}")] CloseError { close: anyhow::Error }, + /// Multiple errors from different components. + /// + /// This is used when multiple components fail simultaneously, + /// allowing all errors to be reported rather than just the first one. #[error("component(s): {0}")] AggregateError(AggregateError), + /// Returned when a component doesn't implement the optional setup method. + /// + /// This is not typically an error condition as setup is optional. #[error("setup not defined")] SetupNotDefined, + /// Returned when a component doesn't implement the optional close method. + /// + /// This is not typically an error condition as close is optional. #[error("close not defined")] CloseNotDefined, } @@ -36,12 +133,30 @@ impl From for MadError { } } +/// Container for multiple errors from different components. +/// +/// When multiple components fail, their errors are collected +/// into this struct to provide complete error reporting. #[derive(Debug)] pub struct AggregateError { errors: Vec, } impl AggregateError { + /// Returns a slice of all contained errors. + /// + /// # Example + /// + /// ```rust,ignore + /// match result { + /// Err(notmad::MadError::AggregateError(agg)) => { + /// for error in agg.get_errors() { + /// eprintln!("Component error: {}", error); + /// } + /// } + /// _ => {} + /// } + /// ``` pub fn get_errors(&self) -> &[MadError] { &self.errors } @@ -68,6 +183,35 @@ impl Display for AggregateError { } } +/// The main lifecycle manager for running multiple components. +/// +/// `Mad` orchestrates the lifecycle of multiple components, ensuring they +/// start up in order, run concurrently, and shut down gracefully. +/// +/// # Example +/// +/// ```rust +/// use notmad::{Component, Mad}; +/// use async_trait::async_trait; +/// use tokio_util::sync::CancellationToken; +/// +/// struct MyComponent; +/// +/// #[async_trait] +/// impl Component for MyComponent { +/// async fn run(&self, _cancel: CancellationToken) -> Result<(), notmad::MadError> { +/// Ok(()) +/// } +/// } +/// +/// # async fn example() -> Result<(), notmad::MadError> { +/// Mad::builder() +/// .add(MyComponent) +/// .run() +/// .await?; +/// # Ok(()) +/// # } +/// ``` pub struct Mad { components: Vec>, @@ -80,6 +224,18 @@ struct CompletionResult { } impl Mad { + /// Creates a new `Mad` builder. + /// + /// This is the entry point for constructing a MAD application. + /// Components are added using the builder pattern before calling `run()`. + /// + /// # Example + /// + /// ```rust + /// use notmad::Mad; + /// + /// let mut app = Mad::builder(); + /// ``` pub fn builder() -> Self { Self { components: Vec::default(), @@ -88,12 +244,65 @@ impl Mad { } } + /// Adds a component to the MAD application. + /// + /// Components will be set up in the order they are added, + /// run concurrently, and closed in the order they were added. + /// + /// # Arguments + /// + /// * `component` - Any type that implements `Component` or `IntoComponent` + /// + /// # Example + /// + /// ```rust + /// use notmad::{Component, Mad}; + /// # use async_trait::async_trait; + /// # use tokio_util::sync::CancellationToken; + /// # struct MyService; + /// # #[async_trait] + /// # impl Component for MyService { + /// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) } + /// # } + /// + /// Mad::builder() + /// .add(MyService) + /// .add(MyService); + /// ``` pub fn add(&mut self, component: impl IntoComponent) -> &mut Self { self.components.push(component.into_component()); self } + /// Conditionally adds a component based on a boolean condition. + /// + /// If the condition is false, a waiter component is added instead, + /// which simply waits for cancellation without doing any work. + /// + /// # Arguments + /// + /// * `condition` - If true, adds the component; if false, adds a waiter + /// * `component` - The component to add if condition is true + /// + /// # Example + /// + /// ```rust + /// use notmad::Mad; + /// # use notmad::Component; + /// # use async_trait::async_trait; + /// # use tokio_util::sync::CancellationToken; + /// # struct DebugService; + /// # #[async_trait] + /// # impl Component for DebugService { + /// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) } + /// # } + /// + /// let enable_debug = std::env::var("DEBUG").is_ok(); + /// + /// Mad::builder() + /// .add_conditional(enable_debug, DebugService); + /// ``` pub fn add_conditional(&mut self, condition: bool, component: impl IntoComponent) -> &mut Self { if condition { self.components.push(component.into_component()); @@ -105,12 +314,53 @@ impl Mad { self } + /// Adds a waiter component that does nothing but wait for cancellation. + /// + /// This is useful when you need a placeholder component or want + /// the application to keep running without any specific work. + /// + /// # Example + /// + /// ```rust,no_run + /// # async fn example() { + /// use notmad::Mad; + /// + /// Mad::builder() + /// .add_wait() // Keeps the app running until shutdown signal + /// .run() + /// .await; + /// # } + /// ``` pub fn add_wait(&mut self) -> &mut Self { self.components.push(Waiter::default().into_component()); self } + /// Adds a closure or function as a component. + /// + /// This is a convenient way to add simple components without + /// creating a full struct that implements `Component`. + /// + /// # Arguments + /// + /// * `f` - A closure that takes a `CancellationToken` and returns a future + /// + /// # Example + /// + /// ```rust + /// use notmad::Mad; + /// use tokio_util::sync::CancellationToken; + /// + /// Mad::builder() + /// .add_fn(|cancel: CancellationToken| async move { + /// while !cancel.is_cancelled() { + /// println!("Working..."); + /// tokio::time::sleep(std::time::Duration::from_secs(1)).await; + /// } + /// Ok(()) + /// }); + /// ``` pub fn add_fn(&mut self, f: F) -> &mut Self where F: Fn(CancellationToken) -> Fut + Send + Sync + 'static, @@ -121,12 +371,63 @@ impl Mad { self.add(comp) } + /// Configures the cancellation timeout behavior. + /// + /// When a shutdown signal is received, MAD will: + /// 1. Send cancellation tokens to all components + /// 2. Wait for the specified duration + /// 3. Force shutdown if components haven't stopped + /// + /// # Arguments + /// + /// * `should_cancel` - Duration to wait after cancellation before forcing shutdown. + /// Pass `None` to wait indefinitely. + /// + /// # Example + /// + /// ```rust,no_run + /// # async fn example() { + /// use notmad::Mad; + /// use std::time::Duration; + /// + /// Mad::builder() + /// .cancellation(Some(Duration::from_secs(30))) // 30 second grace period + /// .run() + /// .await; + /// # } + /// ``` pub fn cancellation(&mut self, should_cancel: Option) -> &mut Self { self.should_cancel = should_cancel; self } + /// Runs all components until completion or shutdown. + /// + /// This method: + /// 1. Calls `setup()` on all components (in order) + /// 2. Starts all components concurrently + /// 3. Waits for shutdown signal (SIGTERM, Ctrl+C) or component failure + /// 4. Sends cancellation to all components + /// 5. Calls `close()` on all components (in order) + /// + /// # Returns + /// + /// * `Ok(())` if all components shut down cleanly + /// * `Err(MadError)` if any component fails + /// + /// # Example + /// + /// ```rust,no_run + /// # use notmad::Mad; + /// # async fn example() -> Result<(), notmad::MadError> { + /// Mad::builder() + /// .add_wait() + /// .run() + /// .await?; + /// # Ok(()) + /// # } + /// ``` pub async fn run(&mut self) -> Result<(), MadError> { tracing::info!("running mad setup"); @@ -289,24 +590,148 @@ async fn signal_unix_terminate() { sigterm.recv().await; } +/// Trait for implementing MAD components. +/// +/// Components represent individual services or tasks that run as part +/// of your application. Each component has its own lifecycle with +/// optional setup and cleanup phases. +/// +/// # Example +/// +/// ```rust +/// use notmad::{Component, MadError}; +/// use async_trait::async_trait; +/// use tokio_util::sync::CancellationToken; +/// +/// struct DatabaseConnection { +/// url: String, +/// } +/// +/// #[async_trait] +/// impl Component for DatabaseConnection { +/// fn name(&self) -> Option { +/// Some("database".to_string()) +/// } +/// +/// async fn setup(&self) -> Result<(), MadError> { +/// println!("Connecting to database..."); +/// // Initialize connection pool +/// Ok(()) +/// } +/// +/// async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> { +/// // Keep connection alive, handle queries +/// cancel.cancelled().await; +/// Ok(()) +/// } +/// +/// async fn close(&self) -> Result<(), MadError> { +/// println!("Closing database connection..."); +/// // Clean up resources +/// Ok(()) +/// } +/// } +/// ``` #[async_trait::async_trait] pub trait Component { + /// Returns an optional name for the component. + /// + /// This name is used in logging and error messages to identify + /// which component is being processed. + /// + /// # Default + /// + /// Returns `None` if not overridden. fn name(&self) -> Option { None } + /// Optional setup phase called before the component starts running. + /// + /// Use this for initialization tasks like: + /// - Establishing database connections + /// - Loading configuration + /// - Preparing resources + /// + /// # Default + /// + /// Returns `MadError::SetupNotDefined` which is handled gracefully. + /// + /// # Errors + /// + /// If setup fails with an error other than `SetupNotDefined`, + /// the entire application will stop before any components start running. async fn setup(&self) -> Result<(), MadError> { Err(MadError::SetupNotDefined) } + /// Main execution phase of the component. + /// + /// This method should contain the primary logic of your component. + /// It should respect the cancellation token and shut down gracefully + /// when cancellation is requested. + /// + /// # Arguments + /// + /// * `cancellation_token` - Signal for graceful shutdown + /// + /// # Implementation Guidelines + /// + /// - Check `cancellation_token.is_cancelled()` periodically + /// - Use `tokio::select!` with `cancellation_token.cancelled()` for async operations + /// - Clean up resources before returning + /// + /// # Errors + /// + /// Any error returned will trigger shutdown of all other components. async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError>; + /// Optional cleanup phase called after the component stops. + /// + /// Use this for cleanup tasks like: + /// - Flushing buffers + /// - Closing connections + /// - Saving state + /// + /// # Default + /// + /// Returns `MadError::CloseNotDefined` which is handled gracefully. + /// + /// # Errors + /// + /// Errors during close are logged but don't prevent other components + /// from closing. async fn close(&self) -> Result<(), MadError> { Err(MadError::CloseNotDefined) } } +/// Trait for converting types into components. +/// +/// This trait is automatically implemented for all types that implement +/// `Component + Send + Sync + 'static`, allowing them to be added to MAD +/// directly without manual conversion. +/// +/// # Example +/// +/// ```rust +/// use notmad::{Component, IntoComponent, Mad}; +/// # use async_trait::async_trait; +/// # use tokio_util::sync::CancellationToken; +/// +/// struct MyService; +/// +/// # #[async_trait] +/// # impl Component for MyService { +/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) } +/// # } +/// +/// // MyService automatically implements IntoComponent +/// Mad::builder() +/// .add(MyService); // Works directly +/// ``` pub trait IntoComponent { + /// Converts self into an Arc-wrapped component. fn into_component(self) -> Arc; } diff --git a/crates/mad/src/waiter.rs b/crates/mad/src/waiter.rs index f54f2e1..5426326 100644 --- a/crates/mad/src/waiter.rs +++ b/crates/mad/src/waiter.rs @@ -1,3 +1,9 @@ +//! Waiter components for MAD. +//! +//! This module provides waiter components that simply wait for cancellation +//! without performing any work. Useful for keeping the application alive +//! or as placeholders in conditional component loading. + use std::sync::Arc; use async_trait::async_trait; @@ -5,6 +11,10 @@ use tokio_util::sync::CancellationToken; use crate::{Component, MadError}; +/// A default waiter component that panics if run. +/// +/// This is used internally as a placeholder that should never +/// actually be executed. pub struct DefaultWaiter {} #[async_trait] impl Component for DefaultWaiter { @@ -13,6 +23,20 @@ impl Component for DefaultWaiter { } } +/// A wrapper component that waits for cancellation. +/// +/// Instead of running the wrapped component's logic, this simply +/// waits for the cancellation token. This is useful for conditionally +/// disabling components while keeping the same structure. +/// +/// # Example +/// +/// ```rust,ignore +/// use mad::Waiter; +/// +/// // Instead of running the service, just wait +/// let waiter = Waiter::new(service.into_component()); +/// ``` pub struct Waiter { comp: Arc, } @@ -26,6 +50,10 @@ impl Default for Waiter { } impl Waiter { + /// Creates a new waiter that wraps the given component. + /// + /// The wrapped component's name will be used (prefixed with "waiter/"), + /// but its run method will not be called. pub fn new(c: Arc) -> Self { Self { comp: c } } @@ -33,6 +61,10 @@ impl Waiter { #[async_trait] impl Component for Waiter { + /// Returns the name of the waiter, prefixed with "waiter/". + /// + /// If the wrapped component has a name, it will be "waiter/{name}". + /// Otherwise, returns "waiter". fn name(&self) -> Option { match self.comp.name() { Some(name) => Some(format!("waiter/{name}")), @@ -40,6 +72,10 @@ impl Component for Waiter { } } + /// Waits for cancellation without performing any work. + /// + /// This method simply waits for the cancellation token to be triggered, + /// then returns successfully. async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> { cancellation_token.cancelled().await;