1 Commits

Author SHA1 Message Date
cuddle-please
fd76feff3f chore(release): 0.7.5
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2025-07-24 20:45:16 +00:00
12 changed files with 45 additions and 1439 deletions

View File

@@ -6,17 +6,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.8.1] - 2025-08-09
### Other
- error logging
## [0.8.0] - 2025-08-08
### Added
- add docs
- update readme
## [0.7.5] - 2025-07-24 ## [0.7.5] - 2025-07-24
### Added ### Added

2
Cargo.lock generated
View File

@@ -278,7 +278,7 @@ dependencies = [
[[package]] [[package]]
name = "notmad" name = "notmad"
version = "0.7.5" version = "0.7.4"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",

View File

@@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2" resolver = "2"
[workspace.package] [workspace.package]
version = "0.8.1" version = "0.7.5"
[workspace.dependencies] [workspace.dependencies]
mad = { path = "crates/mad" } mad = { path = "crates/mad" }

159
README.md
View File

@@ -1,92 +1,39 @@
# MAD - Lifecycle Manager for Rust Applications # MAD
[![Crates.io](https://img.shields.io/crates/v/notmad.svg)](https://crates.io/crates/notmad) Mad is a life-cycle manager for long running rust operations.
[![Documentation](https://docs.rs/notmad/badge.svg)](https://docs.rs/notmad)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
## Overview - Webservers
- Queue bindings
- gRPC servers etc
- Cron runners
MAD is a robust lifecycle manager designed for long-running Rust operations. It provides a simple, composable way to manage multiple concurrent services within your application, handling graceful startup and shutdown automatically. It is supposed to be the main thing the application runs, and everything from it is spawned and managed by it.
### Perfect for:
- 🌐 Web servers
- 📨 Queue consumers and message processors
- 🔌 gRPC servers
- ⏰ Cron job runners
- 🔄 Background workers
- 📡 Any long-running async operations
## Features
- **Component-based architecture** - Build your application from composable, reusable components
- **Graceful shutdown** - Automatic handling of shutdown signals with proper cleanup
- **Concurrent execution** - Run multiple components in parallel with tokio
- **Error handling** - Built-in error propagation and logging
- **Cancellation tokens** - Coordinate shutdown across all components
- **Minimal boilerplate** - Focus on your business logic, not lifecycle management
## Installation
Add MAD to your `Cargo.toml`:
```toml
[dependencies]
notmad = "0.7.5"
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
```
## Quick Start
Here's a simple example of a component that simulates a long-running server:
```rust ```rust
use mad::{Component, Mad}; struct WaitServer {}
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
// Define your component
struct WebServer {
port: u16,
}
#[async_trait] #[async_trait]
impl Component for WebServer { impl Component for WaitServer {
fn name(&self) -> Option<String> { fn name(&self) -> Option<String> {
Some(format!("WebServer on port {}", self.port)) Some("NeverEndingRun".into())
} }
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> { async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
println!("Starting web server on port {}", self.port); let millis_wait = rand::thread_rng().gen_range(50..1000);
// Your server logic here // Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
// The cancellation token will be triggered on shutdown tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
tokio::select! {
_ = cancellation.cancelled() => {
println!("Shutting down web server");
}
_ = self.serve() => {
println!("Server stopped");
}
}
Ok(()) Ok(())
} }
} }
impl WebServer {
async fn serve(&self) {
// Simulate a running server
tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
}
}
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
// Build and run your application
Mad::builder() Mad::builder()
.add(WebServer { port: 8080 }) .add(WaitServer {})
.add(WebServer { port: 8081 }) // You can add multiple instances .add(WaitServer {})
.add(WaitServer {})
.run() .run()
.await?; .await?;
@@ -94,75 +41,11 @@ async fn main() -> anyhow::Result<()> {
} }
``` ```
## Advanced Usage
### Custom Components
Components can be anything that implements the `Component` trait:
```rust
use mad::{Component, Mad};
use async_trait::async_trait;
struct QueueProcessor {
queue_name: String,
}
#[async_trait]
impl Component for QueueProcessor {
fn name(&self) -> Option<String> {
Some(format!("QueueProcessor-{}", self.queue_name))
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
while !cancellation.is_cancelled() {
// Process messages from queue
self.process_next_message().await?;
}
Ok(())
}
}
```
### Error Handling
MAD provides comprehensive error handling through the `MadError` type with automatic conversion from `anyhow::Error`:
```rust
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
// Errors automatically convert from anyhow::Error to MadError
database_operation().await?;
// Or return explicit errors
if some_condition {
return Err(anyhow::anyhow!("Something went wrong").into());
}
Ok(())
}
```
## Examples ## Examples
Check out the [examples directory](crates/mad/examples) for more detailed examples: Can be found (here)[crates/mad/examples]
- **basic** - Simple component lifecycle - basic
- **fn** - Using functions as components - fn
- **signals** - Handling system signals - signals
- **error_log** - Error handling and logging - error_log
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## Author
Created and maintained by [kjuulh](https://github.com/kjuulh)
## Repository
Find the source code at [https://github.com/kjuulh/mad](https://github.com/kjuulh/mad)

View File

@@ -1,18 +0,0 @@
[package]
name = "mad-comprehensive-example"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "comprehensive"
path = "main.rs"
[dependencies]
notmad = { path = "../.." }
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
async-trait = "0.1"
anyhow = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
rand = "0.8"

View File

@@ -1,332 +0,0 @@
//! Comprehensive example demonstrating MAD's full capabilities.
//!
//! This example shows:
//! - Multiple component types (struct, closure, conditional)
//! - Component lifecycle (setup, run, close)
//! - Error handling and propagation
//! - Graceful shutdown with cancellation tokens
//! - Concurrent component execution
use async_trait::async_trait;
use notmad::{Component, Mad, MadError};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::time::{Duration, interval};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
/// A web server component that simulates handling HTTP requests.
struct WebServer {
port: u16,
request_count: Arc<AtomicUsize>,
}
#[async_trait]
impl Component for WebServer {
fn name(&self) -> Option<String> {
Some(format!("web-server-{}", self.port))
}
async fn setup(&self) -> Result<(), MadError> {
info!("Setting up web server on port {}", self.port);
// In a real application, you might:
// - Bind to the port
// - Set up TLS certificates
// - Initialize middleware
tokio::time::sleep(Duration::from_millis(100)).await;
info!("Web server on port {} is ready", self.port);
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!("Web server on port {} started", self.port);
let mut interval = interval(Duration::from_secs(1));
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
info!("Web server on port {} received shutdown signal", self.port);
break;
}
_ = interval.tick() => {
// Simulate handling requests
let count = self.request_count.fetch_add(1, Ordering::Relaxed);
info!("Server on port {} handled request #{}", self.port, count + 1);
}
}
}
Ok(())
}
async fn close(&self) -> Result<(), MadError> {
info!("Shutting down web server on port {}", self.port);
// In a real application, you might:
// - Drain existing connections
// - Save server state
// - Close database connections
tokio::time::sleep(Duration::from_millis(200)).await;
let total = self.request_count.load(Ordering::Relaxed);
info!(
"Web server on port {} shut down. Total requests handled: {}",
self.port, total
);
Ok(())
}
}
/// A background job processor that simulates processing tasks from a queue.
struct JobProcessor {
queue_name: String,
processing_interval: Duration,
}
#[async_trait]
impl Component for JobProcessor {
fn name(&self) -> Option<String> {
Some(format!("job-processor-{}", self.queue_name))
}
async fn setup(&self) -> Result<(), MadError> {
info!("Connecting to queue: {}", self.queue_name);
// Simulate connecting to a message queue
tokio::time::sleep(Duration::from_millis(150)).await;
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!("Job processor for {} started", self.queue_name);
let mut interval = interval(self.processing_interval);
let mut job_count = 0;
loop {
tokio::select! {
_ = cancellation.cancelled() => {
info!("Job processor for {} stopping...", self.queue_name);
break;
}
_ = interval.tick() => {
job_count += 1;
info!("Processing job #{} from {}", job_count, self.queue_name);
// Simulate job processing
tokio::time::sleep(Duration::from_millis(100)).await;
// Simulate occasional errors (but don't fail the component)
if job_count % 10 == 0 {
warn!("Job #{} from {} required retry", job_count, self.queue_name);
}
}
}
}
info!(
"Job processor for {} processed {} jobs",
self.queue_name, job_count
);
Ok(())
}
async fn close(&self) -> Result<(), MadError> {
info!("Disconnecting from queue: {}", self.queue_name);
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
}
/// A health check component that monitors system health.
struct HealthChecker {
check_interval: Duration,
}
#[async_trait]
impl Component for HealthChecker {
fn name(&self) -> Option<String> {
Some("health-checker".to_string())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!("Health checker started");
let mut interval = interval(self.check_interval);
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
info!("Health checker stopping...");
break;
}
_ = interval.tick() => {
// Simulate health checks
let cpu_usage = rand::random::<f32>() * 100.0;
let memory_usage = rand::random::<f32>() * 100.0;
info!("System health: CPU={:.1}%, Memory={:.1}%", cpu_usage, memory_usage);
if cpu_usage > 90.0 {
warn!("High CPU usage detected: {:.1}%", cpu_usage);
}
if memory_usage > 90.0 {
warn!("High memory usage detected: {:.1}%", memory_usage);
}
}
}
}
Ok(())
}
}
/// A component that will fail after some time to demonstrate error handling.
struct FailingComponent {
fail_after: Duration,
}
#[async_trait]
impl Component for FailingComponent {
fn name(&self) -> Option<String> {
Some("failing-component".to_string())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!(
"Failing component started (will fail after {:?})",
self.fail_after
);
tokio::select! {
_ = cancellation.cancelled() => {
info!("Failing component cancelled before failure");
Ok(())
}
_ = tokio::time::sleep(self.fail_after) => {
error!("Failing component encountered an error!");
Err(anyhow::anyhow!("Simulated component failure").into())
}
}
}
}
/// Debug component that logs system status periodically.
struct DebugComponent;
#[async_trait]
impl Component for DebugComponent {
fn name(&self) -> Option<String> {
Some("debug-component".to_string())
}
async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> {
info!("Debug mode enabled - verbose logging active");
let mut interval = interval(Duration::from_secs(5));
while !cancel.is_cancelled() {
tokio::select! {
_ = cancel.cancelled() => break,
_ = interval.tick() => {
info!("DEBUG: System running normally");
}
}
}
info!("Debug component shutting down");
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize tracing for logging
tracing_subscriber::fmt()
.with_target(false)
.without_time()
.init();
info!("Starting comprehensive MAD example application");
// Check if we should enable the failing component
let enable_failure_demo = std::env::var("ENABLE_FAILURE").is_ok();
// Check if we should enable debug mode
let debug_mode = std::env::var("DEBUG").is_ok();
// Shared state for demonstration
let request_count = Arc::new(AtomicUsize::new(0));
// Build and run the application
let result = Mad::builder()
// Add web servers
.add(WebServer {
port: 8080,
request_count: request_count.clone(),
})
.add(WebServer {
port: 8081,
request_count: request_count.clone(),
})
// Add job processors
.add(JobProcessor {
queue_name: "high-priority".to_string(),
processing_interval: Duration::from_secs(2),
})
.add(JobProcessor {
queue_name: "low-priority".to_string(),
processing_interval: Duration::from_secs(5),
})
// Add health checker
.add(HealthChecker {
check_interval: Duration::from_secs(3),
})
// Conditionally add a debug component using add_fn
.add_conditional(debug_mode, DebugComponent)
// Conditionally add failing component to demonstrate error handling
.add_conditional(
enable_failure_demo,
FailingComponent {
fail_after: Duration::from_secs(10),
},
)
// Add a simple metrics reporter using add_fn
.add_fn(|cancel: CancellationToken| async move {
info!("Metrics reporter started");
let mut interval = interval(Duration::from_secs(10));
let start = std::time::Instant::now();
while !cancel.is_cancelled() {
tokio::select! {
_ = cancel.cancelled() => break,
_ = interval.tick() => {
let uptime = start.elapsed();
info!("Application uptime: {:?}", uptime);
}
}
}
info!("Metrics reporter stopped");
Ok(())
})
// Configure graceful shutdown timeout
.cancellation(Some(Duration::from_secs(5)))
// Run the application
.run()
.await;
match result {
Ok(()) => {
info!("Application shut down successfully");
Ok(())
}
Err(e) => {
error!("Application failed: {}", e);
// Check if it's an aggregate error with multiple failures
if let MadError::AggregateError(ref agg) = e {
error!("Multiple component failures detected:");
for (i, err) in agg.get_errors().iter().enumerate() {
error!(" {}. {}", i + 1, err);
}
}
Err(e.into())
}
}
}

View File

@@ -1,15 +0,0 @@
[package]
name = "mad-multi-service-example"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "multi_service"
path = "main.rs"
[dependencies]
notmad = { path = "../.." }
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
async-trait = "0.1"
anyhow = "1"

View File

@@ -1,214 +0,0 @@
//! Example demonstrating running multiple services with MAD.
//!
//! This example shows how to run a web server, queue processor, and
//! scheduled task together, with graceful shutdown on Ctrl+C.
use async_trait::async_trait;
use notmad::{Component, Mad, MadError};
use tokio::time::{Duration, interval};
use tokio_util::sync::CancellationToken;
/// A simple web server component.
///
/// In a real application, this would bind to a port and handle HTTP requests.
/// Here we simulate it by periodically logging that we're "handling" requests.
struct WebServer {
port: u16,
}
#[async_trait]
impl Component for WebServer {
fn name(&self) -> Option<String> {
Some(format!("WebServer:{}", self.port))
}
async fn setup(&self) -> Result<(), MadError> {
println!("[{}] Binding to port...", self.name().unwrap());
// Simulate server setup time
tokio::time::sleep(Duration::from_millis(100)).await;
println!("[{}] Ready to accept connections", self.name().unwrap());
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!("[{}] Server started", self.name().unwrap());
// Simulate handling requests until shutdown
let mut request_id = 0;
let mut interval = interval(Duration::from_secs(2));
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Shutdown signal received", self.name().unwrap());
break;
}
_ = interval.tick() => {
request_id += 1;
println!("[{}] Handling request #{}", self.name().unwrap(), request_id);
}
}
}
Ok(())
}
async fn close(&self) -> Result<(), MadError> {
println!("[{}] Closing connections...", self.name().unwrap());
// Simulate graceful connection drain
tokio::time::sleep(Duration::from_millis(200)).await;
println!("[{}] Server stopped", self.name().unwrap());
Ok(())
}
}
/// A queue processor that consumes messages from a queue.
///
/// This simulates processing messages from a message queue like
/// RabbitMQ, Kafka, or AWS SQS.
struct QueueProcessor {
queue_name: String,
}
#[async_trait]
impl Component for QueueProcessor {
fn name(&self) -> Option<String> {
Some(format!("QueueProcessor:{}", self.queue_name))
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!("[{}] Started processing", self.name().unwrap());
let mut message_count = 0;
// Process messages until shutdown
while !cancellation.is_cancelled() {
// Simulate waiting for and processing a message
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Stopping message processing", self.name().unwrap());
break;
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
message_count += 1;
println!("[{}] Processed message #{}", self.name().unwrap(), message_count);
}
}
}
println!(
"[{}] Processed {} messages total",
self.name().unwrap(),
message_count
);
Ok(())
}
}
/// A scheduled task that runs periodically.
///
/// This could be used for tasks like:
/// - Cleaning up old data
/// - Generating reports
/// - Syncing with external systems
struct ScheduledTask {
task_name: String,
interval_secs: u64,
}
#[async_trait]
impl Component for ScheduledTask {
fn name(&self) -> Option<String> {
Some(format!("ScheduledTask:{}", self.task_name))
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!(
"[{}] Scheduled to run every {} seconds",
self.name().unwrap(),
self.interval_secs
);
let mut interval = interval(Duration::from_secs(self.interval_secs));
let mut run_count = 0;
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Scheduler stopping", self.name().unwrap());
break;
}
_ = interval.tick() => {
run_count += 1;
println!("[{}] Executing run #{}", self.name().unwrap(), run_count);
// Simulate task execution
tokio::time::sleep(Duration::from_millis(500)).await;
println!("[{}] Run #{} completed", self.name().unwrap(), run_count);
}
}
}
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Starting multi-service application");
println!("Press Ctrl+C to trigger graceful shutdown");
println!("----------------------------------------");
// Build the application with multiple services
Mad::builder()
// Add a web server on port 8080
.add(WebServer { port: 8080 })
// Add another web server on port 8081 (e.g., admin interface)
.add(WebServer { port: 8081 })
// Add queue processors for different queues
.add(QueueProcessor {
queue_name: "orders".to_string(),
})
.add(QueueProcessor {
queue_name: "notifications".to_string(),
})
// Add scheduled tasks
.add(ScheduledTask {
task_name: "cleanup".to_string(),
interval_secs: 5,
})
.add(ScheduledTask {
task_name: "report_generator".to_string(),
interval_secs: 10,
})
// Add a monitoring component using a closure
.add_fn(|cancel| async move {
println!("[Monitor] Starting system monitor");
let mut interval = interval(Duration::from_secs(15));
while !cancel.is_cancelled() {
tokio::select! {
_ = cancel.cancelled() => {
println!("[Monitor] Stopping monitor");
break;
}
_ = interval.tick() => {
println!("[Monitor] All systems operational");
}
}
}
Ok(())
})
// Set graceful shutdown timeout to 3 seconds
.cancellation(Some(Duration::from_secs(3)))
// Run all components
.run()
.await?;
println!("----------------------------------------");
println!("All services shut down successfully");
Ok(())
}

View File

@@ -1,86 +0,0 @@
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
struct NestedErrorComponent {
name: String,
}
#[async_trait]
impl notmad::Component for NestedErrorComponent {
fn name(&self) -> Option<String> {
Some(self.name.clone())
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
// Simulate a deeply nested error
let io_error = std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
"access denied to /etc/secret",
);
Err(anyhow::Error::from(io_error)
.context("failed to read configuration file")
.context("unable to initialize database connection pool")
.context(format!("component '{}' startup failed", self.name))
.into())
}
}
struct AnotherFailingComponent;
#[async_trait]
impl notmad::Component for AnotherFailingComponent {
fn name(&self) -> Option<String> {
Some("another-component".into())
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Err(anyhow::anyhow!("network timeout after 30s")
.context("failed to connect to external API")
.context("service health check failed")
.into())
}
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter("mad=debug")
.init();
let result = notmad::Mad::builder()
.add(NestedErrorComponent {
name: "database-service".into(),
})
.add(AnotherFailingComponent)
.run()
.await;
match result {
Ok(()) => println!("Success!"),
Err(e) => {
eprintln!("\n=== Error occurred ===");
eprintln!("{}", e);
// Also demonstrate how to walk the error chain manually
if let notmad::MadError::AggregateError(ref agg) = e {
eprintln!("\n=== Detailed error chains ===");
for (i, error) in agg.get_errors().iter().enumerate() {
eprintln!("\nComponent {} error chain:", i + 1);
if let notmad::MadError::Inner(inner) = error {
for (j, cause) in inner.chain().enumerate() {
eprintln!(" {}. {}", j + 1, cause);
}
}
}
} else if let notmad::MadError::Inner(ref inner) = e {
eprintln!("\n=== Error chain ===");
for (i, cause) in inner.chain().enumerate() {
eprintln!(" {}. {}", i + 1, cause);
}
}
}
}
}

View File

@@ -1,83 +1,6 @@
//! # MAD - Lifecycle Manager for Rust Applications
//!
//! MAD is a robust lifecycle manager designed for long-running Rust operations. It provides
//! a simple, composable way to manage multiple concurrent services within your application,
//! handling graceful startup and shutdown automatically.
//!
//! ## Overview
//!
//! MAD helps you build applications composed of multiple long-running components that need
//! to be orchestrated together. It handles:
//!
//! - **Concurrent execution** of multiple components
//! - **Graceful shutdown** with cancellation tokens
//! - **Error aggregation** from multiple components
//! - **Lifecycle management** with setup, run, and close phases
//!
//! ## Quick Start
//!
//! ```rust,no_run
//! use notmad::{Component, Mad};
//! use async_trait::async_trait;
//! use tokio_util::sync::CancellationToken;
//!
//! struct MyService {
//! name: String,
//! }
//!
//! #[async_trait]
//! impl Component for MyService {
//! fn name(&self) -> Option<String> {
//! Some(self.name.clone())
//! }
//!
//! async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
//! // Your service logic here
//! while !cancellation.is_cancelled() {
//! // Do work...
//! tokio::time::sleep(std::time::Duration::from_secs(1)).await;
//! }
//! Ok(())
//! }
//! }
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//! Mad::builder()
//! .add(MyService { name: "service-1".into() })
//! .add(MyService { name: "service-2".into() })
//! .run()
//! .await?;
//! Ok(())
//! }
//! ```
//!
//! ## Component Lifecycle
//!
//! Components go through three phases:
//!
//! 1. **Setup**: Optional initialization phase before components start running
//! 2. **Run**: Main execution phase where components perform their work
//! 3. **Close**: Optional cleanup phase after components stop
//!
//! ## Error Handling
//!
//! MAD provides comprehensive error handling through [`MadError`], which can:
//! - Wrap errors from individual components
//! - Aggregate multiple errors when several components fail
//! - Automatically convert from `anyhow::Error`
//!
//! ## Shutdown Behavior
//!
//! MAD handles shutdown gracefully:
//! - Responds to SIGTERM and Ctrl+C signals
//! - Propagates cancellation tokens to all components
//! - Waits for components to finish cleanup
//! - Configurable cancellation timeout
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures_util::StreamExt; use futures_util::StreamExt;
use std::{fmt::Display, sync::Arc, error::Error}; use std::{fmt::Display, sync::Arc};
use tokio::signal::unix::{SignalKind, signal}; use tokio::signal::unix::{SignalKind, signal};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@@ -86,48 +9,23 @@ use crate::waiter::Waiter;
mod waiter; mod waiter;
/// Error type for MAD operations.
///
/// This enum represents all possible errors that can occur during
/// the lifecycle of MAD components.
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum MadError { pub enum MadError {
/// Generic error wrapper for anyhow errors. #[error("component: {0:#?}")]
/// Inner(#[source] anyhow::Error),
/// This variant is used when components return errors via the `?` operator
/// or when converting from `anyhow::Error`.
#[error(transparent)]
Inner(anyhow::Error),
/// Error that occurred during the run phase of a component. #[error("component: {run:#?}")]
#[error(transparent)] RunError { run: anyhow::Error },
RunError {
run: anyhow::Error
},
/// Error that occurred during the close phase of a component. #[error("component(s) failed: {close}")]
#[error("component(s) failed during close")] CloseError { close: anyhow::Error },
CloseError {
#[source]
close: anyhow::Error
},
/// Multiple errors from different components. #[error("component(s): {0}")]
///
/// This is used when multiple components fail simultaneously,
/// allowing all errors to be reported rather than just the first one.
#[error("{0}")]
AggregateError(AggregateError), AggregateError(AggregateError),
/// Returned when a component doesn't implement the optional setup method.
///
/// This is not typically an error condition as setup is optional.
#[error("setup not defined")] #[error("setup not defined")]
SetupNotDefined, SetupNotDefined,
/// Returned when a component doesn't implement the optional close method.
///
/// This is not typically an error condition as close is optional.
#[error("close not defined")] #[error("close not defined")]
CloseNotDefined, CloseNotDefined,
} }
@@ -138,30 +36,12 @@ impl From<anyhow::Error> for MadError {
} }
} }
/// Container for multiple errors from different components. #[derive(Debug)]
///
/// When multiple components fail, their errors are collected
/// into this struct to provide complete error reporting.
#[derive(Debug, thiserror::Error)]
pub struct AggregateError { pub struct AggregateError {
errors: Vec<MadError>, errors: Vec<MadError>,
} }
impl AggregateError { impl AggregateError {
/// Returns a slice of all contained errors.
///
/// # Example
///
/// ```rust,ignore
/// match result {
/// Err(notmad::MadError::AggregateError(agg)) => {
/// for error in agg.get_errors() {
/// eprintln!("Component error: {}", error);
/// }
/// }
/// _ => {}
/// }
/// ```
pub fn get_errors(&self) -> &[MadError] { pub fn get_errors(&self) -> &[MadError] {
&self.errors &self.errors
} }
@@ -174,55 +54,20 @@ impl Display for AggregateError {
} }
if self.errors.len() == 1 { if self.errors.len() == 1 {
return write!(f, "{}", self.errors[0]); return f.write_str(&self.errors.first().unwrap().to_string());
} }
writeln!(f, "{} component errors occurred:", self.errors.len())?; f.write_str("MadError::AggregateError: (")?;
for (i, error) in self.errors.iter().enumerate() {
write!(f, "\n[Component {}] {}", i + 1, error)?;
// Print the error chain for each component error for error in &self.errors {
let mut source = error.source(); f.write_str(&error.to_string())?;
let mut level = 1; f.write_str(", ")?;
while let Some(err) = source {
write!(f, "\n {}. {}", level, err)?;
source = err.source();
level += 1;
}
} }
Ok(())
f.write_str(")")
} }
} }
/// The main lifecycle manager for running multiple components.
///
/// `Mad` orchestrates the lifecycle of multiple components, ensuring they
/// start up in order, run concurrently, and shut down gracefully.
///
/// # Example
///
/// ```rust
/// use notmad::{Component, Mad};
/// use async_trait::async_trait;
/// use tokio_util::sync::CancellationToken;
///
/// struct MyComponent;
///
/// #[async_trait]
/// impl Component for MyComponent {
/// async fn run(&self, _cancel: CancellationToken) -> Result<(), notmad::MadError> {
/// Ok(())
/// }
/// }
///
/// # async fn example() -> Result<(), notmad::MadError> {
/// Mad::builder()
/// .add(MyComponent)
/// .run()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub struct Mad { pub struct Mad {
components: Vec<Arc<dyn Component + Send + Sync + 'static>>, components: Vec<Arc<dyn Component + Send + Sync + 'static>>,
@@ -235,18 +80,6 @@ struct CompletionResult {
} }
impl Mad { impl Mad {
/// Creates a new `Mad` builder.
///
/// This is the entry point for constructing a MAD application.
/// Components are added using the builder pattern before calling `run()`.
///
/// # Example
///
/// ```rust
/// use notmad::Mad;
///
/// let mut app = Mad::builder();
/// ```
pub fn builder() -> Self { pub fn builder() -> Self {
Self { Self {
components: Vec::default(), components: Vec::default(),
@@ -255,65 +88,12 @@ impl Mad {
} }
} }
/// Adds a component to the MAD application.
///
/// Components will be set up in the order they are added,
/// run concurrently, and closed in the order they were added.
///
/// # Arguments
///
/// * `component` - Any type that implements `Component` or `IntoComponent`
///
/// # Example
///
/// ```rust
/// use notmad::{Component, Mad};
/// # use async_trait::async_trait;
/// # use tokio_util::sync::CancellationToken;
/// # struct MyService;
/// # #[async_trait]
/// # impl Component for MyService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # }
///
/// Mad::builder()
/// .add(MyService)
/// .add(MyService);
/// ```
pub fn add(&mut self, component: impl IntoComponent) -> &mut Self { pub fn add(&mut self, component: impl IntoComponent) -> &mut Self {
self.components.push(component.into_component()); self.components.push(component.into_component());
self self
} }
/// Conditionally adds a component based on a boolean condition.
///
/// If the condition is false, a waiter component is added instead,
/// which simply waits for cancellation without doing any work.
///
/// # Arguments
///
/// * `condition` - If true, adds the component; if false, adds a waiter
/// * `component` - The component to add if condition is true
///
/// # Example
///
/// ```rust
/// use notmad::Mad;
/// # use notmad::Component;
/// # use async_trait::async_trait;
/// # use tokio_util::sync::CancellationToken;
/// # struct DebugService;
/// # #[async_trait]
/// # impl Component for DebugService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # }
///
/// let enable_debug = std::env::var("DEBUG").is_ok();
///
/// Mad::builder()
/// .add_conditional(enable_debug, DebugService);
/// ```
pub fn add_conditional(&mut self, condition: bool, component: impl IntoComponent) -> &mut Self { pub fn add_conditional(&mut self, condition: bool, component: impl IntoComponent) -> &mut Self {
if condition { if condition {
self.components.push(component.into_component()); self.components.push(component.into_component());
@@ -325,53 +105,12 @@ impl Mad {
self self
} }
/// Adds a waiter component that does nothing but wait for cancellation.
///
/// This is useful when you need a placeholder component or want
/// the application to keep running without any specific work.
///
/// # Example
///
/// ```rust,no_run
/// # async fn example() {
/// use notmad::Mad;
///
/// Mad::builder()
/// .add_wait() // Keeps the app running until shutdown signal
/// .run()
/// .await;
/// # }
/// ```
pub fn add_wait(&mut self) -> &mut Self { pub fn add_wait(&mut self) -> &mut Self {
self.components.push(Waiter::default().into_component()); self.components.push(Waiter::default().into_component());
self self
} }
/// Adds a closure or function as a component.
///
/// This is a convenient way to add simple components without
/// creating a full struct that implements `Component`.
///
/// # Arguments
///
/// * `f` - A closure that takes a `CancellationToken` and returns a future
///
/// # Example
///
/// ```rust
/// use notmad::Mad;
/// use tokio_util::sync::CancellationToken;
///
/// Mad::builder()
/// .add_fn(|cancel: CancellationToken| async move {
/// while !cancel.is_cancelled() {
/// println!("Working...");
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
/// }
/// Ok(())
/// });
/// ```
pub fn add_fn<F, Fut>(&mut self, f: F) -> &mut Self pub fn add_fn<F, Fut>(&mut self, f: F) -> &mut Self
where where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static, F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
@@ -382,63 +121,12 @@ impl Mad {
self.add(comp) self.add(comp)
} }
/// Configures the cancellation timeout behavior.
///
/// When a shutdown signal is received, MAD will:
/// 1. Send cancellation tokens to all components
/// 2. Wait for the specified duration
/// 3. Force shutdown if components haven't stopped
///
/// # Arguments
///
/// * `should_cancel` - Duration to wait after cancellation before forcing shutdown.
/// Pass `None` to wait indefinitely.
///
/// # Example
///
/// ```rust,no_run
/// # async fn example() {
/// use notmad::Mad;
/// use std::time::Duration;
///
/// Mad::builder()
/// .cancellation(Some(Duration::from_secs(30))) // 30 second grace period
/// .run()
/// .await;
/// # }
/// ```
pub fn cancellation(&mut self, should_cancel: Option<std::time::Duration>) -> &mut Self { pub fn cancellation(&mut self, should_cancel: Option<std::time::Duration>) -> &mut Self {
self.should_cancel = should_cancel; self.should_cancel = should_cancel;
self self
} }
/// Runs all components until completion or shutdown.
///
/// This method:
/// 1. Calls `setup()` on all components (in order)
/// 2. Starts all components concurrently
/// 3. Waits for shutdown signal (SIGTERM, Ctrl+C) or component failure
/// 4. Sends cancellation to all components
/// 5. Calls `close()` on all components (in order)
///
/// # Returns
///
/// * `Ok(())` if all components shut down cleanly
/// * `Err(MadError)` if any component fails
///
/// # Example
///
/// ```rust,no_run
/// # use notmad::Mad;
/// # async fn example() -> Result<(), notmad::MadError> {
/// Mad::builder()
/// .add_wait()
/// .run()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub async fn run(&mut self) -> Result<(), MadError> { pub async fn run(&mut self) -> Result<(), MadError> {
tracing::info!("running mad setup"); tracing::info!("running mad setup");
@@ -601,148 +289,24 @@ async fn signal_unix_terminate() {
sigterm.recv().await; sigterm.recv().await;
} }
/// Trait for implementing MAD components.
///
/// Components represent individual services or tasks that run as part
/// of your application. Each component has its own lifecycle with
/// optional setup and cleanup phases.
///
/// # Example
///
/// ```rust
/// use notmad::{Component, MadError};
/// use async_trait::async_trait;
/// use tokio_util::sync::CancellationToken;
///
/// struct DatabaseConnection {
/// url: String,
/// }
///
/// #[async_trait]
/// impl Component for DatabaseConnection {
/// fn name(&self) -> Option<String> {
/// Some("database".to_string())
/// }
///
/// async fn setup(&self) -> Result<(), MadError> {
/// println!("Connecting to database...");
/// // Initialize connection pool
/// Ok(())
/// }
///
/// async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> {
/// // Keep connection alive, handle queries
/// cancel.cancelled().await;
/// Ok(())
/// }
///
/// async fn close(&self) -> Result<(), MadError> {
/// println!("Closing database connection...");
/// // Clean up resources
/// Ok(())
/// }
/// }
/// ```
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Component { pub trait Component {
/// Returns an optional name for the component.
///
/// This name is used in logging and error messages to identify
/// which component is being processed.
///
/// # Default
///
/// Returns `None` if not overridden.
fn name(&self) -> Option<String> { fn name(&self) -> Option<String> {
None None
} }
/// Optional setup phase called before the component starts running.
///
/// Use this for initialization tasks like:
/// - Establishing database connections
/// - Loading configuration
/// - Preparing resources
///
/// # Default
///
/// Returns `MadError::SetupNotDefined` which is handled gracefully.
///
/// # Errors
///
/// If setup fails with an error other than `SetupNotDefined`,
/// the entire application will stop before any components start running.
async fn setup(&self) -> Result<(), MadError> { async fn setup(&self) -> Result<(), MadError> {
Err(MadError::SetupNotDefined) Err(MadError::SetupNotDefined)
} }
/// Main execution phase of the component.
///
/// This method should contain the primary logic of your component.
/// It should respect the cancellation token and shut down gracefully
/// when cancellation is requested.
///
/// # Arguments
///
/// * `cancellation_token` - Signal for graceful shutdown
///
/// # Implementation Guidelines
///
/// - Check `cancellation_token.is_cancelled()` periodically
/// - Use `tokio::select!` with `cancellation_token.cancelled()` for async operations
/// - Clean up resources before returning
///
/// # Errors
///
/// Any error returned will trigger shutdown of all other components.
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError>; async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError>;
/// Optional cleanup phase called after the component stops.
///
/// Use this for cleanup tasks like:
/// - Flushing buffers
/// - Closing connections
/// - Saving state
///
/// # Default
///
/// Returns `MadError::CloseNotDefined` which is handled gracefully.
///
/// # Errors
///
/// Errors during close are logged but don't prevent other components
/// from closing.
async fn close(&self) -> Result<(), MadError> { async fn close(&self) -> Result<(), MadError> {
Err(MadError::CloseNotDefined) Err(MadError::CloseNotDefined)
} }
} }
/// Trait for converting types into components.
///
/// This trait is automatically implemented for all types that implement
/// `Component + Send + Sync + 'static`, allowing them to be added to MAD
/// directly without manual conversion.
///
/// # Example
///
/// ```rust
/// use notmad::{Component, IntoComponent, Mad};
/// # use async_trait::async_trait;
/// # use tokio_util::sync::CancellationToken;
///
/// struct MyService;
///
/// # #[async_trait]
/// # impl Component for MyService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # }
///
/// // MyService automatically implements IntoComponent
/// Mad::builder()
/// .add(MyService); // Works directly
/// ```
pub trait IntoComponent { pub trait IntoComponent {
/// Converts self into an Arc-wrapped component.
fn into_component(self) -> Arc<dyn Component + Send + Sync + 'static>; fn into_component(self) -> Arc<dyn Component + Send + Sync + 'static>;
} }
@@ -782,132 +346,3 @@ where
self.execute(cancellation_token).await self.execute(cancellation_token).await
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use anyhow::Context;
#[test]
fn test_error_chaining_display() {
// Test single error with context chain
let base_error = std::io::Error::new(std::io::ErrorKind::NotFound, "file not found");
let error = anyhow::Error::from(base_error)
.context("failed to read configuration")
.context("unable to initialize database")
.context("service startup failed");
let mad_error = MadError::Inner(error);
let display = format!("{}", mad_error);
// Should display the top-level error message
assert!(display.contains("service startup failed"));
// Test error chain iteration
if let MadError::Inner(ref e) = mad_error {
let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
assert_eq!(chain.len(), 4);
assert_eq!(chain[0], "service startup failed");
assert_eq!(chain[1], "unable to initialize database");
assert_eq!(chain[2], "failed to read configuration");
assert_eq!(chain[3], "file not found");
}
}
#[test]
fn test_aggregate_error_display() {
let error1 = MadError::Inner(
anyhow::anyhow!("database connection failed")
.context("failed to connect to PostgreSQL")
);
let error2 = MadError::Inner(
anyhow::anyhow!("port already in use")
.context("failed to bind to port 8080")
.context("web server initialization failed")
);
let aggregate = MadError::AggregateError(AggregateError {
errors: vec![error1, error2],
});
let display = format!("{}", aggregate);
// Check that it shows multiple errors
assert!(display.contains("2 component errors occurred"));
assert!(display.contains("[Component 1]"));
assert!(display.contains("[Component 2]"));
// Check that context chains are displayed
assert!(display.contains("failed to connect to PostgreSQL"));
assert!(display.contains("database connection failed"));
assert!(display.contains("web server initialization failed"));
assert!(display.contains("failed to bind to port 8080"));
assert!(display.contains("port already in use"));
}
#[test]
fn test_single_error_aggregate() {
let error = MadError::Inner(anyhow::anyhow!("single error"));
let aggregate = AggregateError {
errors: vec![error],
};
let display = format!("{}", aggregate);
// Single error should be displayed directly
assert!(display.contains("single error"));
assert!(!display.contains("component errors occurred"));
}
#[test]
fn test_error_source_chain() {
let error = MadError::Inner(
anyhow::anyhow!("root cause")
.context("middle layer")
.context("top layer")
);
// Test that we can access the error chain
if let MadError::Inner(ref e) = error {
let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
assert_eq!(chain.len(), 3);
assert_eq!(chain[0], "top layer");
assert_eq!(chain[1], "middle layer");
assert_eq!(chain[2], "root cause");
} else {
panic!("Expected MadError::Inner");
}
}
#[tokio::test]
async fn test_component_error_propagation() {
struct FailingComponent;
#[async_trait::async_trait]
impl Component for FailingComponent {
fn name(&self) -> Option<String> {
Some("test-component".to_string())
}
async fn run(&self, _cancel: CancellationToken) -> Result<(), MadError> {
Err(anyhow::anyhow!("IO error")
.context("failed to open file")
.context("component initialization failed")
.into())
}
}
let result = Mad::builder()
.add(FailingComponent)
.cancellation(Some(std::time::Duration::from_millis(100)))
.run()
.await;
assert!(result.is_err());
let error = result.unwrap_err();
// Check error display
let display = format!("{}", error);
assert!(display.contains("component initialization failed"));
}
}

View File

@@ -1,9 +1,3 @@
//! Waiter components for MAD.
//!
//! This module provides waiter components that simply wait for cancellation
//! without performing any work. Useful for keeping the application alive
//! or as placeholders in conditional component loading.
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
@@ -11,10 +5,6 @@ use tokio_util::sync::CancellationToken;
use crate::{Component, MadError}; use crate::{Component, MadError};
/// A default waiter component that panics if run.
///
/// This is used internally as a placeholder that should never
/// actually be executed.
pub struct DefaultWaiter {} pub struct DefaultWaiter {}
#[async_trait] #[async_trait]
impl Component for DefaultWaiter { impl Component for DefaultWaiter {
@@ -23,20 +13,6 @@ impl Component for DefaultWaiter {
} }
} }
/// A wrapper component that waits for cancellation.
///
/// Instead of running the wrapped component's logic, this simply
/// waits for the cancellation token. This is useful for conditionally
/// disabling components while keeping the same structure.
///
/// # Example
///
/// ```rust,ignore
/// use mad::Waiter;
///
/// // Instead of running the service, just wait
/// let waiter = Waiter::new(service.into_component());
/// ```
pub struct Waiter { pub struct Waiter {
comp: Arc<dyn Component + Send + Sync + 'static>, comp: Arc<dyn Component + Send + Sync + 'static>,
} }
@@ -50,10 +26,6 @@ impl Default for Waiter {
} }
impl Waiter { impl Waiter {
/// Creates a new waiter that wraps the given component.
///
/// The wrapped component's name will be used (prefixed with "waiter/"),
/// but its run method will not be called.
pub fn new(c: Arc<dyn Component + Send + Sync + 'static>) -> Self { pub fn new(c: Arc<dyn Component + Send + Sync + 'static>) -> Self {
Self { comp: c } Self { comp: c }
} }
@@ -61,10 +33,6 @@ impl Waiter {
#[async_trait] #[async_trait]
impl Component for Waiter { impl Component for Waiter {
/// Returns the name of the waiter, prefixed with "waiter/".
///
/// If the wrapped component has a name, it will be "waiter/{name}".
/// Otherwise, returns "waiter".
fn name(&self) -> Option<String> { fn name(&self) -> Option<String> {
match self.comp.name() { match self.comp.name() {
Some(name) => Some(format!("waiter/{name}")), Some(name) => Some(format!("waiter/{name}")),
@@ -72,10 +40,6 @@ impl Component for Waiter {
} }
} }
/// Waits for cancellation without performing any work.
///
/// This method simply waits for the cancellation token to be triggered,
/// then returns successfully.
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> { async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
cancellation_token.cancelled().await; cancellation_token.cancelled().await;

View File

@@ -1,6 +1,6 @@
# yaml-language-server: $schema=https://git.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json # yaml-language-server: $schema=https://git.front.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
base: "git@git.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git" base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
vars: vars:
service: "mad" service: "mad"
@@ -12,6 +12,6 @@ please:
repository: "mad" repository: "mad"
branch: main branch: main
settings: settings:
api_url: "https://git.kjuulh.io" api_url: "https://git.front.kjuulh.io"
actions: actions:
rust: rust: