57 Commits

Author SHA1 Message Date
3122e4495e chore(deps): update rust crate anyhow to v1.0.102
Some checks failed
continuous-integration/drone/pr Build encountered an error
continuous-integration/drone/push Build encountered an error
2026-02-20 04:43:01 +00:00
ebe5110f6f fix(deps): update rust-futures monorepo to v0.3.32 (#50)
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-02-16 02:39:15 +01:00
b0f8295a81 fix(deps): update rust crate rand to 0.10.0 (#49)
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-02-09 02:39:03 +01:00
e9e542b506 chore(deps): update rust crate tracing-test to v0.2.6 (#48)
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-02-08 02:38:23 +01:00
92c0c22319 chore(deps): update rust crate anyhow to v1.0.101 (#47)
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-02-06 02:44:37 +01:00
ea46f0f2ac feat: bump v0.11
Some checks failed
continuous-integration/drone/push Build encountered an error
Signed-off-by: kjuulh <contact@kjuulh.io>
2026-02-05 23:32:51 +01:00
689bfd1325 BREAKING: name() -> info() and removed async_trait
Some checks failed
continuous-integration/drone/push Build encountered an error
Signed-off-by: kjuulh <contact@kjuulh.io>
2026-02-05 23:32:27 +01:00
f0c90edce9 feat: replace async-trait with erased box type
Signed-off-by: kjuulh <contact@kjuulh.io>
2026-02-05 23:32:27 +01:00
5e60a272f7 fix(deps): update rust crate thiserror to v2.0.18 (#46)
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-01-19 02:39:03 +01:00
34d609937d fix(deps): update rust crate tokio-util to v0.7.18 (#45)
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-01-05 02:39:14 +01:00
094b14c945 chore(deps): update rust crate tokio to v1.49.0 (#44)
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-01-04 02:41:05 +01:00
f6d4f846fc chore(deps): update rust crate tracing to v0.1.44 (#43)
All checks were successful
continuous-integration/drone/push Build is passing
2025-12-19 02:47:52 +01:00
7f3139f4f9 chore(deps): update tokio-tracing monorepo (#41)
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-29 02:44:04 +01:00
494ec05874 chore(release): v0.10.0 (#40)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.10.0

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #40
2025-11-15 14:48:14 +01:00
8c0128612f feat: implement take errors
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-11-15 14:47:24 +01:00
cbe049b6a2 chore(release): v0.9.0 (#36)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.9.0

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #36
2025-11-15 14:35:36 +01:00
2d6b14ad77 feat: mad not properly surfaces panics
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-11-15 14:33:00 +01:00
f777ec9b1e fix(deps): update all dependencies (#38)
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-14 02:38:11 +01:00
2415088792 chore(deps): update rust crate tracing-subscriber to v0.3.20 (#37)
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-13 03:15:23 +01:00
a35d15edc2 feat: add publish
All checks were successful
continuous-integration/drone/push Build is passing
2025-09-03 12:52:24 +02:00
613947ac88 feat: add readme
All checks were successful
continuous-integration/drone/push Build is passing
2025-09-03 12:40:28 +02:00
145e067454 chore(release): v0.8.1 (#35)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.8.1

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #35
2025-08-09 15:33:46 +02:00
82de5b260f improve: error logging
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-08-09 15:22:47 +02:00
eb360e565c chore(release): v0.8.0 (#34)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.8.0

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #34
2025-08-09 14:57:12 +02:00
c18c8a885c feat: add docs
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-08-08 23:06:23 +02:00
762da1e672 feat: update readme
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-08-07 11:29:29 +02:00
3bc512ab48 chore(release): v0.7.5 (#33)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.7.5

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: https://git.front.kjuulh.io/kjuulh/mad/pulls/33
2025-07-24 22:44:46 +02:00
7d8071d41b feat: print big inner
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-24 22:44:13 +02:00
1cc4138ec7 chore: more error correction
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-24 22:36:39 +02:00
00517daaaa chore: correct error test to not be as verbose 2025-07-24 22:27:04 +02:00
b941dc9a76 chore(release): v0.7.4 (#32)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.7.4

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: https://git.front.kjuulh.io/kjuulh/mad/pulls/32
2025-07-24 22:23:00 +02:00
5da3c83c12 feat: cleanup aggregate error for single error
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-24 22:22:12 +02:00
a16bee8e37 chore(release): v0.7.3 (#31)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.7.3

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: https://git.front.kjuulh.io/kjuulh/mad/pulls/31
2025-07-24 21:09:49 +02:00
a61f00a79d feat: automatic conversion from anyhow::Error and access to aggregate errors
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-24 21:07:35 +02:00
2bd9bd7b8e fix(deps): update all dependencies (#30)
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-21 05:37:49 +02:00
c79ff2fde0 chore(release): v0.7.2 (#14)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.7.2

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: https://git.front.kjuulh.io/kjuulh/mad/pulls/14
2025-06-25 09:54:55 +02:00
c29a84d15e chore
All checks were successful
continuous-integration/drone/push Build is passing
2025-06-25 09:48:35 +02:00
01274c1364 feat: add wait 2025-06-25 09:47:34 +02:00
9c3f2cb7f7 feat: add conditional, allows adding or waiting for close 2025-06-25 09:44:45 +02:00
9489f1a5a8 chore(deps): update all dependencies (#29)
All checks were successful
continuous-integration/drone/push Build is passing
2025-03-04 02:57:08 +01:00
f6aba7fac6 fix(deps): update rust crate async-trait to v0.1.86 (#28)
All checks were successful
continuous-integration/drone/push Build is passing
2025-02-01 06:51:54 +01:00
772366e267 fix(deps): update rust crate rand to 0.9.0 (#27)
All checks were successful
continuous-integration/drone/push Build is passing
2025-01-28 02:46:27 +01:00
1e08ee3dbb fix(deps): update rust crate thiserror to v2.0.11 (#26)
All checks were successful
continuous-integration/drone/push Build is passing
2025-01-11 02:40:13 +01:00
78f0c4057a fix(deps): update all dependencies (#25)
All checks were successful
continuous-integration/drone/push Build is passing
2025-01-09 02:50:22 +01:00
cf5d5268f6 fix(deps): update rust crate async-trait to v0.1.84 (#24)
All checks were successful
continuous-integration/drone/push Build is passing
2025-01-07 02:45:22 +01:00
ce2479f6ca chore(deps): update rust crate anyhow to v1.0.95 (#23)
All checks were successful
continuous-integration/drone/push Build is passing
2024-12-23 02:52:27 +01:00
82d4699bca fix(deps): update rust crate thiserror to v2.0.9 (#22)
All checks were successful
continuous-integration/drone/push Build is passing
2024-12-22 06:35:32 +01:00
5ab7cae1fe fix(deps): update rust crate thiserror to v2.0.8 (#21)
All checks were successful
continuous-integration/drone/push Build is passing
2024-12-18 06:34:52 +01:00
f049750e4c fix(deps): update rust crate thiserror to v2.0.7 (#20)
All checks were successful
continuous-integration/drone/push Build is passing
2024-12-14 02:38:13 +01:00
0b5f19fc77 fix(deps): update rust crate thiserror to v2.0.6 (#19)
All checks were successful
continuous-integration/drone/push Build is passing
2024-12-09 06:48:37 +01:00
14eabdbe82 fix(deps): update rust crate thiserror to v2.0.5 (#18)
All checks were successful
continuous-integration/drone/push Build is passing
2024-12-08 06:39:15 +01:00
ea568449fe fix(deps): update rust crate tokio-util to v0.7.13 (#17)
All checks were successful
continuous-integration/drone/push Build is passing
2024-12-05 02:46:34 +01:00
6ec3a6031e chore(deps): update all dependencies (#16)
All checks were successful
continuous-integration/drone/push Build is passing
2024-12-04 03:07:17 +01:00
0f8fd2343e chore(deps): update rust crate tracing-subscriber to v0.3.19 (#15)
All checks were successful
continuous-integration/drone/push Build is passing
2024-11-30 03:27:23 +01:00
12c00941b5 chore(deps): update rust crate tracing to v0.1.41 (#13)
All checks were successful
continuous-integration/drone/push Build is passing
2024-11-28 02:56:53 +01:00
72755f9cf1 chore(release): v0.7.1 (#12)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.7.1

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: https://git.front.kjuulh.io/kjuulh/mad/pulls/12
2024-11-24 11:28:02 +01:00
ae0b8b703e fix: make sure to close on final
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 11:24:13 +01:00
18 changed files with 2210 additions and 349 deletions

View File

@@ -6,6 +6,90 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.10.0] - 2025-11-15
### Added
- implement take errors
## [0.9.0] - 2025-11-15
### Added
- mad not properly surfaces panics
- add publish
- add readme
### Fixed
- *(deps)* update all dependencies (#38)
### Other
- *(deps)* update rust crate tracing-subscriber to v0.3.20 (#37)
## [0.8.1] - 2025-08-09
### Other
- error logging
## [0.8.0] - 2025-08-08
### Added
- add docs
- update readme
## [0.7.5] - 2025-07-24
### Added
- print big inner
### Other
- more error correction
- correct error test to not be as verbose
## [0.7.4] - 2025-07-24
### Added
- cleanup aggregate error for single error
## [0.7.3] - 2025-07-24
### Added
- automatic conversion from anyhow::Error and access to aggregate errors
### Fixed
- *(deps)* update all dependencies (#30)
## [0.7.2] - 2025-06-25
### Added
- add wait
- add conditional, allows adding or waiting for close
### Fixed
- *(deps)* update rust crate async-trait to v0.1.86 (#28)
- *(deps)* update rust crate rand to 0.9.0 (#27)
- *(deps)* update rust crate thiserror to v2.0.11 (#26)
- *(deps)* update all dependencies (#25)
- *(deps)* update rust crate async-trait to v0.1.84 (#24)
- *(deps)* update rust crate thiserror to v2.0.9 (#22)
- *(deps)* update rust crate thiserror to v2.0.8 (#21)
- *(deps)* update rust crate thiserror to v2.0.7 (#20)
- *(deps)* update rust crate thiserror to v2.0.6 (#19)
- *(deps)* update rust crate thiserror to v2.0.5 (#18)
- *(deps)* update rust crate tokio-util to v0.7.13 (#17)
### Other
- chore
- *(deps)* update all dependencies (#29)
- *(deps)* update rust crate anyhow to v1.0.95 (#23)
- *(deps)* update all dependencies (#16)
- *(deps)* update rust crate tracing-subscriber to v0.3.19 (#15)
- *(deps)* update rust crate tracing to v0.1.41 (#13)
## [0.7.1] - 2024-11-24
### Fixed
- make sure to close on final
## [0.7.0] - 2024-11-24
### Added

674
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -3,10 +3,9 @@ members = ["crates/*"]
resolver = "2"
[workspace.package]
version = "0.7.0"
version = "0.11.0"
[workspace.dependencies]
mad = { path = "crates/mad" }
anyhow = { version = "1.0.71" }
tokio = { version = "1", features = ["full"] }

171
README.md
View File

@@ -1,29 +1,32 @@
# MAD
# MAD - Lifecycle Manager for Rust Applications
Mad is a life-cycle manager for long running rust operations.
[![Crates.io](https://img.shields.io/crates/v/notmad.svg)](https://crates.io/crates/notmad)
[![Documentation](https://docs.rs/notmad/badge.svg)](https://docs.rs/notmad)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
- Webservers
- Queue bindings
- gRPC servers etc
- Cron runners
A simple lifecycle manager for long-running Rust applications. Run multiple services concurrently with graceful shutdown handling.
It is supposed to be the main thing the application runs, and everything from it is spawned and managed by it.
## Installation
```toml
[dependencies]
notmad = "0.10.0"
tokio = { version = "1", features = ["full"] }
```
## Quick Start
```rust
struct WaitServer {}
use notmad::{Component, Mad};
use tokio_util::sync::CancellationToken;
#[async_trait]
impl Component for WaitServer {
fn name(&self) -> Option<String> {
Some("NeverEndingRun".into())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
let millis_wait = rand::thread_rng().gen_range(50..1000);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
struct MyService;
impl Component for MyService {
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
println!("Service running...");
cancellation.cancelled().await;
println!("Service stopped");
Ok(())
}
}
@@ -31,21 +34,133 @@ impl Component for WaitServer {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
Mad::builder()
.add(WaitServer {})
.add(WaitServer {})
.add(WaitServer {})
.add(MyService)
.run()
.await?;
Ok(())
}
```
## Basic Usage
### Axum Web Server with Graceful Shutdown
Here's how to run an Axum server with MAD's graceful shutdown:
```rust
use axum::{Router, routing::get};
use notmad::{Component, ComponentInfo};
use tokio_util::sync::CancellationToken;
struct WebServer {
port: u16,
}
impl Component for WebServer {
fn info(&self) -> ComponentInfo {
format!("WebServer:{}", self.port).into()
}
async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
let app = Router::new().route("/", get(|| async { "Hello, World!" }));
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", self.port))
.await?;
println!("Listening on http://0.0.0.0:{}", self.port);
// Run server with graceful shutdown
axum::serve(listener, app)
.with_graceful_shutdown(async move {
cancel.cancelled().await;
println!("Shutting down server...");
})
.await?;
Ok(())
}
}
```
### Run Multiple Services
```rust
Mad::builder()
.add(WebServer { port: 8080 })
.add(WebServer { port: 8081 })
.run()
.await?;
```
### Use Functions as Components
```rust
Mad::builder()
.add_fn(|cancel| async move {
println!("Running...");
cancel.cancelled().await;
Ok(())
})
.run()
.await?;
```
## Lifecycle Hooks
Components support optional setup and cleanup phases:
```rust
impl Component for DatabaseService {
async fn setup(&self) -> Result<(), notmad::MadError> {
println!("Connecting to database...");
Ok(())
}
async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
cancel.cancelled().await;
Ok(())
}
async fn close(&self) -> Result<(), notmad::MadError> {
println!("Closing database connection...");
Ok(())
}
}
```
## Migration from v0.10
### Breaking Changes
1. **`name()``info()`**: Returns `ComponentInfo` instead of `Option<String>`
```rust
// Before
fn name(&self) -> Option<String> { Some("my-service".into()) }
// After
fn info(&self) -> ComponentInfo { "my-service".into() }
```
2. **No more `async-trait`**: Remove the dependency and `#[async_trait]` attribute
```rust
// Before
#[async_trait]
impl Component for MyService { }
// After
impl Component for MyService { }
```
## Examples
Can be found (here)[crates/mad/examples]
See [examples directory](crates/mad/examples) for complete working examples.
- basic
- fn
- signals
- error_log
## License
MIT - see [LICENSE](LICENSE)
## Links
- [Documentation](https://docs.rs/notmad)
- [Repository](https://github.com/kjuulh/mad)
- [Crates.io](https://crates.io/crates/notmad)

View File

@@ -4,15 +4,15 @@ version.workspace = true
description = "notmad is a life-cycle manager for long running rust operations"
license = "MIT"
repository = "https://github.com/kjuulh/mad"
author = "kjuulh"
edition = "2021"
authors = ["kjuulh"]
edition = "2024"
readme = "../../README.md"
[dependencies]
anyhow.workspace = true
async-trait = "0.1.81"
futures = "0.3.30"
futures-util = "0.3.30"
rand = "0.8.5"
rand = "0.10.0"
thiserror = "2.0.0"
tokio.workspace = true
tokio-util = "0.7.11"

View File

@@ -1,16 +1,15 @@
use async_trait::async_trait;
use notmad::ComponentInfo;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct WaitServer {}
#[async_trait]
impl notmad::Component for WaitServer {
fn name(&self) -> Option<String> {
Some("WaitServer".into())
fn info(&self) -> ComponentInfo {
"WaitServer".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);

View File

@@ -0,0 +1,18 @@
[package]
name = "mad-comprehensive-example"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "comprehensive"
path = "main.rs"
[dependencies]
notmad = { path = "../.." }
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
async-trait = "0.1"
anyhow = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
rand = "0.8"

View File

@@ -0,0 +1,326 @@
//! Comprehensive example demonstrating MAD's full capabilities.
//!
//! This example shows:
//! - Multiple component types (struct, closure, conditional)
//! - Component lifecycle (setup, run, close)
//! - Error handling and propagation
//! - Graceful shutdown with cancellation tokens
//! - Concurrent component execution
use notmad::{Component, ComponentInfo, Mad, MadError};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::time::{Duration, interval};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
/// A web server component that simulates handling HTTP requests.
struct WebServer {
port: u16,
request_count: Arc<AtomicUsize>,
}
impl Component for WebServer {
fn info(&self) -> ComponentInfo {
format!("web-server-{}", self.port).into()
}
async fn setup(&self) -> Result<(), MadError> {
info!("Setting up web server on port {}", self.port);
// In a real application, you might:
// - Bind to the port
// - Set up TLS certificates
// - Initialize middleware
tokio::time::sleep(Duration::from_millis(100)).await;
info!("Web server on port {} is ready", self.port);
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!("Web server on port {} started", self.port);
let mut interval = interval(Duration::from_secs(1));
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
info!("Web server on port {} received shutdown signal", self.port);
break;
}
_ = interval.tick() => {
// Simulate handling requests
let count = self.request_count.fetch_add(1, Ordering::Relaxed);
info!("Server on port {} handled request #{}", self.port, count + 1);
}
}
}
Ok(())
}
async fn close(&self) -> Result<(), MadError> {
info!("Shutting down web server on port {}", self.port);
// In a real application, you might:
// - Drain existing connections
// - Save server state
// - Close database connections
tokio::time::sleep(Duration::from_millis(200)).await;
let total = self.request_count.load(Ordering::Relaxed);
info!(
"Web server on port {} shut down. Total requests handled: {}",
self.port, total
);
Ok(())
}
}
/// A background job processor that simulates processing tasks from a queue.
struct JobProcessor {
queue_name: String,
processing_interval: Duration,
}
impl Component for JobProcessor {
fn info(&self) -> ComponentInfo {
format!("job-processor-{}", self.queue_name).into()
}
async fn setup(&self) -> Result<(), MadError> {
info!("Connecting to queue: {}", self.queue_name);
// Simulate connecting to a message queue
tokio::time::sleep(Duration::from_millis(150)).await;
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!("Job processor for {} started", self.queue_name);
let mut interval = interval(self.processing_interval);
let mut job_count = 0;
loop {
tokio::select! {
_ = cancellation.cancelled() => {
info!("Job processor for {} stopping...", self.queue_name);
break;
}
_ = interval.tick() => {
job_count += 1;
info!("Processing job #{} from {}", job_count, self.queue_name);
// Simulate job processing
tokio::time::sleep(Duration::from_millis(100)).await;
// Simulate occasional errors (but don't fail the component)
if job_count % 10 == 0 {
warn!("Job #{} from {} required retry", job_count, self.queue_name);
}
}
}
}
info!(
"Job processor for {} processed {} jobs",
self.queue_name, job_count
);
Ok(())
}
async fn close(&self) -> Result<(), MadError> {
info!("Disconnecting from queue: {}", self.queue_name);
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
}
/// A health check component that monitors system health.
struct HealthChecker {
check_interval: Duration,
}
impl Component for HealthChecker {
fn info(&self) -> ComponentInfo {
"health-checker".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!("Health checker started");
let mut interval = interval(self.check_interval);
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
info!("Health checker stopping...");
break;
}
_ = interval.tick() => {
// Simulate health checks
let cpu_usage = rand::random::<f32>() * 100.0;
let memory_usage = rand::random::<f32>() * 100.0;
info!("System health: CPU={:.1}%, Memory={:.1}%", cpu_usage, memory_usage);
if cpu_usage > 90.0 {
warn!("High CPU usage detected: {:.1}%", cpu_usage);
}
if memory_usage > 90.0 {
warn!("High memory usage detected: {:.1}%", memory_usage);
}
}
}
}
Ok(())
}
}
/// A component that will fail after some time to demonstrate error handling.
struct FailingComponent {
fail_after: Duration,
}
impl Component for FailingComponent {
fn info(&self) -> ComponentInfo {
"failing-component".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!(
"Failing component started (will fail after {:?})",
self.fail_after
);
tokio::select! {
_ = cancellation.cancelled() => {
info!("Failing component cancelled before failure");
Ok(())
}
_ = tokio::time::sleep(self.fail_after) => {
error!("Failing component encountered an error!");
Err(anyhow::anyhow!("Simulated component failure").into())
}
}
}
}
/// Debug component that logs system status periodically.
struct DebugComponent;
impl Component for DebugComponent {
fn info(&self) -> ComponentInfo {
"debug-component".into()
}
async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> {
info!("Debug mode enabled - verbose logging active");
let mut interval = interval(Duration::from_secs(5));
while !cancel.is_cancelled() {
tokio::select! {
_ = cancel.cancelled() => break,
_ = interval.tick() => {
info!("DEBUG: System running normally");
}
}
}
info!("Debug component shutting down");
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize tracing for logging
tracing_subscriber::fmt()
.with_target(false)
.without_time()
.init();
info!("Starting comprehensive MAD example application");
// Check if we should enable the failing component
let enable_failure_demo = std::env::var("ENABLE_FAILURE").is_ok();
// Check if we should enable debug mode
let debug_mode = std::env::var("DEBUG").is_ok();
// Shared state for demonstration
let request_count = Arc::new(AtomicUsize::new(0));
// Build and run the application
let result = Mad::builder()
// Add web servers
.add(WebServer {
port: 8080,
request_count: request_count.clone(),
})
.add(WebServer {
port: 8081,
request_count: request_count.clone(),
})
// Add job processors
.add(JobProcessor {
queue_name: "high-priority".to_string(),
processing_interval: Duration::from_secs(2),
})
.add(JobProcessor {
queue_name: "low-priority".to_string(),
processing_interval: Duration::from_secs(5),
})
// Add health checker
.add(HealthChecker {
check_interval: Duration::from_secs(3),
})
// Conditionally add a debug component using add_fn
.add_conditional(debug_mode, DebugComponent)
// Conditionally add failing component to demonstrate error handling
.add_conditional(
enable_failure_demo,
FailingComponent {
fail_after: Duration::from_secs(10),
},
)
// Add a simple metrics reporter using add_fn
.add_fn(|cancel: CancellationToken| async move {
info!("Metrics reporter started");
let mut interval = interval(Duration::from_secs(10));
let start = std::time::Instant::now();
while !cancel.is_cancelled() {
tokio::select! {
_ = cancel.cancelled() => break,
_ = interval.tick() => {
let uptime = start.elapsed();
info!("Application uptime: {:?}", uptime);
}
}
}
info!("Metrics reporter stopped");
Ok(())
})
// Configure graceful shutdown timeout
.cancellation(Some(Duration::from_secs(5)))
// Run the application
.run()
.await;
match result {
Ok(()) => {
info!("Application shut down successfully");
Ok(())
}
Err(e) => {
error!("Application failed: {}", e);
// Check if it's an aggregate error with multiple failures
if let MadError::AggregateError(ref agg) = e {
error!("Multiple component failures detected:");
for (i, err) in agg.get_errors().iter().enumerate() {
error!(" {}. {}", i + 1, err);
}
}
Err(e.into())
}
}
}

View File

@@ -1,16 +1,15 @@
use async_trait::async_trait;
use notmad::ComponentInfo;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct ErrorServer {}
#[async_trait]
impl notmad::Component for ErrorServer {
fn name(&self) -> Option<String> {
Some("ErrorServer".into())
fn info(&self) -> ComponentInfo {
"ErrorServer".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);

View File

@@ -1,13 +1,12 @@
use async_trait::async_trait;
use notmad::ComponentInfo;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct WaitServer {}
#[async_trait]
impl notmad::Component for WaitServer {
fn name(&self) -> Option<String> {
Some("WaitServer".into())
fn info(&self) -> ComponentInfo {
"WaitServer".into()
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {

View File

@@ -0,0 +1,15 @@
[package]
name = "mad-multi-service-example"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "multi_service"
path = "main.rs"
[dependencies]
notmad = { path = "../.." }
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
async-trait = "0.1"
anyhow = "1"

View File

@@ -0,0 +1,210 @@
//! Example demonstrating running multiple services with MAD.
//!
//! This example shows how to run a web server, queue processor, and
//! scheduled task together, with graceful shutdown on Ctrl+C.
use notmad::{Component, ComponentInfo, Mad, MadError};
use tokio::time::{Duration, interval};
use tokio_util::sync::CancellationToken;
/// A simple web server component.
///
/// In a real application, this would bind to a port and handle HTTP requests.
/// Here we simulate it by periodically logging that we're "handling" requests.
struct WebServer {
port: u16,
}
impl Component for WebServer {
fn info(&self) -> ComponentInfo {
format!("WebServer:{}", self.port).into()
}
async fn setup(&self) -> Result<(), MadError> {
println!("[{}] Binding to port...", self.info());
// Simulate server setup time
tokio::time::sleep(Duration::from_millis(100)).await;
println!("[{}] Ready to accept connections", self.info());
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!("[{}] Server started", self.info());
// Simulate handling requests until shutdown
let mut request_id = 0;
let mut interval = interval(Duration::from_secs(2));
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Shutdown signal received", self.info());
break;
}
_ = interval.tick() => {
request_id += 1;
println!("[{}] Handling request #{}", self.info(), request_id);
}
}
}
Ok(())
}
async fn close(&self) -> Result<(), MadError> {
println!("[{}] Closing connections...", self.info());
// Simulate graceful connection drain
tokio::time::sleep(Duration::from_millis(200)).await;
println!("[{}] Server stopped", self.info());
Ok(())
}
}
/// A queue processor that consumes messages from a queue.
///
/// This simulates processing messages from a message queue like
/// RabbitMQ, Kafka, or AWS SQS.
struct QueueProcessor {
queue_name: String,
}
impl Component for QueueProcessor {
fn info(&self) -> ComponentInfo {
format!("QueueProcessor:{}", self.queue_name).into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!("[{}] Started processing", self.info());
let mut message_count = 0;
// Process messages until shutdown
while !cancellation.is_cancelled() {
// Simulate waiting for and processing a message
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Stopping message processing", self.info());
break;
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
message_count += 1;
println!("[{}] Processed message #{}", self.info(), message_count);
}
}
}
println!(
"[{}] Processed {} messages total",
self.info(),
message_count
);
Ok(())
}
}
/// A scheduled task that runs periodically.
///
/// This could be used for tasks like:
/// - Cleaning up old data
/// - Generating reports
/// - Syncing with external systems
struct ScheduledTask {
task_name: String,
interval_secs: u64,
}
impl Component for ScheduledTask {
fn info(&self) -> ComponentInfo {
format!("ScheduledTask:{}", self.task_name).into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!(
"[{}] Scheduled to run every {} seconds",
self.info(),
self.interval_secs
);
let mut interval = interval(Duration::from_secs(self.interval_secs));
let mut run_count = 0;
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Scheduler stopping", self.info());
break;
}
_ = interval.tick() => {
run_count += 1;
println!("[{}] Executing run #{}", self.info(), run_count);
// Simulate task execution
tokio::time::sleep(Duration::from_millis(500)).await;
println!("[{}] Run #{} completed", self.info(), run_count);
}
}
}
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Starting multi-service application");
println!("Press Ctrl+C to trigger graceful shutdown");
println!("----------------------------------------");
// Build the application with multiple services
Mad::builder()
// Add a web server on port 8080
.add(WebServer { port: 8080 })
// Add another web server on port 8081 (e.g., admin interface)
.add(WebServer { port: 8081 })
// Add queue processors for different queues
.add(QueueProcessor {
queue_name: "orders".to_string(),
})
.add(QueueProcessor {
queue_name: "notifications".to_string(),
})
// Add scheduled tasks
.add(ScheduledTask {
task_name: "cleanup".to_string(),
interval_secs: 5,
})
.add(ScheduledTask {
task_name: "report_generator".to_string(),
interval_secs: 10,
})
// Add a monitoring component using a closure
.add_fn(|cancel| async move {
println!("[Monitor] Starting system monitor");
let mut interval = interval(Duration::from_secs(15));
while !cancel.is_cancelled() {
tokio::select! {
_ = cancel.cancelled() => {
println!("[Monitor] Stopping monitor");
break;
}
_ = interval.tick() => {
println!("[Monitor] All systems operational");
}
}
}
Ok(())
})
// Set graceful shutdown timeout to 3 seconds
.cancellation(Some(Duration::from_secs(3)))
// Run all components
.run()
.await?;
println!("----------------------------------------");
println!("All services shut down successfully");
Ok(())
}

View File

@@ -0,0 +1,84 @@
use notmad::ComponentInfo;
use tokio_util::sync::CancellationToken;
struct NestedErrorComponent {
name: String,
}
impl notmad::Component for NestedErrorComponent {
fn info(&self) -> ComponentInfo {
self.name.clone().into()
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
// Simulate a deeply nested error
let io_error = std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
"access denied to /etc/secret",
);
Err(anyhow::Error::from(io_error)
.context("failed to read configuration file")
.context("unable to initialize database connection pool")
.context(format!("component '{}' startup failed", self.name))
.into())
}
}
struct AnotherFailingComponent;
impl notmad::Component for AnotherFailingComponent {
fn info(&self) -> ComponentInfo {
"another-component".into()
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Err(anyhow::anyhow!("network timeout after 30s")
.context("failed to connect to external API")
.context("service health check failed")
.into())
}
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter("mad=debug")
.init();
let result = notmad::Mad::builder()
.add(NestedErrorComponent {
name: "database-service".into(),
})
.add(AnotherFailingComponent)
.run()
.await;
match result {
Ok(()) => println!("Success!"),
Err(e) => {
eprintln!("\n=== Error occurred ===");
eprintln!("{}", e);
// Also demonstrate how to walk the error chain manually
if let notmad::MadError::AggregateError(ref agg) = e {
eprintln!("\n=== Detailed error chains ===");
for (i, error) in agg.get_errors().iter().enumerate() {
eprintln!("\nComponent {} error chain:", i + 1);
if let notmad::MadError::Inner(inner) = error {
for (j, cause) in inner.chain().enumerate() {
eprintln!(" {}. {}", j + 1, cause);
}
}
}
} else if let notmad::MadError::Inner(ref inner) = e {
eprintln!("\n=== Error chain ===");
for (i, cause) in inner.chain().enumerate() {
eprintln!(" {}. {}", i + 1, cause);
}
}
}
}
}

View File

@@ -1,16 +1,15 @@
use async_trait::async_trait;
use notmad::ComponentInfo;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct WaitServer {}
#[async_trait]
impl notmad::Component for WaitServer {
fn name(&self) -> Option<String> {
Some("WaitServer".into())
fn info(&self) -> ComponentInfo {
"WaitServer".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);
@@ -23,10 +22,9 @@ impl notmad::Component for WaitServer {
}
struct RespectCancel {}
#[async_trait]
impl notmad::Component for RespectCancel {
fn name(&self) -> Option<String> {
Some("RespectCancel".into())
fn info(&self) -> ComponentInfo {
"RespectCancel".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
@@ -38,13 +36,12 @@ impl notmad::Component for RespectCancel {
}
struct NeverStopServer {}
#[async_trait]
impl notmad::Component for NeverStopServer {
fn name(&self) -> Option<String> {
Some("NeverStopServer".into())
fn info(&self) -> ComponentInfo {
"NeverStopServer".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(999999999)).await;

View File

@@ -1,51 +1,193 @@
//! # MAD - Lifecycle Manager for Rust Applications
//!
//! A simple lifecycle manager for long-running Rust applications. Run multiple services
//! concurrently with graceful shutdown handling.
//!
//! ## Quick Start
//!
//! ```rust,no_run
//! use notmad::{Component, Mad};
//! use tokio_util::sync::CancellationToken;
//!
//! struct MyService;
//!
//! impl Component for MyService {
//! async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
//! println!("Running...");
//! cancel.cancelled().await;
//! println!("Stopped");
//! Ok(())
//! }
//! }
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//! Mad::builder()
//! .add(MyService)
//! .run()
//! .await?;
//! Ok(())
//! }
//! ```
//!
//! ## Features
//!
//! - Run multiple components concurrently
//! - Graceful shutdown with cancellation tokens
//! - Optional lifecycle hooks: `setup()`, `run()`, `close()`
//! - Automatic error aggregation
//! - SIGTERM and Ctrl+C signal handling
use futures::stream::FuturesUnordered;
use futures_util::StreamExt;
use std::{fmt::Display, sync::Arc};
use tokio::signal::unix::{signal, SignalKind};
use std::{error::Error, fmt::Display, pin::Pin, sync::Arc};
use tokio::signal::unix::{SignalKind, signal};
use tokio_util::sync::CancellationToken;
use crate::waiter::Waiter;
mod waiter;
/// Error type for MAD operations.
///
/// This enum represents all possible errors that can occur during
/// the lifecycle of MAD components.
#[derive(thiserror::Error, Debug)]
pub enum MadError {
#[error("component failed: {0}")]
Inner(#[source] anyhow::Error),
/// Generic error wrapper for anyhow errors.
///
/// This variant is used when components return errors via the `?` operator
/// or when converting from `anyhow::Error`.
#[error(transparent)]
Inner(anyhow::Error),
#[error("component(s) failed: {run}")]
/// Error that occurred during the run phase of a component.
#[error(transparent)]
RunError { run: anyhow::Error },
#[error("component(s) failed: {close}")]
CloseError { close: anyhow::Error },
/// Error that occurred during the close phase of a component.
#[error("component(s) failed during close")]
CloseError {
#[source]
close: anyhow::Error,
},
#[error("component(s) failed: {0}")]
/// Multiple errors from different components.
///
/// This is used when multiple components fail simultaneously,
/// allowing all errors to be reported rather than just the first one.
#[error("{0}")]
AggregateError(AggregateError),
/// Returned when a component doesn't implement the optional setup method.
///
/// This is not typically an error condition as setup is optional.
#[error("setup not defined")]
SetupNotDefined,
/// Returned when a component doesn't implement the optional close method.
///
/// This is not typically an error condition as close is optional.
#[error("close not defined")]
CloseNotDefined,
}
#[derive(Debug)]
impl From<anyhow::Error> for MadError {
fn from(value: anyhow::Error) -> Self {
Self::Inner(value)
}
}
/// Container for multiple errors from different components.
///
/// When multiple components fail, their errors are collected
/// into this struct to provide complete error reporting.
#[derive(Debug, thiserror::Error)]
pub struct AggregateError {
errors: Vec<MadError>,
}
impl Display for AggregateError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("MadError::AggregateError: (")?;
impl AggregateError {
/// Returns a slice of all contained errors.
///
/// # Example
///
/// ```rust,ignore
/// match result {
/// Err(notmad::MadError::AggregateError(agg)) => {
/// for error in agg.get_errors() {
/// eprintln!("Component error: {}", error);
/// }
/// }
/// _ => {}
/// }
/// ```
pub fn get_errors(&self) -> &[MadError] {
&self.errors
}
for error in &self.errors {
f.write_str(&error.to_string())?;
f.write_str(", ")?;
}
f.write_str(")")
pub fn take_errors(self) -> Vec<MadError> {
self.errors
}
}
impl Display for AggregateError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.errors.is_empty() {
return Ok(());
}
if self.errors.len() == 1 {
return write!(f, "{}", self.errors[0]);
}
writeln!(f, "{} component errors occurred:", self.errors.len())?;
for (i, error) in self.errors.iter().enumerate() {
write!(f, "\n[Component {}] {}", i + 1, error)?;
// Print the error chain for each component error
let mut source = error.source();
let mut level = 1;
while let Some(err) = source {
write!(f, "\n {}. {}", level, err)?;
source = err.source();
level += 1;
}
}
Ok(())
}
}
/// The main lifecycle manager for running multiple components.
///
/// `Mad` orchestrates the lifecycle of multiple components, ensuring they
/// start up in order, run concurrently, and shut down gracefully.
///
/// # Example
///
/// ```rust
/// use notmad::{Component, Mad};
/// use tokio_util::sync::CancellationToken;
///
/// struct MyComponent;
///
/// impl Component for MyComponent {
/// async fn run(&self, _cancel: CancellationToken) -> Result<(), notmad::MadError> {
/// Ok(())
/// }
/// }
///
/// # async fn example() -> Result<(), notmad::MadError> {
/// Mad::builder()
/// .add(MyComponent)
/// .run()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub struct Mad {
components: Vec<Arc<dyn Component + Send + Sync + 'static>>,
components: Vec<SharedComponent>,
should_cancel: Option<std::time::Duration>,
}
@@ -56,6 +198,18 @@ struct CompletionResult {
}
impl Mad {
/// Creates a new `Mad` builder.
///
/// This is the entry point for constructing a MAD application.
/// Components are added using the builder pattern before calling `run()`.
///
/// # Example
///
/// ```rust
/// use notmad::Mad;
///
/// let mut app = Mad::builder();
/// ```
pub fn builder() -> Self {
Self {
components: Vec::default(),
@@ -64,12 +218,119 @@ impl Mad {
}
}
/// Adds a component to the MAD application.
///
/// Components will be set up in the order they are added,
/// run concurrently, and closed in the order they were added.
///
/// # Arguments
///
/// * `component` - Any type that implements `Component` or `IntoComponent`
///
/// # Example
///
/// ```rust
/// use notmad::{Component, Mad};
/// # use tokio_util::sync::CancellationToken;
/// # struct MyService;
/// # impl Component for MyService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # }
///
/// Mad::builder()
/// .add(MyService)
/// .add(MyService);
/// ```
pub fn add(&mut self, component: impl IntoComponent) -> &mut Self {
self.components.push(component.into_component());
self
}
/// Conditionally adds a component based on a boolean condition.
///
/// If the condition is false, a waiter component is added instead,
/// which simply waits for cancellation without doing any work.
///
/// # Arguments
///
/// * `condition` - If true, adds the component; if false, adds a waiter
/// * `component` - The component to add if condition is true
///
/// # Example
///
/// ```rust
/// use notmad::Mad;
/// # use notmad::Component;
/// # use tokio_util::sync::CancellationToken;
/// # struct DebugService;
/// # impl Component for DebugService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # }
///
/// let enable_debug = std::env::var("DEBUG").is_ok();
///
/// Mad::builder()
/// .add_conditional(enable_debug, DebugService);
/// ```
pub fn add_conditional(&mut self, condition: bool, component: impl IntoComponent) -> &mut Self {
if condition {
self.components.push(component.into_component());
} else {
self.components
.push(Waiter::new(component.into_component()).into_component())
}
self
}
/// Adds a waiter component that does nothing but wait for cancellation.
///
/// This is useful when you need a placeholder component or want
/// the application to keep running without any specific work.
///
/// # Example
///
/// ```rust,no_run
/// # async fn example() {
/// use notmad::Mad;
///
/// Mad::builder()
/// .add_wait() // Keeps the app running until shutdown signal
/// .run()
/// .await;
/// # }
/// ```
pub fn add_wait(&mut self) -> &mut Self {
self.components.push(Waiter::default().into_component());
self
}
/// Adds a closure or function as a component.
///
/// This is a convenient way to add simple components without
/// creating a full struct that implements `Component`.
///
/// # Arguments
///
/// * `f` - A closure that takes a `CancellationToken` and returns a future
///
/// # Example
///
/// ```rust
/// use notmad::Mad;
/// use tokio_util::sync::CancellationToken;
///
/// Mad::builder()
/// .add_fn(|cancel: CancellationToken| async move {
/// while !cancel.is_cancelled() {
/// println!("Working...");
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
/// }
/// Ok(())
/// });
/// ```
pub fn add_fn<F, Fut>(&mut self, f: F) -> &mut Self
where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
@@ -80,12 +341,63 @@ impl Mad {
self.add(comp)
}
/// Configures the cancellation timeout behavior.
///
/// When a shutdown signal is received, MAD will:
/// 1. Send cancellation tokens to all components
/// 2. Wait for the specified duration
/// 3. Force shutdown if components haven't stopped
///
/// # Arguments
///
/// * `should_cancel` - Duration to wait after cancellation before forcing shutdown.
/// Pass `None` to wait indefinitely.
///
/// # Example
///
/// ```rust,no_run
/// # async fn example() {
/// use notmad::Mad;
/// use std::time::Duration;
///
/// Mad::builder()
/// .cancellation(Some(Duration::from_secs(30))) // 30 second grace period
/// .run()
/// .await;
/// # }
/// ```
pub fn cancellation(&mut self, should_cancel: Option<std::time::Duration>) -> &mut Self {
self.should_cancel = should_cancel;
self
}
/// Runs all components until completion or shutdown.
///
/// This method:
/// 1. Calls `setup()` on all components (in order)
/// 2. Starts all components concurrently
/// 3. Waits for shutdown signal (SIGTERM, Ctrl+C) or component failure
/// 4. Sends cancellation to all components
/// 5. Calls `close()` on all components (in order)
///
/// # Returns
///
/// * `Ok(())` if all components shut down cleanly
/// * `Err(MadError)` if any component fails
///
/// # Example
///
/// ```rust,no_run
/// # use notmad::Mad;
/// # async fn example() -> Result<(), notmad::MadError> {
/// Mad::builder()
/// .add_wait()
/// .run()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub async fn run(&mut self) -> Result<(), MadError> {
tracing::info!("running mad setup");
@@ -100,7 +412,7 @@ impl Mad {
(Err(run), Err(close)) => {
return Err(MadError::AggregateError(AggregateError {
errors: vec![run, close],
}))
}));
}
(Ok(_), Ok(_)) => {}
(Ok(_), Err(close)) => return Err(close),
@@ -114,7 +426,7 @@ impl Mad {
tracing::debug!("setting up components");
for comp in &self.components {
tracing::trace!(component = &comp.name(), "mad setting up");
tracing::trace!(component = %comp.info(), "mad setting up");
match comp.setup().await {
Ok(_) | Err(MadError::SetupNotDefined) => {}
@@ -144,34 +456,57 @@ impl Mad {
channels.push(error_rx);
tokio::spawn(async move {
let name = comp.name().clone();
let info = comp.info().clone();
tracing::debug!(component = name, "mad running");
tracing::debug!(component = %info, "mad running");
let handle = tokio::spawn(async move { comp.run(job_cancellation).await });
tokio::select! {
_ = cancellation_token.cancelled() => {
error_tx.send(CompletionResult { res: Ok(()) , name }).await
error_tx.send(CompletionResult { res: Ok(()) , name: info.name }).await
}
res = comp.run(job_cancellation) => {
error_tx.send(CompletionResult { res , name }).await
res = handle => {
let res = match res {
Ok(res) => res,
Err(join) => {
match join.source() {
Some(error) => {
Err(MadError::RunError{run: anyhow::anyhow!("component aborted: {:?}", error)})
},
None => {
if join.is_panic(){
Err(MadError::RunError { run: anyhow::anyhow!("component panicked: {}", join) })
} else {
Err(MadError::RunError { run: anyhow::anyhow!("component faced unknown error: {}", join) })
}
},
}
},
};
error_tx.send(CompletionResult { res , name: info.name }).await
}
}
});
}
tokio::spawn({
let cancellation_token = cancellation_token.child_token();
let cancellation_token = cancellation_token;
let job_done = job_done.child_token();
let wait_cancel = self.should_cancel;
async move {
let should_cancel =
|cancel: CancellationToken, wait: Option<std::time::Duration>| async move {
|cancel: CancellationToken,
global_cancel: CancellationToken,
wait: Option<std::time::Duration>| async move {
if let Some(cancel_wait) = wait {
tokio::time::sleep(cancel_wait).await;
cancel.cancel();
tokio::time::sleep(cancel_wait).await;
global_cancel.cancel();
}
};
@@ -180,13 +515,13 @@ impl Mad {
job_cancellation.cancel();
}
_ = job_done.cancelled() => {
should_cancel(job_cancellation, wait_cancel).await;
should_cancel(job_cancellation, cancellation_token, wait_cancel).await;
}
_ = tokio::signal::ctrl_c() => {
should_cancel(job_cancellation, wait_cancel).await;
should_cancel(job_cancellation, cancellation_token,wait_cancel).await;
}
_ = signal_unix_terminate() => {
should_cancel(job_cancellation, wait_cancel).await;
should_cancel(job_cancellation, cancellation_token, wait_cancel).await;
}
}
}
@@ -228,7 +563,7 @@ impl Mad {
tracing::debug!("closing components");
for comp in &self.components {
tracing::trace!(component = &comp.name(), "mad closing");
tracing::trace!(component = %comp.info(), "mad closing");
match comp.close().await {
Ok(_) | Err(MadError::CloseNotDefined) => {}
Err(e) => return Err(e),
@@ -246,30 +581,259 @@ async fn signal_unix_terminate() {
sigterm.recv().await;
}
#[async_trait::async_trait]
pub trait Component {
fn name(&self) -> Option<String> {
None
#[derive(Default, Clone)]
pub struct ComponentInfo {
name: Option<String>,
}
impl Display for ComponentInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(
self.name
.as_ref()
.map(|n| n.as_str())
.unwrap_or_else(|| "unknown"),
)
}
}
impl ComponentInfo {
pub fn new() -> Self {
Self::default()
}
pub fn with_name(&mut self, name: impl Into<String>) -> &mut Self {
self.name = Some(name.into());
self
}
}
impl From<String> for ComponentInfo {
fn from(value: String) -> Self {
Self { name: Some(value) }
}
}
impl From<&str> for ComponentInfo {
fn from(value: &str) -> Self {
Self {
name: Some(value.into()),
}
}
}
/// Trait for implementing MAD components.
///
/// Components represent individual services or tasks that run as part
/// of your application. Each component has its own lifecycle with
/// optional setup and cleanup phases.
///
/// # Example
///
/// ```rust
/// use notmad::{Component, ComponentInfo, MadError};
/// use tokio_util::sync::CancellationToken;
///
/// struct DatabaseConnection {
/// url: String,
/// }
///
/// impl Component for DatabaseConnection {
/// fn info(&self) -> ComponentInfo {
/// "database".into()
/// }
///
/// async fn setup(&self) -> Result<(), MadError> {
/// println!("Connecting to database...");
/// // Initialize connection pool
/// Ok(())
/// }
///
/// async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> {
/// // Keep connection alive, handle queries
/// cancel.cancelled().await;
/// Ok(())
/// }
///
/// async fn close(&self) -> Result<(), MadError> {
/// println!("Closing database connection...");
/// // Clean up resources
/// Ok(())
/// }
/// }
/// ```
pub trait Component: Send + Sync + 'static {
/// Returns an optional name for the component.
///
/// This name is used in logging and error messages to identify
/// which component is being processed.
///
/// # Default
///
/// Returns `None` if not overridden.
fn info(&self) -> ComponentInfo {
ComponentInfo::default()
}
/// Optional setup phase called before the component starts running.
///
/// Use this for initialization tasks like:
/// - Establishing database connections
/// - Loading configuration
/// - Preparing resources
///
/// # Default
///
/// Returns `MadError::SetupNotDefined` which is handled gracefully.
///
/// # Errors
///
/// If setup fails with an error other than `SetupNotDefined`,
/// the entire application will stop before any components start running.
fn setup(&self) -> impl Future<Output = Result<(), MadError>> + Send + '_ {
async { Err(MadError::SetupNotDefined) }
}
/// Main execution phase of the component.
///
/// This method should contain the primary logic of your component.
/// It should respect the cancellation token and shut down gracefully
/// when cancellation is requested.
///
/// # Arguments
///
/// * `cancellation_token` - Signal for graceful shutdown
///
/// # Implementation Guidelines
///
/// - Check `cancellation_token.is_cancelled()` periodically
/// - Use `tokio::select!` with `cancellation_token.cancelled()` for async operations
/// - Clean up resources before returning
///
/// # Errors
///
/// Any error returned will trigger shutdown of all other components.
fn run(
&self,
cancellation_token: CancellationToken,
) -> impl Future<Output = Result<(), MadError>> + Send + '_;
/// Optional cleanup phase called after the component stops.
///
/// Use this for cleanup tasks like:
/// - Flushing buffers
/// - Closing connections
/// - Saving state
///
/// # Default
///
/// Returns `MadError::CloseNotDefined` which is handled gracefully.
///
/// # Errors
///
/// Errors during close are logged but don't prevent other components
/// from closing.
fn close(&self) -> impl Future<Output = Result<(), MadError>> + Send + '_ {
async { Err(MadError::CloseNotDefined) }
}
}
trait AsyncComponent: Send + Sync + 'static {
fn info_async(&self) -> ComponentInfo;
fn setup_async(&self) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>>;
fn run_async(
&self,
cancellation_token: CancellationToken,
) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>>;
fn close_async(&self) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>>;
}
impl<E: Component> AsyncComponent for E {
#[inline(always)]
fn info_async(&self) -> ComponentInfo {
self.info()
}
#[inline(always)]
fn setup_async(&self) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>> {
Box::pin(self.setup())
}
#[inline(always)]
fn run_async(
&self,
cancellation_token: CancellationToken,
) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>> {
Box::pin(self.run(cancellation_token))
}
#[inline(always)]
fn close_async(&self) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>> {
Box::pin(self.close())
}
}
#[derive(Clone)]
pub struct SharedComponent {
component: Arc<dyn AsyncComponent + Send + Sync + 'static>,
}
impl SharedComponent {
#[inline(always)]
pub fn info(&self) -> ComponentInfo {
self.component.info_async()
}
#[inline(always)]
async fn setup(&self) -> Result<(), MadError> {
Err(MadError::SetupNotDefined)
self.component.setup_async().await
}
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError>;
#[inline(always)]
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
self.component.run_async(cancellation_token).await
}
#[inline(always)]
async fn close(&self) -> Result<(), MadError> {
Err(MadError::CloseNotDefined)
self.component.close_async().await
}
}
/// Trait for converting types into components.
///
/// This trait is automatically implemented for all types that implement
/// `Component + Send + Sync + 'static`, allowing them to be added to MAD
/// directly without manual conversion.
///
/// # Example
///
/// ```rust
/// use notmad::{Component, IntoComponent, Mad};
/// # use tokio_util::sync::CancellationToken;
///
/// struct MyService;
///
/// # impl Component for MyService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # }
///
/// // MyService automatically implements IntoComponent
/// Mad::builder()
/// .add(MyService); // Works directly
/// ```
pub trait IntoComponent {
fn into_component(self) -> Arc<dyn Component + Send + Sync + 'static>;
/// Converts self into an Arc-wrapped component.
fn into_component(self) -> SharedComponent;
}
impl<T: Component + Send + Sync + 'static> IntoComponent for T {
fn into_component(self) -> Arc<dyn Component + Send + Sync + 'static> {
Arc::new(self)
impl<T: Component> IntoComponent for T {
fn into_component(self) -> SharedComponent {
SharedComponent {
component: Arc::new(self),
}
}
}
@@ -293,7 +857,6 @@ where
}
}
#[async_trait::async_trait]
impl<F, Fut> Component for ClosureComponent<F, Fut>
where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
@@ -303,3 +866,130 @@ where
self.execute(cancellation_token).await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_error_chaining_display() {
// Test single error with context chain
let base_error = std::io::Error::new(std::io::ErrorKind::NotFound, "file not found");
let error = anyhow::Error::from(base_error)
.context("failed to read configuration")
.context("unable to initialize database")
.context("service startup failed");
let mad_error = MadError::Inner(error);
let display = format!("{}", mad_error);
// Should display the top-level error message
assert!(display.contains("service startup failed"));
// Test error chain iteration
if let MadError::Inner(ref e) = mad_error {
let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
assert_eq!(chain.len(), 4);
assert_eq!(chain[0], "service startup failed");
assert_eq!(chain[1], "unable to initialize database");
assert_eq!(chain[2], "failed to read configuration");
assert_eq!(chain[3], "file not found");
}
}
#[test]
fn test_aggregate_error_display() {
let error1 = MadError::Inner(
anyhow::anyhow!("database connection failed")
.context("failed to connect to PostgreSQL"),
);
let error2 = MadError::Inner(
anyhow::anyhow!("port already in use")
.context("failed to bind to port 8080")
.context("web server initialization failed"),
);
let aggregate = MadError::AggregateError(AggregateError {
errors: vec![error1, error2],
});
let display = format!("{}", aggregate);
// Check that it shows multiple errors
assert!(display.contains("2 component errors occurred"));
assert!(display.contains("[Component 1]"));
assert!(display.contains("[Component 2]"));
// Check that context chains are displayed
assert!(display.contains("failed to connect to PostgreSQL"));
assert!(display.contains("database connection failed"));
assert!(display.contains("web server initialization failed"));
assert!(display.contains("failed to bind to port 8080"));
assert!(display.contains("port already in use"));
}
#[test]
fn test_single_error_aggregate() {
let error = MadError::Inner(anyhow::anyhow!("single error"));
let aggregate = AggregateError {
errors: vec![error],
};
let display = format!("{}", aggregate);
// Single error should be displayed directly
assert!(display.contains("single error"));
assert!(!display.contains("component errors occurred"));
}
#[test]
fn test_error_source_chain() {
let error = MadError::Inner(
anyhow::anyhow!("root cause")
.context("middle layer")
.context("top layer"),
);
// Test that we can access the error chain
if let MadError::Inner(ref e) = error {
let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
assert_eq!(chain.len(), 3);
assert_eq!(chain[0], "top layer");
assert_eq!(chain[1], "middle layer");
assert_eq!(chain[2], "root cause");
} else {
panic!("Expected MadError::Inner");
}
}
#[tokio::test]
async fn test_component_error_propagation() {
struct FailingComponent;
impl Component for FailingComponent {
fn info(&self) -> ComponentInfo {
"test-component".into()
}
async fn run(&self, _cancel: CancellationToken) -> Result<(), MadError> {
Err(anyhow::anyhow!("IO error")
.context("failed to open file")
.context("component initialization failed")
.into())
}
}
let result = Mad::builder()
.add(FailingComponent)
.cancellation(Some(std::time::Duration::from_millis(100)))
.run()
.await;
assert!(result.is_err());
let error = result.unwrap_err();
// Check error display
let display = format!("{}", error);
assert!(display.contains("component initialization failed"));
}
}

79
crates/mad/src/waiter.rs Normal file
View File

@@ -0,0 +1,79 @@
//! Waiter components for MAD.
//!
//! This module provides waiter components that simply wait for cancellation
//! without performing any work. Useful for keeping the application alive
//! or as placeholders in conditional component loading.
use tokio_util::sync::CancellationToken;
use crate::{Component, ComponentInfo, IntoComponent, MadError, SharedComponent};
/// A default waiter component that panics if run.
///
/// This is used internally as a placeholder that should never
/// actually be executed.
pub struct DefaultWaiter;
impl Component for DefaultWaiter {
async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), MadError> {
panic!("should never be called");
}
}
/// A wrapper component that waits for cancellation.
///
/// Instead of running the wrapped component's logic, this simply
/// waits for the cancellation token. This is useful for conditionally
/// disabling components while keeping the same structure.
///
/// # Example
///
/// ```rust,ignore
/// use mad::Waiter;
///
/// // Instead of running the service, just wait
/// let waiter = Waiter::new(service.into_component());
/// ```
pub struct Waiter {
comp: SharedComponent,
}
impl Default for Waiter {
fn default() -> Self {
Self {
comp: DefaultWaiter {}.into_component(),
}
}
}
impl Waiter {
/// Creates a new waiter that wraps the given component.
///
/// The wrapped component's name will be used (prefixed with "waiter/"),
/// but its run method will not be called.
pub fn new(c: SharedComponent) -> Self {
Self { comp: c }
}
}
impl Component for Waiter {
/// Returns the name of the waiter, prefixed with "waiter/".
///
/// If the wrapped component has a name, it will be "waiter/{name}".
/// Otherwise, returns "waiter".
fn info(&self) -> ComponentInfo {
match &self.comp.info().name {
Some(name) => format!("waiter/{name}").into(),
None => "waiter".into(),
}
}
/// Waits for cancellation without performing any work.
///
/// This method simply waits for the cancellation token to be triggered,
/// then returns successfully.
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
cancellation_token.cancelled().await;
Ok(())
}
}

View File

@@ -1,7 +1,6 @@
use std::sync::Arc;
use async_trait::async_trait;
use notmad::{Component, Mad};
use notmad::{Component, ComponentInfo, Mad, MadError};
use rand::Rng;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
@@ -9,13 +8,12 @@ use tracing_test::traced_test;
struct NeverEndingRun {}
#[async_trait]
impl Component for NeverEndingRun {
fn name(&self) -> Option<String> {
Some("NeverEndingRun".into())
fn info(&self) -> ComponentInfo {
"NeverEndingRun".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(50..1000);
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
@@ -137,3 +135,44 @@ async fn test_can_shutdown_gracefully() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test]
#[traced_test]
async fn test_component_panics_shutdowns_cleanly() -> anyhow::Result<()> {
let res = Mad::builder()
.add_fn({
move |_cancel| async move {
panic!("my inner panic");
}
})
.add_fn(|cancel| async move {
cancel.cancelled().await;
Ok(())
})
.run()
.await;
let err_content = res.unwrap_err().to_string();
assert!(err_content.contains("component panicked"));
assert!(err_content.contains("my inner panic"));
Ok(())
}
#[test]
fn test_can_easily_transform_error() -> anyhow::Result<()> {
fn fallible() -> anyhow::Result<()> {
Ok(())
}
fn inner() -> Result<(), MadError> {
fallible()?;
Ok(())
}
inner()?;
Ok(())
}

View File

@@ -1,10 +1,12 @@
# yaml-language-server: $schema=https://git.front.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
# yaml-language-server: $schema=https://git.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
base: "git@git.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
vars:
service: "mad"
registry: kasperhermansen
rust:
publish: {}
please:
project:
@@ -12,6 +14,6 @@ please:
repository: "mad"
branch: main
settings:
api_url: "https://git.front.kjuulh.io"
api_url: "https://git.kjuulh.io"
actions:
rust: