21 Commits

Author SHA1 Message Date
3122e4495e chore(deps): update rust crate anyhow to v1.0.102
Some checks failed
continuous-integration/drone/pr Build encountered an error
continuous-integration/drone/push Build encountered an error
2026-02-20 04:43:01 +00:00
ebe5110f6f fix(deps): update rust-futures monorepo to v0.3.32 (#50)
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-02-16 02:39:15 +01:00
b0f8295a81 fix(deps): update rust crate rand to 0.10.0 (#49)
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-02-09 02:39:03 +01:00
e9e542b506 chore(deps): update rust crate tracing-test to v0.2.6 (#48)
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-02-08 02:38:23 +01:00
92c0c22319 chore(deps): update rust crate anyhow to v1.0.101 (#47)
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-02-06 02:44:37 +01:00
ea46f0f2ac feat: bump v0.11
Some checks failed
continuous-integration/drone/push Build encountered an error
Signed-off-by: kjuulh <contact@kjuulh.io>
2026-02-05 23:32:51 +01:00
689bfd1325 BREAKING: name() -> info() and removed async_trait
Some checks failed
continuous-integration/drone/push Build encountered an error
Signed-off-by: kjuulh <contact@kjuulh.io>
2026-02-05 23:32:27 +01:00
f0c90edce9 feat: replace async-trait with erased box type
Signed-off-by: kjuulh <contact@kjuulh.io>
2026-02-05 23:32:27 +01:00
5e60a272f7 fix(deps): update rust crate thiserror to v2.0.18 (#46)
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-01-19 02:39:03 +01:00
34d609937d fix(deps): update rust crate tokio-util to v0.7.18 (#45)
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-01-05 02:39:14 +01:00
094b14c945 chore(deps): update rust crate tokio to v1.49.0 (#44)
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-01-04 02:41:05 +01:00
f6d4f846fc chore(deps): update rust crate tracing to v0.1.44 (#43)
All checks were successful
continuous-integration/drone/push Build is passing
2025-12-19 02:47:52 +01:00
7f3139f4f9 chore(deps): update tokio-tracing monorepo (#41)
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-29 02:44:04 +01:00
494ec05874 chore(release): v0.10.0 (#40)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.10.0

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #40
2025-11-15 14:48:14 +01:00
8c0128612f feat: implement take errors
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-11-15 14:47:24 +01:00
cbe049b6a2 chore(release): v0.9.0 (#36)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.9.0

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #36
2025-11-15 14:35:36 +01:00
2d6b14ad77 feat: mad not properly surfaces panics
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-11-15 14:33:00 +01:00
f777ec9b1e fix(deps): update all dependencies (#38)
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-14 02:38:11 +01:00
2415088792 chore(deps): update rust crate tracing-subscriber to v0.3.20 (#37)
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-13 03:15:23 +01:00
a35d15edc2 feat: add publish
All checks were successful
continuous-integration/drone/push Build is passing
2025-09-03 12:52:24 +02:00
613947ac88 feat: add readme
All checks were successful
continuous-integration/drone/push Build is passing
2025-09-03 12:40:28 +02:00
16 changed files with 851 additions and 599 deletions

View File

@@ -6,6 +6,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.10.0] - 2025-11-15
### Added
- implement take errors
## [0.9.0] - 2025-11-15
### Added
- mad not properly surfaces panics
- add publish
- add readme
### Fixed
- *(deps)* update all dependencies (#38)
### Other
- *(deps)* update rust crate tracing-subscriber to v0.3.20 (#37)
## [0.8.1] - 2025-08-09
### Other

706
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -3,10 +3,9 @@ members = ["crates/*"]
resolver = "2"
[workspace.package]
version = "0.8.1"
version = "0.11.0"
[workspace.dependencies]
mad = { path = "crates/mad" }
anyhow = { version = "1.0.71" }
tokio = { version = "1", features = ["full"] }

214
README.md
View File

@@ -4,165 +4,163 @@
[![Documentation](https://docs.rs/notmad/badge.svg)](https://docs.rs/notmad)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
## Overview
MAD is a robust lifecycle manager designed for long-running Rust operations. It provides a simple, composable way to manage multiple concurrent services within your application, handling graceful startup and shutdown automatically.
### Perfect for:
- 🌐 Web servers
- 📨 Queue consumers and message processors
- 🔌 gRPC servers
- ⏰ Cron job runners
- 🔄 Background workers
- 📡 Any long-running async operations
## Features
- **Component-based architecture** - Build your application from composable, reusable components
- **Graceful shutdown** - Automatic handling of shutdown signals with proper cleanup
- **Concurrent execution** - Run multiple components in parallel with tokio
- **Error handling** - Built-in error propagation and logging
- **Cancellation tokens** - Coordinate shutdown across all components
- **Minimal boilerplate** - Focus on your business logic, not lifecycle management
A simple lifecycle manager for long-running Rust applications. Run multiple services concurrently with graceful shutdown handling.
## Installation
Add MAD to your `Cargo.toml`:
```toml
[dependencies]
notmad = "0.7.5"
notmad = "0.10.0"
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
```
## Quick Start
Here's a simple example of a component that simulates a long-running server:
```rust
use mad::{Component, Mad};
use async_trait::async_trait;
use notmad::{Component, Mad};
use tokio_util::sync::CancellationToken;
// Define your component
struct WebServer {
port: u16,
}
struct MyService;
#[async_trait]
impl Component for WebServer {
fn name(&self) -> Option<String> {
Some(format!("WebServer on port {}", self.port))
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
println!("Starting web server on port {}", self.port);
// Your server logic here
// The cancellation token will be triggered on shutdown
tokio::select! {
_ = cancellation.cancelled() => {
println!("Shutting down web server");
}
_ = self.serve() => {
println!("Server stopped");
}
}
impl Component for MyService {
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
println!("Service running...");
cancellation.cancelled().await;
println!("Service stopped");
Ok(())
}
}
impl WebServer {
async fn serve(&self) {
// Simulate a running server
tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Build and run your application
Mad::builder()
.add(WebServer { port: 8080 })
.add(WebServer { port: 8081 }) // You can add multiple instances
.add(MyService)
.run()
.await?;
Ok(())
}
```
## Advanced Usage
## Basic Usage
### Custom Components
### Axum Web Server with Graceful Shutdown
Components can be anything that implements the `Component` trait:
Here's how to run an Axum server with MAD's graceful shutdown:
```rust
use mad::{Component, Mad};
use async_trait::async_trait;
use axum::{Router, routing::get};
use notmad::{Component, ComponentInfo};
use tokio_util::sync::CancellationToken;
struct QueueProcessor {
queue_name: String,
struct WebServer {
port: u16,
}
#[async_trait]
impl Component for QueueProcessor {
fn name(&self) -> Option<String> {
Some(format!("QueueProcessor-{}", self.queue_name))
impl Component for WebServer {
fn info(&self) -> ComponentInfo {
format!("WebServer:{}", self.port).into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
while !cancellation.is_cancelled() {
// Process messages from queue
self.process_next_message().await?;
}
async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
let app = Router::new().route("/", get(|| async { "Hello, World!" }));
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", self.port))
.await?;
println!("Listening on http://0.0.0.0:{}", self.port);
// Run server with graceful shutdown
axum::serve(listener, app)
.with_graceful_shutdown(async move {
cancel.cancelled().await;
println!("Shutting down server...");
})
.await?;
Ok(())
}
}
```
### Error Handling
MAD provides comprehensive error handling through the `MadError` type with automatic conversion from `anyhow::Error`:
### Run Multiple Services
```rust
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
// Errors automatically convert from anyhow::Error to MadError
database_operation().await?;
// Or return explicit errors
if some_condition {
return Err(anyhow::anyhow!("Something went wrong").into());
Mad::builder()
.add(WebServer { port: 8080 })
.add(WebServer { port: 8081 })
.run()
.await?;
```
### Use Functions as Components
```rust
Mad::builder()
.add_fn(|cancel| async move {
println!("Running...");
cancel.cancelled().await;
Ok(())
})
.run()
.await?;
```
## Lifecycle Hooks
Components support optional setup and cleanup phases:
```rust
impl Component for DatabaseService {
async fn setup(&self) -> Result<(), notmad::MadError> {
println!("Connecting to database...");
Ok(())
}
async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
cancel.cancelled().await;
Ok(())
}
async fn close(&self) -> Result<(), notmad::MadError> {
println!("Closing database connection...");
Ok(())
}
Ok(())
}
```
## Migration from v0.10
### Breaking Changes
1. **`name()``info()`**: Returns `ComponentInfo` instead of `Option<String>`
```rust
// Before
fn name(&self) -> Option<String> { Some("my-service".into()) }
// After
fn info(&self) -> ComponentInfo { "my-service".into() }
```
2. **No more `async-trait`**: Remove the dependency and `#[async_trait]` attribute
```rust
// Before
#[async_trait]
impl Component for MyService { }
// After
impl Component for MyService { }
```
## Examples
Check out the [examples directory](crates/mad/examples) for more detailed examples:
- **basic** - Simple component lifecycle
- **fn** - Using functions as components
- **signals** - Handling system signals
- **error_log** - Error handling and logging
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
See [examples directory](crates/mad/examples) for complete working examples.
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
MIT - see [LICENSE](LICENSE)
## Author
## Links
Created and maintained by [kjuulh](https://github.com/kjuulh)
## Repository
Find the source code at [https://github.com/kjuulh/mad](https://github.com/kjuulh/mad)
- [Documentation](https://docs.rs/notmad)
- [Repository](https://github.com/kjuulh/mad)
- [Crates.io](https://crates.io/crates/notmad)

View File

@@ -6,13 +6,13 @@ license = "MIT"
repository = "https://github.com/kjuulh/mad"
authors = ["kjuulh"]
edition = "2024"
readme = "../../README.md"
[dependencies]
anyhow.workspace = true
async-trait = "0.1.81"
futures = "0.3.30"
futures-util = "0.3.30"
rand = "0.9.0"
rand = "0.10.0"
thiserror = "2.0.0"
tokio.workspace = true
tokio-util = "0.7.11"

View File

@@ -1,16 +1,15 @@
use async_trait::async_trait;
use notmad::ComponentInfo;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct WaitServer {}
#[async_trait]
impl notmad::Component for WaitServer {
fn name(&self) -> Option<String> {
Some("WaitServer".into())
fn info(&self) -> ComponentInfo {
"WaitServer".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);

View File

@@ -7,8 +7,7 @@
//! - Graceful shutdown with cancellation tokens
//! - Concurrent component execution
use async_trait::async_trait;
use notmad::{Component, Mad, MadError};
use notmad::{Component, ComponentInfo, Mad, MadError};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::time::{Duration, interval};
@@ -21,10 +20,9 @@ struct WebServer {
request_count: Arc<AtomicUsize>,
}
#[async_trait]
impl Component for WebServer {
fn name(&self) -> Option<String> {
Some(format!("web-server-{}", self.port))
fn info(&self) -> ComponentInfo {
format!("web-server-{}", self.port).into()
}
async fn setup(&self) -> Result<(), MadError> {
@@ -81,10 +79,9 @@ struct JobProcessor {
processing_interval: Duration,
}
#[async_trait]
impl Component for JobProcessor {
fn name(&self) -> Option<String> {
Some(format!("job-processor-{}", self.queue_name))
fn info(&self) -> ComponentInfo {
format!("job-processor-{}", self.queue_name).into()
}
async fn setup(&self) -> Result<(), MadError> {
@@ -139,10 +136,9 @@ struct HealthChecker {
check_interval: Duration,
}
#[async_trait]
impl Component for HealthChecker {
fn name(&self) -> Option<String> {
Some("health-checker".to_string())
fn info(&self) -> ComponentInfo {
"health-checker".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
@@ -181,10 +177,9 @@ struct FailingComponent {
fail_after: Duration,
}
#[async_trait]
impl Component for FailingComponent {
fn name(&self) -> Option<String> {
Some("failing-component".to_string())
fn info(&self) -> ComponentInfo {
"failing-component".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
@@ -209,10 +204,9 @@ impl Component for FailingComponent {
/// Debug component that logs system status periodically.
struct DebugComponent;
#[async_trait]
impl Component for DebugComponent {
fn name(&self) -> Option<String> {
Some("debug-component".to_string())
fn info(&self) -> ComponentInfo {
"debug-component".into()
}
async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> {

View File

@@ -1,16 +1,15 @@
use async_trait::async_trait;
use notmad::ComponentInfo;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct ErrorServer {}
#[async_trait]
impl notmad::Component for ErrorServer {
fn name(&self) -> Option<String> {
Some("ErrorServer".into())
fn info(&self) -> ComponentInfo {
"ErrorServer".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);

View File

@@ -1,13 +1,12 @@
use async_trait::async_trait;
use notmad::ComponentInfo;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct WaitServer {}
#[async_trait]
impl notmad::Component for WaitServer {
fn name(&self) -> Option<String> {
Some("WaitServer".into())
fn info(&self) -> ComponentInfo {
"WaitServer".into()
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {

View File

@@ -3,8 +3,7 @@
//! This example shows how to run a web server, queue processor, and
//! scheduled task together, with graceful shutdown on Ctrl+C.
use async_trait::async_trait;
use notmad::{Component, Mad, MadError};
use notmad::{Component, ComponentInfo, Mad, MadError};
use tokio::time::{Duration, interval};
use tokio_util::sync::CancellationToken;
@@ -16,22 +15,21 @@ struct WebServer {
port: u16,
}
#[async_trait]
impl Component for WebServer {
fn name(&self) -> Option<String> {
Some(format!("WebServer:{}", self.port))
fn info(&self) -> ComponentInfo {
format!("WebServer:{}", self.port).into()
}
async fn setup(&self) -> Result<(), MadError> {
println!("[{}] Binding to port...", self.name().unwrap());
println!("[{}] Binding to port...", self.info());
// Simulate server setup time
tokio::time::sleep(Duration::from_millis(100)).await;
println!("[{}] Ready to accept connections", self.name().unwrap());
println!("[{}] Ready to accept connections", self.info());
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!("[{}] Server started", self.name().unwrap());
println!("[{}] Server started", self.info());
// Simulate handling requests until shutdown
let mut request_id = 0;
@@ -40,12 +38,12 @@ impl Component for WebServer {
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Shutdown signal received", self.name().unwrap());
println!("[{}] Shutdown signal received", self.info());
break;
}
_ = interval.tick() => {
request_id += 1;
println!("[{}] Handling request #{}", self.name().unwrap(), request_id);
println!("[{}] Handling request #{}", self.info(), request_id);
}
}
}
@@ -54,10 +52,10 @@ impl Component for WebServer {
}
async fn close(&self) -> Result<(), MadError> {
println!("[{}] Closing connections...", self.name().unwrap());
println!("[{}] Closing connections...", self.info());
// Simulate graceful connection drain
tokio::time::sleep(Duration::from_millis(200)).await;
println!("[{}] Server stopped", self.name().unwrap());
println!("[{}] Server stopped", self.info());
Ok(())
}
}
@@ -70,14 +68,13 @@ struct QueueProcessor {
queue_name: String,
}
#[async_trait]
impl Component for QueueProcessor {
fn name(&self) -> Option<String> {
Some(format!("QueueProcessor:{}", self.queue_name))
fn info(&self) -> ComponentInfo {
format!("QueueProcessor:{}", self.queue_name).into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!("[{}] Started processing", self.name().unwrap());
println!("[{}] Started processing", self.info());
let mut message_count = 0;
@@ -86,19 +83,19 @@ impl Component for QueueProcessor {
// Simulate waiting for and processing a message
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Stopping message processing", self.name().unwrap());
println!("[{}] Stopping message processing", self.info());
break;
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
message_count += 1;
println!("[{}] Processed message #{}", self.name().unwrap(), message_count);
println!("[{}] Processed message #{}", self.info(), message_count);
}
}
}
println!(
"[{}] Processed {} messages total",
self.name().unwrap(),
self.info(),
message_count
);
Ok(())
@@ -116,16 +113,15 @@ struct ScheduledTask {
interval_secs: u64,
}
#[async_trait]
impl Component for ScheduledTask {
fn name(&self) -> Option<String> {
Some(format!("ScheduledTask:{}", self.task_name))
fn info(&self) -> ComponentInfo {
format!("ScheduledTask:{}", self.task_name).into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!(
"[{}] Scheduled to run every {} seconds",
self.name().unwrap(),
self.info(),
self.interval_secs
);
@@ -135,17 +131,17 @@ impl Component for ScheduledTask {
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Scheduler stopping", self.name().unwrap());
println!("[{}] Scheduler stopping", self.info());
break;
}
_ = interval.tick() => {
run_count += 1;
println!("[{}] Executing run #{}", self.name().unwrap(), run_count);
println!("[{}] Executing run #{}", self.info(), run_count);
// Simulate task execution
tokio::time::sleep(Duration::from_millis(500)).await;
println!("[{}] Run #{} completed", self.name().unwrap(), run_count);
println!("[{}] Run #{} completed", self.info(), run_count);
}
}
}

View File

@@ -1,14 +1,13 @@
use async_trait::async_trait;
use notmad::ComponentInfo;
use tokio_util::sync::CancellationToken;
struct NestedErrorComponent {
name: String,
}
#[async_trait]
impl notmad::Component for NestedErrorComponent {
fn name(&self) -> Option<String> {
Some(self.name.clone())
fn info(&self) -> ComponentInfo {
self.name.clone().into()
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
@@ -28,10 +27,9 @@ impl notmad::Component for NestedErrorComponent {
struct AnotherFailingComponent;
#[async_trait]
impl notmad::Component for AnotherFailingComponent {
fn name(&self) -> Option<String> {
Some("another-component".into())
fn info(&self) -> ComponentInfo {
"another-component".into()
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {

View File

@@ -1,16 +1,15 @@
use async_trait::async_trait;
use notmad::ComponentInfo;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct WaitServer {}
#[async_trait]
impl notmad::Component for WaitServer {
fn name(&self) -> Option<String> {
Some("WaitServer".into())
fn info(&self) -> ComponentInfo {
"WaitServer".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);
@@ -23,10 +22,9 @@ impl notmad::Component for WaitServer {
}
struct RespectCancel {}
#[async_trait]
impl notmad::Component for RespectCancel {
fn name(&self) -> Option<String> {
Some("RespectCancel".into())
fn info(&self) -> ComponentInfo {
"RespectCancel".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
@@ -38,13 +36,12 @@ impl notmad::Component for RespectCancel {
}
struct NeverStopServer {}
#[async_trait]
impl notmad::Component for NeverStopServer {
fn name(&self) -> Option<String> {
Some("NeverStopServer".into())
fn info(&self) -> ComponentInfo {
"NeverStopServer".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(999999999)).await;

View File

@@ -1,42 +1,21 @@
//! # MAD - Lifecycle Manager for Rust Applications
//!
//! MAD is a robust lifecycle manager designed for long-running Rust operations. It provides
//! a simple, composable way to manage multiple concurrent services within your application,
//! handling graceful startup and shutdown automatically.
//!
//! ## Overview
//!
//! MAD helps you build applications composed of multiple long-running components that need
//! to be orchestrated together. It handles:
//!
//! - **Concurrent execution** of multiple components
//! - **Graceful shutdown** with cancellation tokens
//! - **Error aggregation** from multiple components
//! - **Lifecycle management** with setup, run, and close phases
//! A simple lifecycle manager for long-running Rust applications. Run multiple services
//! concurrently with graceful shutdown handling.
//!
//! ## Quick Start
//!
//! ```rust,no_run
//! use notmad::{Component, Mad};
//! use async_trait::async_trait;
//! use tokio_util::sync::CancellationToken;
//!
//! struct MyService {
//! name: String,
//! }
//! struct MyService;
//!
//! #[async_trait]
//! impl Component for MyService {
//! fn name(&self) -> Option<String> {
//! Some(self.name.clone())
//! }
//!
//! async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
//! // Your service logic here
//! while !cancellation.is_cancelled() {
//! // Do work...
//! tokio::time::sleep(std::time::Duration::from_secs(1)).await;
//! }
//! async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
//! println!("Running...");
//! cancel.cancelled().await;
//! println!("Stopped");
//! Ok(())
//! }
//! }
@@ -44,40 +23,24 @@
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//! Mad::builder()
//! .add(MyService { name: "service-1".into() })
//! .add(MyService { name: "service-2".into() })
//! .add(MyService)
//! .run()
//! .await?;
//! Ok(())
//! }
//! ```
//!
//! ## Component Lifecycle
//! ## Features
//!
//! Components go through three phases:
//!
//! 1. **Setup**: Optional initialization phase before components start running
//! 2. **Run**: Main execution phase where components perform their work
//! 3. **Close**: Optional cleanup phase after components stop
//!
//! ## Error Handling
//!
//! MAD provides comprehensive error handling through [`MadError`], which can:
//! - Wrap errors from individual components
//! - Aggregate multiple errors when several components fail
//! - Automatically convert from `anyhow::Error`
//!
//! ## Shutdown Behavior
//!
//! MAD handles shutdown gracefully:
//! - Responds to SIGTERM and Ctrl+C signals
//! - Propagates cancellation tokens to all components
//! - Waits for components to finish cleanup
//! - Configurable cancellation timeout
//! - Run multiple components concurrently
//! - Graceful shutdown with cancellation tokens
//! - Optional lifecycle hooks: `setup()`, `run()`, `close()`
//! - Automatic error aggregation
//! - SIGTERM and Ctrl+C signal handling
use futures::stream::FuturesUnordered;
use futures_util::StreamExt;
use std::{fmt::Display, sync::Arc, error::Error};
use std::{error::Error, fmt::Display, pin::Pin, sync::Arc};
use tokio::signal::unix::{SignalKind, signal};
use tokio_util::sync::CancellationToken;
@@ -101,15 +64,13 @@ pub enum MadError {
/// Error that occurred during the run phase of a component.
#[error(transparent)]
RunError {
run: anyhow::Error
},
RunError { run: anyhow::Error },
/// Error that occurred during the close phase of a component.
#[error("component(s) failed during close")]
CloseError {
CloseError {
#[source]
close: anyhow::Error
close: anyhow::Error,
},
/// Multiple errors from different components.
@@ -165,6 +126,10 @@ impl AggregateError {
pub fn get_errors(&self) -> &[MadError] {
&self.errors
}
pub fn take_errors(self) -> Vec<MadError> {
self.errors
}
}
impl Display for AggregateError {
@@ -180,7 +145,7 @@ impl Display for AggregateError {
writeln!(f, "{} component errors occurred:", self.errors.len())?;
for (i, error) in self.errors.iter().enumerate() {
write!(f, "\n[Component {}] {}", i + 1, error)?;
// Print the error chain for each component error
let mut source = error.source();
let mut level = 1;
@@ -203,12 +168,10 @@ impl Display for AggregateError {
///
/// ```rust
/// use notmad::{Component, Mad};
/// use async_trait::async_trait;
/// use tokio_util::sync::CancellationToken;
///
/// struct MyComponent;
///
/// #[async_trait]
/// impl Component for MyComponent {
/// async fn run(&self, _cancel: CancellationToken) -> Result<(), notmad::MadError> {
/// Ok(())
@@ -224,7 +187,7 @@ impl Display for AggregateError {
/// # }
/// ```
pub struct Mad {
components: Vec<Arc<dyn Component + Send + Sync + 'static>>,
components: Vec<SharedComponent>,
should_cancel: Option<std::time::Duration>,
}
@@ -268,10 +231,8 @@ impl Mad {
///
/// ```rust
/// use notmad::{Component, Mad};
/// # use async_trait::async_trait;
/// # use tokio_util::sync::CancellationToken;
/// # struct MyService;
/// # #[async_trait]
/// # impl Component for MyService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # }
@@ -301,10 +262,8 @@ impl Mad {
/// ```rust
/// use notmad::Mad;
/// # use notmad::Component;
/// # use async_trait::async_trait;
/// # use tokio_util::sync::CancellationToken;
/// # struct DebugService;
/// # #[async_trait]
/// # impl Component for DebugService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # }
@@ -392,7 +351,7 @@ impl Mad {
/// # Arguments
///
/// * `should_cancel` - Duration to wait after cancellation before forcing shutdown.
/// Pass `None` to wait indefinitely.
/// Pass `None` to wait indefinitely.
///
/// # Example
///
@@ -467,7 +426,7 @@ impl Mad {
tracing::debug!("setting up components");
for comp in &self.components {
tracing::trace!(component = &comp.name(), "mad setting up");
tracing::trace!(component = %comp.info(), "mad setting up");
match comp.setup().await {
Ok(_) | Err(MadError::SetupNotDefined) => {}
@@ -497,16 +456,37 @@ impl Mad {
channels.push(error_rx);
tokio::spawn(async move {
let name = comp.name().clone();
let info = comp.info().clone();
tracing::debug!(component = name, "mad running");
tracing::debug!(component = %info, "mad running");
let handle = tokio::spawn(async move { comp.run(job_cancellation).await });
tokio::select! {
_ = cancellation_token.cancelled() => {
error_tx.send(CompletionResult { res: Ok(()) , name }).await
error_tx.send(CompletionResult { res: Ok(()) , name: info.name }).await
}
res = comp.run(job_cancellation) => {
error_tx.send(CompletionResult { res , name }).await
res = handle => {
let res = match res {
Ok(res) => res,
Err(join) => {
match join.source() {
Some(error) => {
Err(MadError::RunError{run: anyhow::anyhow!("component aborted: {:?}", error)})
},
None => {
if join.is_panic(){
Err(MadError::RunError { run: anyhow::anyhow!("component panicked: {}", join) })
} else {
Err(MadError::RunError { run: anyhow::anyhow!("component faced unknown error: {}", join) })
}
},
}
},
};
error_tx.send(CompletionResult { res , name: info.name }).await
}
}
});
@@ -583,7 +563,7 @@ impl Mad {
tracing::debug!("closing components");
for comp in &self.components {
tracing::trace!(component = &comp.name(), "mad closing");
tracing::trace!(component = %comp.info(), "mad closing");
match comp.close().await {
Ok(_) | Err(MadError::CloseNotDefined) => {}
Err(e) => return Err(e),
@@ -601,6 +581,46 @@ async fn signal_unix_terminate() {
sigterm.recv().await;
}
#[derive(Default, Clone)]
pub struct ComponentInfo {
name: Option<String>,
}
impl Display for ComponentInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(
self.name
.as_ref()
.map(|n| n.as_str())
.unwrap_or_else(|| "unknown"),
)
}
}
impl ComponentInfo {
pub fn new() -> Self {
Self::default()
}
pub fn with_name(&mut self, name: impl Into<String>) -> &mut Self {
self.name = Some(name.into());
self
}
}
impl From<String> for ComponentInfo {
fn from(value: String) -> Self {
Self { name: Some(value) }
}
}
impl From<&str> for ComponentInfo {
fn from(value: &str) -> Self {
Self {
name: Some(value.into()),
}
}
}
/// Trait for implementing MAD components.
///
/// Components represent individual services or tasks that run as part
@@ -610,18 +630,16 @@ async fn signal_unix_terminate() {
/// # Example
///
/// ```rust
/// use notmad::{Component, MadError};
/// use async_trait::async_trait;
/// use notmad::{Component, ComponentInfo, MadError};
/// use tokio_util::sync::CancellationToken;
///
/// struct DatabaseConnection {
/// url: String,
/// }
///
/// #[async_trait]
/// impl Component for DatabaseConnection {
/// fn name(&self) -> Option<String> {
/// Some("database".to_string())
/// fn info(&self) -> ComponentInfo {
/// "database".into()
/// }
///
/// async fn setup(&self) -> Result<(), MadError> {
@@ -643,8 +661,7 @@ async fn signal_unix_terminate() {
/// }
/// }
/// ```
#[async_trait::async_trait]
pub trait Component {
pub trait Component: Send + Sync + 'static {
/// Returns an optional name for the component.
///
/// This name is used in logging and error messages to identify
@@ -653,8 +670,8 @@ pub trait Component {
/// # Default
///
/// Returns `None` if not overridden.
fn name(&self) -> Option<String> {
None
fn info(&self) -> ComponentInfo {
ComponentInfo::default()
}
/// Optional setup phase called before the component starts running.
@@ -672,8 +689,8 @@ pub trait Component {
///
/// If setup fails with an error other than `SetupNotDefined`,
/// the entire application will stop before any components start running.
async fn setup(&self) -> Result<(), MadError> {
Err(MadError::SetupNotDefined)
fn setup(&self) -> impl Future<Output = Result<(), MadError>> + Send + '_ {
async { Err(MadError::SetupNotDefined) }
}
/// Main execution phase of the component.
@@ -695,7 +712,10 @@ pub trait Component {
/// # Errors
///
/// Any error returned will trigger shutdown of all other components.
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError>;
fn run(
&self,
cancellation_token: CancellationToken,
) -> impl Future<Output = Result<(), MadError>> + Send + '_;
/// Optional cleanup phase called after the component stops.
///
@@ -712,8 +732,73 @@ pub trait Component {
///
/// Errors during close are logged but don't prevent other components
/// from closing.
fn close(&self) -> impl Future<Output = Result<(), MadError>> + Send + '_ {
async { Err(MadError::CloseNotDefined) }
}
}
trait AsyncComponent: Send + Sync + 'static {
fn info_async(&self) -> ComponentInfo;
fn setup_async(&self) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>>;
fn run_async(
&self,
cancellation_token: CancellationToken,
) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>>;
fn close_async(&self) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>>;
}
impl<E: Component> AsyncComponent for E {
#[inline(always)]
fn info_async(&self) -> ComponentInfo {
self.info()
}
#[inline(always)]
fn setup_async(&self) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>> {
Box::pin(self.setup())
}
#[inline(always)]
fn run_async(
&self,
cancellation_token: CancellationToken,
) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>> {
Box::pin(self.run(cancellation_token))
}
#[inline(always)]
fn close_async(&self) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>> {
Box::pin(self.close())
}
}
#[derive(Clone)]
pub struct SharedComponent {
component: Arc<dyn AsyncComponent + Send + Sync + 'static>,
}
impl SharedComponent {
#[inline(always)]
pub fn info(&self) -> ComponentInfo {
self.component.info_async()
}
#[inline(always)]
async fn setup(&self) -> Result<(), MadError> {
self.component.setup_async().await
}
#[inline(always)]
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
self.component.run_async(cancellation_token).await
}
#[inline(always)]
async fn close(&self) -> Result<(), MadError> {
Err(MadError::CloseNotDefined)
self.component.close_async().await
}
}
@@ -727,12 +812,10 @@ pub trait Component {
///
/// ```rust
/// use notmad::{Component, IntoComponent, Mad};
/// # use async_trait::async_trait;
/// # use tokio_util::sync::CancellationToken;
///
/// struct MyService;
///
/// # #[async_trait]
/// # impl Component for MyService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # }
@@ -743,12 +826,14 @@ pub trait Component {
/// ```
pub trait IntoComponent {
/// Converts self into an Arc-wrapped component.
fn into_component(self) -> Arc<dyn Component + Send + Sync + 'static>;
fn into_component(self) -> SharedComponent;
}
impl<T: Component + Send + Sync + 'static> IntoComponent for T {
fn into_component(self) -> Arc<dyn Component + Send + Sync + 'static> {
Arc::new(self)
impl<T: Component> IntoComponent for T {
fn into_component(self) -> SharedComponent {
SharedComponent {
component: Arc::new(self),
}
}
}
@@ -772,7 +857,6 @@ where
}
}
#[async_trait::async_trait]
impl<F, Fut> Component for ClosureComponent<F, Fut>
where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
@@ -786,7 +870,6 @@ where
#[cfg(test)]
mod tests {
use super::*;
use anyhow::Context;
#[test]
fn test_error_chaining_display() {
@@ -796,13 +879,13 @@ mod tests {
.context("failed to read configuration")
.context("unable to initialize database")
.context("service startup failed");
let mad_error = MadError::Inner(error);
let display = format!("{}", mad_error);
// Should display the top-level error message
assert!(display.contains("service startup failed"));
// Test error chain iteration
if let MadError::Inner(ref e) = mad_error {
let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
@@ -818,26 +901,26 @@ mod tests {
fn test_aggregate_error_display() {
let error1 = MadError::Inner(
anyhow::anyhow!("database connection failed")
.context("failed to connect to PostgreSQL")
.context("failed to connect to PostgreSQL"),
);
let error2 = MadError::Inner(
anyhow::anyhow!("port already in use")
.context("failed to bind to port 8080")
.context("web server initialization failed")
.context("web server initialization failed"),
);
let aggregate = MadError::AggregateError(AggregateError {
errors: vec![error1, error2],
});
let display = format!("{}", aggregate);
// Check that it shows multiple errors
assert!(display.contains("2 component errors occurred"));
assert!(display.contains("[Component 1]"));
assert!(display.contains("[Component 2]"));
// Check that context chains are displayed
assert!(display.contains("failed to connect to PostgreSQL"));
assert!(display.contains("database connection failed"));
@@ -852,7 +935,7 @@ mod tests {
let aggregate = AggregateError {
errors: vec![error],
};
let display = format!("{}", aggregate);
// Single error should be displayed directly
assert!(display.contains("single error"));
@@ -864,9 +947,9 @@ mod tests {
let error = MadError::Inner(
anyhow::anyhow!("root cause")
.context("middle layer")
.context("top layer")
.context("top layer"),
);
// Test that we can access the error chain
if let MadError::Inner(ref e) = error {
let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
@@ -882,13 +965,12 @@ mod tests {
#[tokio::test]
async fn test_component_error_propagation() {
struct FailingComponent;
#[async_trait::async_trait]
impl Component for FailingComponent {
fn name(&self) -> Option<String> {
Some("test-component".to_string())
fn info(&self) -> ComponentInfo {
"test-component".into()
}
async fn run(&self, _cancel: CancellationToken) -> Result<(), MadError> {
Err(anyhow::anyhow!("IO error")
.context("failed to open file")
@@ -896,16 +978,16 @@ mod tests {
.into())
}
}
let result = Mad::builder()
.add(FailingComponent)
.cancellation(Some(std::time::Duration::from_millis(100)))
.run()
.await;
assert!(result.is_err());
let error = result.unwrap_err();
// Check error display
let display = format!("{}", error);
assert!(display.contains("component initialization failed"));

View File

@@ -4,19 +4,15 @@
//! without performing any work. Useful for keeping the application alive
//! or as placeholders in conditional component loading.
use std::sync::Arc;
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use crate::{Component, MadError};
use crate::{Component, ComponentInfo, IntoComponent, MadError, SharedComponent};
/// A default waiter component that panics if run.
///
/// This is used internally as a placeholder that should never
/// actually be executed.
pub struct DefaultWaiter {}
#[async_trait]
pub struct DefaultWaiter;
impl Component for DefaultWaiter {
async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), MadError> {
panic!("should never be called");
@@ -38,13 +34,13 @@ impl Component for DefaultWaiter {
/// let waiter = Waiter::new(service.into_component());
/// ```
pub struct Waiter {
comp: Arc<dyn Component + Send + Sync + 'static>,
comp: SharedComponent,
}
impl Default for Waiter {
fn default() -> Self {
Self {
comp: Arc::new(DefaultWaiter {}),
comp: DefaultWaiter {}.into_component(),
}
}
}
@@ -54,21 +50,20 @@ impl Waiter {
///
/// The wrapped component's name will be used (prefixed with "waiter/"),
/// but its run method will not be called.
pub fn new(c: Arc<dyn Component + Send + Sync + 'static>) -> Self {
pub fn new(c: SharedComponent) -> Self {
Self { comp: c }
}
}
#[async_trait]
impl Component for Waiter {
/// Returns the name of the waiter, prefixed with "waiter/".
///
/// If the wrapped component has a name, it will be "waiter/{name}".
/// Otherwise, returns "waiter".
fn name(&self) -> Option<String> {
match self.comp.name() {
Some(name) => Some(format!("waiter/{name}")),
None => Some("waiter".into()),
fn info(&self) -> ComponentInfo {
match &self.comp.info().name {
Some(name) => format!("waiter/{name}").into(),
None => "waiter".into(),
}
}

View File

@@ -1,7 +1,6 @@
use std::sync::Arc;
use async_trait::async_trait;
use notmad::{Component, Mad, MadError};
use notmad::{Component, ComponentInfo, Mad, MadError};
use rand::Rng;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
@@ -9,13 +8,12 @@ use tracing_test::traced_test;
struct NeverEndingRun {}
#[async_trait]
impl Component for NeverEndingRun {
fn name(&self) -> Option<String> {
Some("NeverEndingRun".into())
fn info(&self) -> ComponentInfo {
"NeverEndingRun".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(50..1000);
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
@@ -138,6 +136,30 @@ async fn test_can_shutdown_gracefully() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test]
#[traced_test]
async fn test_component_panics_shutdowns_cleanly() -> anyhow::Result<()> {
let res = Mad::builder()
.add_fn({
move |_cancel| async move {
panic!("my inner panic");
}
})
.add_fn(|cancel| async move {
cancel.cancelled().await;
Ok(())
})
.run()
.await;
let err_content = res.unwrap_err().to_string();
assert!(err_content.contains("component panicked"));
assert!(err_content.contains("my inner panic"));
Ok(())
}
#[test]
fn test_can_easily_transform_error() -> anyhow::Result<()> {
fn fallible() -> anyhow::Result<()> {

View File

@@ -5,6 +5,8 @@ base: "git@git.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
vars:
service: "mad"
registry: kasperhermansen
rust:
publish: {}
please:
project: