9 Commits

Author SHA1 Message Date
eb360e565c chore(release): v0.8.0 (#34)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.8.0

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #34
2025-08-09 14:57:12 +02:00
c18c8a885c feat: add docs
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-08-08 23:06:23 +02:00
762da1e672 feat: update readme
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-08-07 11:29:29 +02:00
3bc512ab48 chore(release): v0.7.5 (#33)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.7.5

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: https://git.front.kjuulh.io/kjuulh/mad/pulls/33
2025-07-24 22:44:46 +02:00
7d8071d41b feat: print big inner
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-24 22:44:13 +02:00
1cc4138ec7 chore: more error correction
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-24 22:36:39 +02:00
00517daaaa chore: correct error test to not be as verbose 2025-07-24 22:27:04 +02:00
b941dc9a76 chore(release): v0.7.4 (#32)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.7.4

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: https://git.front.kjuulh.io/kjuulh/mad/pulls/32
2025-07-24 22:23:00 +02:00
5da3c83c12 feat: cleanup aggregate error for single error
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-24 22:22:12 +02:00
11 changed files with 1216 additions and 31 deletions

View File

@@ -6,6 +6,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.8.0] - 2025-08-08
### Added
- add docs
- update readme
## [0.7.5] - 2025-07-24
### Added
- print big inner
### Other
- more error correction
- correct error test to not be as verbose
## [0.7.4] - 2025-07-24
### Added
- cleanup aggregate error for single error
## [0.7.3] - 2025-07-24
### Added

2
Cargo.lock generated
View File

@@ -278,7 +278,7 @@ dependencies = [
[[package]]
name = "notmad"
version = "0.7.2"
version = "0.7.5"
dependencies = [
"anyhow",
"async-trait",

View File

@@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2"
[workspace.package]
version = "0.7.3"
version = "0.8.0"
[workspace.dependencies]
mad = { path = "crates/mad" }

163
README.md
View File

@@ -1,39 +1,92 @@
# MAD
# MAD - Lifecycle Manager for Rust Applications
Mad is a life-cycle manager for long running rust operations.
[![Crates.io](https://img.shields.io/crates/v/notmad.svg)](https://crates.io/crates/notmad)
[![Documentation](https://docs.rs/notmad/badge.svg)](https://docs.rs/notmad)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
- Webservers
- Queue bindings
- gRPC servers etc
- Cron runners
## Overview
It is supposed to be the main thing the application runs, and everything from it is spawned and managed by it.
MAD is a robust lifecycle manager designed for long-running Rust operations. It provides a simple, composable way to manage multiple concurrent services within your application, handling graceful startup and shutdown automatically.
### Perfect for:
- 🌐 Web servers
- 📨 Queue consumers and message processors
- 🔌 gRPC servers
- ⏰ Cron job runners
- 🔄 Background workers
- 📡 Any long-running async operations
## Features
- **Component-based architecture** - Build your application from composable, reusable components
- **Graceful shutdown** - Automatic handling of shutdown signals with proper cleanup
- **Concurrent execution** - Run multiple components in parallel with tokio
- **Error handling** - Built-in error propagation and logging
- **Cancellation tokens** - Coordinate shutdown across all components
- **Minimal boilerplate** - Focus on your business logic, not lifecycle management
## Installation
Add MAD to your `Cargo.toml`:
```toml
[dependencies]
notmad = "0.7.5"
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
```
## Quick Start
Here's a simple example of a component that simulates a long-running server:
```rust
struct WaitServer {}
use mad::{Component, Mad};
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
// Define your component
struct WebServer {
port: u16,
}
#[async_trait]
impl Component for WaitServer {
impl Component for WebServer {
fn name(&self) -> Option<String> {
Some("NeverEndingRun".into())
Some(format!("WebServer on port {}", self.port))
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
let millis_wait = rand::thread_rng().gen_range(50..1000);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
println!("Starting web server on port {}", self.port);
// Your server logic here
// The cancellation token will be triggered on shutdown
tokio::select! {
_ = cancellation.cancelled() => {
println!("Shutting down web server");
}
_ = self.serve() => {
println!("Server stopped");
}
}
Ok(())
}
}
impl WebServer {
async fn serve(&self) {
// Simulate a running server
tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Build and run your application
Mad::builder()
.add(WaitServer {})
.add(WaitServer {})
.add(WaitServer {})
.add(WebServer { port: 8080 })
.add(WebServer { port: 8081 }) // You can add multiple instances
.run()
.await?;
@@ -41,11 +94,75 @@ async fn main() -> anyhow::Result<()> {
}
```
## Advanced Usage
### Custom Components
Components can be anything that implements the `Component` trait:
```rust
use mad::{Component, Mad};
use async_trait::async_trait;
struct QueueProcessor {
queue_name: String,
}
#[async_trait]
impl Component for QueueProcessor {
fn name(&self) -> Option<String> {
Some(format!("QueueProcessor-{}", self.queue_name))
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
while !cancellation.is_cancelled() {
// Process messages from queue
self.process_next_message().await?;
}
Ok(())
}
}
```
### Error Handling
MAD provides comprehensive error handling through the `MadError` type with automatic conversion from `anyhow::Error`:
```rust
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
// Errors automatically convert from anyhow::Error to MadError
database_operation().await?;
// Or return explicit errors
if some_condition {
return Err(anyhow::anyhow!("Something went wrong").into());
}
Ok(())
}
```
## Examples
Can be found (here)[crates/mad/examples]
Check out the [examples directory](crates/mad/examples) for more detailed examples:
- basic
- fn
- signals
- error_log
- **basic** - Simple component lifecycle
- **fn** - Using functions as components
- **signals** - Handling system signals
- **error_log** - Error handling and logging
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## Author
Created and maintained by [kjuulh](https://github.com/kjuulh)
## Repository
Find the source code at [https://github.com/kjuulh/mad](https://github.com/kjuulh/mad)

View File

@@ -0,0 +1,18 @@
[package]
name = "mad-comprehensive-example"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "comprehensive"
path = "main.rs"
[dependencies]
notmad = { path = "../.." }
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
async-trait = "0.1"
anyhow = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
rand = "0.8"

View File

@@ -0,0 +1,332 @@
//! Comprehensive example demonstrating MAD's full capabilities.
//!
//! This example shows:
//! - Multiple component types (struct, closure, conditional)
//! - Component lifecycle (setup, run, close)
//! - Error handling and propagation
//! - Graceful shutdown with cancellation tokens
//! - Concurrent component execution
use async_trait::async_trait;
use notmad::{Component, Mad, MadError};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::time::{Duration, interval};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
/// A web server component that simulates handling HTTP requests.
struct WebServer {
port: u16,
request_count: Arc<AtomicUsize>,
}
#[async_trait]
impl Component for WebServer {
fn name(&self) -> Option<String> {
Some(format!("web-server-{}", self.port))
}
async fn setup(&self) -> Result<(), MadError> {
info!("Setting up web server on port {}", self.port);
// In a real application, you might:
// - Bind to the port
// - Set up TLS certificates
// - Initialize middleware
tokio::time::sleep(Duration::from_millis(100)).await;
info!("Web server on port {} is ready", self.port);
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!("Web server on port {} started", self.port);
let mut interval = interval(Duration::from_secs(1));
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
info!("Web server on port {} received shutdown signal", self.port);
break;
}
_ = interval.tick() => {
// Simulate handling requests
let count = self.request_count.fetch_add(1, Ordering::Relaxed);
info!("Server on port {} handled request #{}", self.port, count + 1);
}
}
}
Ok(())
}
async fn close(&self) -> Result<(), MadError> {
info!("Shutting down web server on port {}", self.port);
// In a real application, you might:
// - Drain existing connections
// - Save server state
// - Close database connections
tokio::time::sleep(Duration::from_millis(200)).await;
let total = self.request_count.load(Ordering::Relaxed);
info!(
"Web server on port {} shut down. Total requests handled: {}",
self.port, total
);
Ok(())
}
}
/// A background job processor that simulates processing tasks from a queue.
struct JobProcessor {
queue_name: String,
processing_interval: Duration,
}
#[async_trait]
impl Component for JobProcessor {
fn name(&self) -> Option<String> {
Some(format!("job-processor-{}", self.queue_name))
}
async fn setup(&self) -> Result<(), MadError> {
info!("Connecting to queue: {}", self.queue_name);
// Simulate connecting to a message queue
tokio::time::sleep(Duration::from_millis(150)).await;
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!("Job processor for {} started", self.queue_name);
let mut interval = interval(self.processing_interval);
let mut job_count = 0;
loop {
tokio::select! {
_ = cancellation.cancelled() => {
info!("Job processor for {} stopping...", self.queue_name);
break;
}
_ = interval.tick() => {
job_count += 1;
info!("Processing job #{} from {}", job_count, self.queue_name);
// Simulate job processing
tokio::time::sleep(Duration::from_millis(100)).await;
// Simulate occasional errors (but don't fail the component)
if job_count % 10 == 0 {
warn!("Job #{} from {} required retry", job_count, self.queue_name);
}
}
}
}
info!(
"Job processor for {} processed {} jobs",
self.queue_name, job_count
);
Ok(())
}
async fn close(&self) -> Result<(), MadError> {
info!("Disconnecting from queue: {}", self.queue_name);
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
}
/// A health check component that monitors system health.
struct HealthChecker {
check_interval: Duration,
}
#[async_trait]
impl Component for HealthChecker {
fn name(&self) -> Option<String> {
Some("health-checker".to_string())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!("Health checker started");
let mut interval = interval(self.check_interval);
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
info!("Health checker stopping...");
break;
}
_ = interval.tick() => {
// Simulate health checks
let cpu_usage = rand::random::<f32>() * 100.0;
let memory_usage = rand::random::<f32>() * 100.0;
info!("System health: CPU={:.1}%, Memory={:.1}%", cpu_usage, memory_usage);
if cpu_usage > 90.0 {
warn!("High CPU usage detected: {:.1}%", cpu_usage);
}
if memory_usage > 90.0 {
warn!("High memory usage detected: {:.1}%", memory_usage);
}
}
}
}
Ok(())
}
}
/// A component that will fail after some time to demonstrate error handling.
struct FailingComponent {
fail_after: Duration,
}
#[async_trait]
impl Component for FailingComponent {
fn name(&self) -> Option<String> {
Some("failing-component".to_string())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!(
"Failing component started (will fail after {:?})",
self.fail_after
);
tokio::select! {
_ = cancellation.cancelled() => {
info!("Failing component cancelled before failure");
Ok(())
}
_ = tokio::time::sleep(self.fail_after) => {
error!("Failing component encountered an error!");
Err(anyhow::anyhow!("Simulated component failure").into())
}
}
}
}
/// Debug component that logs system status periodically.
struct DebugComponent;
#[async_trait]
impl Component for DebugComponent {
fn name(&self) -> Option<String> {
Some("debug-component".to_string())
}
async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> {
info!("Debug mode enabled - verbose logging active");
let mut interval = interval(Duration::from_secs(5));
while !cancel.is_cancelled() {
tokio::select! {
_ = cancel.cancelled() => break,
_ = interval.tick() => {
info!("DEBUG: System running normally");
}
}
}
info!("Debug component shutting down");
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize tracing for logging
tracing_subscriber::fmt()
.with_target(false)
.without_time()
.init();
info!("Starting comprehensive MAD example application");
// Check if we should enable the failing component
let enable_failure_demo = std::env::var("ENABLE_FAILURE").is_ok();
// Check if we should enable debug mode
let debug_mode = std::env::var("DEBUG").is_ok();
// Shared state for demonstration
let request_count = Arc::new(AtomicUsize::new(0));
// Build and run the application
let result = Mad::builder()
// Add web servers
.add(WebServer {
port: 8080,
request_count: request_count.clone(),
})
.add(WebServer {
port: 8081,
request_count: request_count.clone(),
})
// Add job processors
.add(JobProcessor {
queue_name: "high-priority".to_string(),
processing_interval: Duration::from_secs(2),
})
.add(JobProcessor {
queue_name: "low-priority".to_string(),
processing_interval: Duration::from_secs(5),
})
// Add health checker
.add(HealthChecker {
check_interval: Duration::from_secs(3),
})
// Conditionally add a debug component using add_fn
.add_conditional(debug_mode, DebugComponent)
// Conditionally add failing component to demonstrate error handling
.add_conditional(
enable_failure_demo,
FailingComponent {
fail_after: Duration::from_secs(10),
},
)
// Add a simple metrics reporter using add_fn
.add_fn(|cancel: CancellationToken| async move {
info!("Metrics reporter started");
let mut interval = interval(Duration::from_secs(10));
let start = std::time::Instant::now();
while !cancel.is_cancelled() {
tokio::select! {
_ = cancel.cancelled() => break,
_ = interval.tick() => {
let uptime = start.elapsed();
info!("Application uptime: {:?}", uptime);
}
}
}
info!("Metrics reporter stopped");
Ok(())
})
// Configure graceful shutdown timeout
.cancellation(Some(Duration::from_secs(5)))
// Run the application
.run()
.await;
match result {
Ok(()) => {
info!("Application shut down successfully");
Ok(())
}
Err(e) => {
error!("Application failed: {}", e);
// Check if it's an aggregate error with multiple failures
if let MadError::AggregateError(ref agg) = e {
error!("Multiple component failures detected:");
for (i, err) in agg.get_errors().iter().enumerate() {
error!(" {}. {}", i + 1, err);
}
}
Err(e.into())
}
}
}

View File

@@ -0,0 +1,15 @@
[package]
name = "mad-multi-service-example"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "multi_service"
path = "main.rs"
[dependencies]
notmad = { path = "../.." }
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
async-trait = "0.1"
anyhow = "1"

View File

@@ -0,0 +1,214 @@
//! Example demonstrating running multiple services with MAD.
//!
//! This example shows how to run a web server, queue processor, and
//! scheduled task together, with graceful shutdown on Ctrl+C.
use async_trait::async_trait;
use notmad::{Component, Mad, MadError};
use tokio::time::{Duration, interval};
use tokio_util::sync::CancellationToken;
/// A simple web server component.
///
/// In a real application, this would bind to a port and handle HTTP requests.
/// Here we simulate it by periodically logging that we're "handling" requests.
struct WebServer {
port: u16,
}
#[async_trait]
impl Component for WebServer {
fn name(&self) -> Option<String> {
Some(format!("WebServer:{}", self.port))
}
async fn setup(&self) -> Result<(), MadError> {
println!("[{}] Binding to port...", self.name().unwrap());
// Simulate server setup time
tokio::time::sleep(Duration::from_millis(100)).await;
println!("[{}] Ready to accept connections", self.name().unwrap());
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!("[{}] Server started", self.name().unwrap());
// Simulate handling requests until shutdown
let mut request_id = 0;
let mut interval = interval(Duration::from_secs(2));
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Shutdown signal received", self.name().unwrap());
break;
}
_ = interval.tick() => {
request_id += 1;
println!("[{}] Handling request #{}", self.name().unwrap(), request_id);
}
}
}
Ok(())
}
async fn close(&self) -> Result<(), MadError> {
println!("[{}] Closing connections...", self.name().unwrap());
// Simulate graceful connection drain
tokio::time::sleep(Duration::from_millis(200)).await;
println!("[{}] Server stopped", self.name().unwrap());
Ok(())
}
}
/// A queue processor that consumes messages from a queue.
///
/// This simulates processing messages from a message queue like
/// RabbitMQ, Kafka, or AWS SQS.
struct QueueProcessor {
queue_name: String,
}
#[async_trait]
impl Component for QueueProcessor {
fn name(&self) -> Option<String> {
Some(format!("QueueProcessor:{}", self.queue_name))
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!("[{}] Started processing", self.name().unwrap());
let mut message_count = 0;
// Process messages until shutdown
while !cancellation.is_cancelled() {
// Simulate waiting for and processing a message
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Stopping message processing", self.name().unwrap());
break;
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
message_count += 1;
println!("[{}] Processed message #{}", self.name().unwrap(), message_count);
}
}
}
println!(
"[{}] Processed {} messages total",
self.name().unwrap(),
message_count
);
Ok(())
}
}
/// A scheduled task that runs periodically.
///
/// This could be used for tasks like:
/// - Cleaning up old data
/// - Generating reports
/// - Syncing with external systems
struct ScheduledTask {
task_name: String,
interval_secs: u64,
}
#[async_trait]
impl Component for ScheduledTask {
fn name(&self) -> Option<String> {
Some(format!("ScheduledTask:{}", self.task_name))
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!(
"[{}] Scheduled to run every {} seconds",
self.name().unwrap(),
self.interval_secs
);
let mut interval = interval(Duration::from_secs(self.interval_secs));
let mut run_count = 0;
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Scheduler stopping", self.name().unwrap());
break;
}
_ = interval.tick() => {
run_count += 1;
println!("[{}] Executing run #{}", self.name().unwrap(), run_count);
// Simulate task execution
tokio::time::sleep(Duration::from_millis(500)).await;
println!("[{}] Run #{} completed", self.name().unwrap(), run_count);
}
}
}
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Starting multi-service application");
println!("Press Ctrl+C to trigger graceful shutdown");
println!("----------------------------------------");
// Build the application with multiple services
Mad::builder()
// Add a web server on port 8080
.add(WebServer { port: 8080 })
// Add another web server on port 8081 (e.g., admin interface)
.add(WebServer { port: 8081 })
// Add queue processors for different queues
.add(QueueProcessor {
queue_name: "orders".to_string(),
})
.add(QueueProcessor {
queue_name: "notifications".to_string(),
})
// Add scheduled tasks
.add(ScheduledTask {
task_name: "cleanup".to_string(),
interval_secs: 5,
})
.add(ScheduledTask {
task_name: "report_generator".to_string(),
interval_secs: 10,
})
// Add a monitoring component using a closure
.add_fn(|cancel| async move {
println!("[Monitor] Starting system monitor");
let mut interval = interval(Duration::from_secs(15));
while !cancel.is_cancelled() {
tokio::select! {
_ = cancel.cancelled() => {
println!("[Monitor] Stopping monitor");
break;
}
_ = interval.tick() => {
println!("[Monitor] All systems operational");
}
}
}
Ok(())
})
// Set graceful shutdown timeout to 3 seconds
.cancellation(Some(Duration::from_secs(3)))
// Run all components
.run()
.await?;
println!("----------------------------------------");
println!("All services shut down successfully");
Ok(())
}

View File

@@ -1,3 +1,80 @@
//! # MAD - Lifecycle Manager for Rust Applications
//!
//! MAD is a robust lifecycle manager designed for long-running Rust operations. It provides
//! a simple, composable way to manage multiple concurrent services within your application,
//! handling graceful startup and shutdown automatically.
//!
//! ## Overview
//!
//! MAD helps you build applications composed of multiple long-running components that need
//! to be orchestrated together. It handles:
//!
//! - **Concurrent execution** of multiple components
//! - **Graceful shutdown** with cancellation tokens
//! - **Error aggregation** from multiple components
//! - **Lifecycle management** with setup, run, and close phases
//!
//! ## Quick Start
//!
//! ```rust,no_run
//! use notmad::{Component, Mad};
//! use async_trait::async_trait;
//! use tokio_util::sync::CancellationToken;
//!
//! struct MyService {
//! name: String,
//! }
//!
//! #[async_trait]
//! impl Component for MyService {
//! fn name(&self) -> Option<String> {
//! Some(self.name.clone())
//! }
//!
//! async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
//! // Your service logic here
//! while !cancellation.is_cancelled() {
//! // Do work...
//! tokio::time::sleep(std::time::Duration::from_secs(1)).await;
//! }
//! Ok(())
//! }
//! }
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//! Mad::builder()
//! .add(MyService { name: "service-1".into() })
//! .add(MyService { name: "service-2".into() })
//! .run()
//! .await?;
//! Ok(())
//! }
//! ```
//!
//! ## Component Lifecycle
//!
//! Components go through three phases:
//!
//! 1. **Setup**: Optional initialization phase before components start running
//! 2. **Run**: Main execution phase where components perform their work
//! 3. **Close**: Optional cleanup phase after components stop
//!
//! ## Error Handling
//!
//! MAD provides comprehensive error handling through [`MadError`], which can:
//! - Wrap errors from individual components
//! - Aggregate multiple errors when several components fail
//! - Automatically convert from `anyhow::Error`
//!
//! ## Shutdown Behavior
//!
//! MAD handles shutdown gracefully:
//! - Responds to SIGTERM and Ctrl+C signals
//! - Propagates cancellation tokens to all components
//! - Waits for components to finish cleanup
//! - Configurable cancellation timeout
use futures::stream::FuturesUnordered;
use futures_util::StreamExt;
use std::{fmt::Display, sync::Arc};
@@ -9,23 +86,43 @@ use crate::waiter::Waiter;
mod waiter;
/// Error type for MAD operations.
///
/// This enum represents all possible errors that can occur during
/// the lifecycle of MAD components.
#[derive(thiserror::Error, Debug)]
pub enum MadError {
#[error("component failed: {0}")]
/// Generic error wrapper for anyhow errors.
///
/// This variant is used when components return errors via the `?` operator
/// or when converting from `anyhow::Error`.
#[error("component: {0:#?}")]
Inner(#[source] anyhow::Error),
#[error("component(s) failed: {run}")]
/// Error that occurred during the run phase of a component.
#[error("component: {run:#?}")]
RunError { run: anyhow::Error },
/// Error that occurred during the close phase of a component.
#[error("component(s) failed: {close}")]
CloseError { close: anyhow::Error },
#[error("component(s) failed: {0}")]
/// Multiple errors from different components.
///
/// This is used when multiple components fail simultaneously,
/// allowing all errors to be reported rather than just the first one.
#[error("component(s): {0}")]
AggregateError(AggregateError),
/// Returned when a component doesn't implement the optional setup method.
///
/// This is not typically an error condition as setup is optional.
#[error("setup not defined")]
SetupNotDefined,
/// Returned when a component doesn't implement the optional close method.
///
/// This is not typically an error condition as close is optional.
#[error("close not defined")]
CloseNotDefined,
}
@@ -36,12 +133,30 @@ impl From<anyhow::Error> for MadError {
}
}
/// Container for multiple errors from different components.
///
/// When multiple components fail, their errors are collected
/// into this struct to provide complete error reporting.
#[derive(Debug)]
pub struct AggregateError {
errors: Vec<MadError>,
}
impl AggregateError {
/// Returns a slice of all contained errors.
///
/// # Example
///
/// ```rust,ignore
/// match result {
/// Err(notmad::MadError::AggregateError(agg)) => {
/// for error in agg.get_errors() {
/// eprintln!("Component error: {}", error);
/// }
/// }
/// _ => {}
/// }
/// ```
pub fn get_errors(&self) -> &[MadError] {
&self.errors
}
@@ -49,6 +164,14 @@ impl AggregateError {
impl Display for AggregateError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.errors.is_empty() {
return Ok(());
}
if self.errors.len() == 1 {
return f.write_str(&self.errors.first().unwrap().to_string());
}
f.write_str("MadError::AggregateError: (")?;
for error in &self.errors {
@@ -60,6 +183,35 @@ impl Display for AggregateError {
}
}
/// The main lifecycle manager for running multiple components.
///
/// `Mad` orchestrates the lifecycle of multiple components, ensuring they
/// start up in order, run concurrently, and shut down gracefully.
///
/// # Example
///
/// ```rust
/// use notmad::{Component, Mad};
/// use async_trait::async_trait;
/// use tokio_util::sync::CancellationToken;
///
/// struct MyComponent;
///
/// #[async_trait]
/// impl Component for MyComponent {
/// async fn run(&self, _cancel: CancellationToken) -> Result<(), notmad::MadError> {
/// Ok(())
/// }
/// }
///
/// # async fn example() -> Result<(), notmad::MadError> {
/// Mad::builder()
/// .add(MyComponent)
/// .run()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub struct Mad {
components: Vec<Arc<dyn Component + Send + Sync + 'static>>,
@@ -72,6 +224,18 @@ struct CompletionResult {
}
impl Mad {
/// Creates a new `Mad` builder.
///
/// This is the entry point for constructing a MAD application.
/// Components are added using the builder pattern before calling `run()`.
///
/// # Example
///
/// ```rust
/// use notmad::Mad;
///
/// let mut app = Mad::builder();
/// ```
pub fn builder() -> Self {
Self {
components: Vec::default(),
@@ -80,12 +244,65 @@ impl Mad {
}
}
/// Adds a component to the MAD application.
///
/// Components will be set up in the order they are added,
/// run concurrently, and closed in the order they were added.
///
/// # Arguments
///
/// * `component` - Any type that implements `Component` or `IntoComponent`
///
/// # Example
///
/// ```rust
/// use notmad::{Component, Mad};
/// # use async_trait::async_trait;
/// # use tokio_util::sync::CancellationToken;
/// # struct MyService;
/// # #[async_trait]
/// # impl Component for MyService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # }
///
/// Mad::builder()
/// .add(MyService)
/// .add(MyService);
/// ```
pub fn add(&mut self, component: impl IntoComponent) -> &mut Self {
self.components.push(component.into_component());
self
}
/// Conditionally adds a component based on a boolean condition.
///
/// If the condition is false, a waiter component is added instead,
/// which simply waits for cancellation without doing any work.
///
/// # Arguments
///
/// * `condition` - If true, adds the component; if false, adds a waiter
/// * `component` - The component to add if condition is true
///
/// # Example
///
/// ```rust
/// use notmad::Mad;
/// # use notmad::Component;
/// # use async_trait::async_trait;
/// # use tokio_util::sync::CancellationToken;
/// # struct DebugService;
/// # #[async_trait]
/// # impl Component for DebugService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # }
///
/// let enable_debug = std::env::var("DEBUG").is_ok();
///
/// Mad::builder()
/// .add_conditional(enable_debug, DebugService);
/// ```
pub fn add_conditional(&mut self, condition: bool, component: impl IntoComponent) -> &mut Self {
if condition {
self.components.push(component.into_component());
@@ -97,12 +314,53 @@ impl Mad {
self
}
/// Adds a waiter component that does nothing but wait for cancellation.
///
/// This is useful when you need a placeholder component or want
/// the application to keep running without any specific work.
///
/// # Example
///
/// ```rust,no_run
/// # async fn example() {
/// use notmad::Mad;
///
/// Mad::builder()
/// .add_wait() // Keeps the app running until shutdown signal
/// .run()
/// .await;
/// # }
/// ```
pub fn add_wait(&mut self) -> &mut Self {
self.components.push(Waiter::default().into_component());
self
}
/// Adds a closure or function as a component.
///
/// This is a convenient way to add simple components without
/// creating a full struct that implements `Component`.
///
/// # Arguments
///
/// * `f` - A closure that takes a `CancellationToken` and returns a future
///
/// # Example
///
/// ```rust
/// use notmad::Mad;
/// use tokio_util::sync::CancellationToken;
///
/// Mad::builder()
/// .add_fn(|cancel: CancellationToken| async move {
/// while !cancel.is_cancelled() {
/// println!("Working...");
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
/// }
/// Ok(())
/// });
/// ```
pub fn add_fn<F, Fut>(&mut self, f: F) -> &mut Self
where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
@@ -113,12 +371,63 @@ impl Mad {
self.add(comp)
}
/// Configures the cancellation timeout behavior.
///
/// When a shutdown signal is received, MAD will:
/// 1. Send cancellation tokens to all components
/// 2. Wait for the specified duration
/// 3. Force shutdown if components haven't stopped
///
/// # Arguments
///
/// * `should_cancel` - Duration to wait after cancellation before forcing shutdown.
/// Pass `None` to wait indefinitely.
///
/// # Example
///
/// ```rust,no_run
/// # async fn example() {
/// use notmad::Mad;
/// use std::time::Duration;
///
/// Mad::builder()
/// .cancellation(Some(Duration::from_secs(30))) // 30 second grace period
/// .run()
/// .await;
/// # }
/// ```
pub fn cancellation(&mut self, should_cancel: Option<std::time::Duration>) -> &mut Self {
self.should_cancel = should_cancel;
self
}
/// Runs all components until completion or shutdown.
///
/// This method:
/// 1. Calls `setup()` on all components (in order)
/// 2. Starts all components concurrently
/// 3. Waits for shutdown signal (SIGTERM, Ctrl+C) or component failure
/// 4. Sends cancellation to all components
/// 5. Calls `close()` on all components (in order)
///
/// # Returns
///
/// * `Ok(())` if all components shut down cleanly
/// * `Err(MadError)` if any component fails
///
/// # Example
///
/// ```rust,no_run
/// # use notmad::Mad;
/// # async fn example() -> Result<(), notmad::MadError> {
/// Mad::builder()
/// .add_wait()
/// .run()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub async fn run(&mut self) -> Result<(), MadError> {
tracing::info!("running mad setup");
@@ -281,24 +590,148 @@ async fn signal_unix_terminate() {
sigterm.recv().await;
}
/// Trait for implementing MAD components.
///
/// Components represent individual services or tasks that run as part
/// of your application. Each component has its own lifecycle with
/// optional setup and cleanup phases.
///
/// # Example
///
/// ```rust
/// use notmad::{Component, MadError};
/// use async_trait::async_trait;
/// use tokio_util::sync::CancellationToken;
///
/// struct DatabaseConnection {
/// url: String,
/// }
///
/// #[async_trait]
/// impl Component for DatabaseConnection {
/// fn name(&self) -> Option<String> {
/// Some("database".to_string())
/// }
///
/// async fn setup(&self) -> Result<(), MadError> {
/// println!("Connecting to database...");
/// // Initialize connection pool
/// Ok(())
/// }
///
/// async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> {
/// // Keep connection alive, handle queries
/// cancel.cancelled().await;
/// Ok(())
/// }
///
/// async fn close(&self) -> Result<(), MadError> {
/// println!("Closing database connection...");
/// // Clean up resources
/// Ok(())
/// }
/// }
/// ```
#[async_trait::async_trait]
pub trait Component {
/// Returns an optional name for the component.
///
/// This name is used in logging and error messages to identify
/// which component is being processed.
///
/// # Default
///
/// Returns `None` if not overridden.
fn name(&self) -> Option<String> {
None
}
/// Optional setup phase called before the component starts running.
///
/// Use this for initialization tasks like:
/// - Establishing database connections
/// - Loading configuration
/// - Preparing resources
///
/// # Default
///
/// Returns `MadError::SetupNotDefined` which is handled gracefully.
///
/// # Errors
///
/// If setup fails with an error other than `SetupNotDefined`,
/// the entire application will stop before any components start running.
async fn setup(&self) -> Result<(), MadError> {
Err(MadError::SetupNotDefined)
}
/// Main execution phase of the component.
///
/// This method should contain the primary logic of your component.
/// It should respect the cancellation token and shut down gracefully
/// when cancellation is requested.
///
/// # Arguments
///
/// * `cancellation_token` - Signal for graceful shutdown
///
/// # Implementation Guidelines
///
/// - Check `cancellation_token.is_cancelled()` periodically
/// - Use `tokio::select!` with `cancellation_token.cancelled()` for async operations
/// - Clean up resources before returning
///
/// # Errors
///
/// Any error returned will trigger shutdown of all other components.
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError>;
/// Optional cleanup phase called after the component stops.
///
/// Use this for cleanup tasks like:
/// - Flushing buffers
/// - Closing connections
/// - Saving state
///
/// # Default
///
/// Returns `MadError::CloseNotDefined` which is handled gracefully.
///
/// # Errors
///
/// Errors during close are logged but don't prevent other components
/// from closing.
async fn close(&self) -> Result<(), MadError> {
Err(MadError::CloseNotDefined)
}
}
/// Trait for converting types into components.
///
/// This trait is automatically implemented for all types that implement
/// `Component + Send + Sync + 'static`, allowing them to be added to MAD
/// directly without manual conversion.
///
/// # Example
///
/// ```rust
/// use notmad::{Component, IntoComponent, Mad};
/// # use async_trait::async_trait;
/// # use tokio_util::sync::CancellationToken;
///
/// struct MyService;
///
/// # #[async_trait]
/// # impl Component for MyService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # }
///
/// // MyService automatically implements IntoComponent
/// Mad::builder()
/// .add(MyService); // Works directly
/// ```
pub trait IntoComponent {
/// Converts self into an Arc-wrapped component.
fn into_component(self) -> Arc<dyn Component + Send + Sync + 'static>;
}

View File

@@ -1,3 +1,9 @@
//! Waiter components for MAD.
//!
//! This module provides waiter components that simply wait for cancellation
//! without performing any work. Useful for keeping the application alive
//! or as placeholders in conditional component loading.
use std::sync::Arc;
use async_trait::async_trait;
@@ -5,6 +11,10 @@ use tokio_util::sync::CancellationToken;
use crate::{Component, MadError};
/// A default waiter component that panics if run.
///
/// This is used internally as a placeholder that should never
/// actually be executed.
pub struct DefaultWaiter {}
#[async_trait]
impl Component for DefaultWaiter {
@@ -13,6 +23,20 @@ impl Component for DefaultWaiter {
}
}
/// A wrapper component that waits for cancellation.
///
/// Instead of running the wrapped component's logic, this simply
/// waits for the cancellation token. This is useful for conditionally
/// disabling components while keeping the same structure.
///
/// # Example
///
/// ```rust,ignore
/// use mad::Waiter;
///
/// // Instead of running the service, just wait
/// let waiter = Waiter::new(service.into_component());
/// ```
pub struct Waiter {
comp: Arc<dyn Component + Send + Sync + 'static>,
}
@@ -26,6 +50,10 @@ impl Default for Waiter {
}
impl Waiter {
/// Creates a new waiter that wraps the given component.
///
/// The wrapped component's name will be used (prefixed with "waiter/"),
/// but its run method will not be called.
pub fn new(c: Arc<dyn Component + Send + Sync + 'static>) -> Self {
Self { comp: c }
}
@@ -33,6 +61,10 @@ impl Waiter {
#[async_trait]
impl Component for Waiter {
/// Returns the name of the waiter, prefixed with "waiter/".
///
/// If the wrapped component has a name, it will be "waiter/{name}".
/// Otherwise, returns "waiter".
fn name(&self) -> Option<String> {
match self.comp.name() {
Some(name) => Some(format!("waiter/{name}")),
@@ -40,6 +72,10 @@ impl Component for Waiter {
}
}
/// Waits for cancellation without performing any work.
///
/// This method simply waits for the cancellation token to be triggered,
/// then returns successfully.
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
cancellation_token.cancelled().await;

View File

@@ -1,6 +1,6 @@
# yaml-language-server: $schema=https://git.front.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
# yaml-language-server: $schema=https://git.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
base: "git@git.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
vars:
service: "mad"
@@ -12,6 +12,6 @@ please:
repository: "mad"
branch: main
settings:
api_url: "https://git.front.kjuulh.io"
api_url: "https://git.kjuulh.io"
actions:
rust: