Compare commits
1 Commits
main
...
3251e7d60d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3251e7d60d |
74
CHANGELOG.md
74
CHANGELOG.md
@@ -6,81 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.10.0] - 2025-11-15
|
||||
|
||||
### Added
|
||||
- implement take errors
|
||||
|
||||
## [0.9.0] - 2025-11-15
|
||||
|
||||
### Added
|
||||
- mad not properly surfaces panics
|
||||
- add publish
|
||||
- add readme
|
||||
|
||||
### Fixed
|
||||
- *(deps)* update all dependencies (#38)
|
||||
## [0.7.2] - 2024-12-04
|
||||
|
||||
### Other
|
||||
- *(deps)* update rust crate tracing-subscriber to v0.3.20 (#37)
|
||||
|
||||
## [0.8.1] - 2025-08-09
|
||||
|
||||
### Other
|
||||
- error logging
|
||||
|
||||
## [0.8.0] - 2025-08-08
|
||||
|
||||
### Added
|
||||
- add docs
|
||||
- update readme
|
||||
|
||||
## [0.7.5] - 2025-07-24
|
||||
|
||||
### Added
|
||||
- print big inner
|
||||
|
||||
### Other
|
||||
- more error correction
|
||||
- correct error test to not be as verbose
|
||||
|
||||
## [0.7.4] - 2025-07-24
|
||||
|
||||
### Added
|
||||
- cleanup aggregate error for single error
|
||||
|
||||
## [0.7.3] - 2025-07-24
|
||||
|
||||
### Added
|
||||
- automatic conversion from anyhow::Error and access to aggregate errors
|
||||
|
||||
### Fixed
|
||||
- *(deps)* update all dependencies (#30)
|
||||
|
||||
## [0.7.2] - 2025-06-25
|
||||
|
||||
### Added
|
||||
- add wait
|
||||
- add conditional, allows adding or waiting for close
|
||||
|
||||
### Fixed
|
||||
- *(deps)* update rust crate async-trait to v0.1.86 (#28)
|
||||
- *(deps)* update rust crate rand to 0.9.0 (#27)
|
||||
- *(deps)* update rust crate thiserror to v2.0.11 (#26)
|
||||
- *(deps)* update all dependencies (#25)
|
||||
- *(deps)* update rust crate async-trait to v0.1.84 (#24)
|
||||
- *(deps)* update rust crate thiserror to v2.0.9 (#22)
|
||||
- *(deps)* update rust crate thiserror to v2.0.8 (#21)
|
||||
- *(deps)* update rust crate thiserror to v2.0.7 (#20)
|
||||
- *(deps)* update rust crate thiserror to v2.0.6 (#19)
|
||||
- *(deps)* update rust crate thiserror to v2.0.5 (#18)
|
||||
- *(deps)* update rust crate tokio-util to v0.7.13 (#17)
|
||||
|
||||
### Other
|
||||
- chore
|
||||
|
||||
- *(deps)* update all dependencies (#29)
|
||||
- *(deps)* update rust crate anyhow to v1.0.95 (#23)
|
||||
- *(deps)* update all dependencies (#16)
|
||||
- *(deps)* update rust crate tracing-subscriber to v0.3.19 (#15)
|
||||
- *(deps)* update rust crate tracing to v0.1.41 (#13)
|
||||
|
||||
670
Cargo.lock
generated
670
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -3,9 +3,10 @@ members = ["crates/*"]
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
version = "0.11.0"
|
||||
version = "0.7.2"
|
||||
|
||||
[workspace.dependencies]
|
||||
mad = { path = "crates/mad" }
|
||||
|
||||
anyhow = { version = "1.0.71" }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
|
||||
169
README.md
169
README.md
@@ -1,32 +1,29 @@
|
||||
# MAD - Lifecycle Manager for Rust Applications
|
||||
# MAD
|
||||
|
||||
[](https://crates.io/crates/notmad)
|
||||
[](https://docs.rs/notmad)
|
||||
[](https://opensource.org/licenses/MIT)
|
||||
Mad is a life-cycle manager for long running rust operations.
|
||||
|
||||
A simple lifecycle manager for long-running Rust applications. Run multiple services concurrently with graceful shutdown handling.
|
||||
- Webservers
|
||||
- Queue bindings
|
||||
- gRPC servers etc
|
||||
- Cron runners
|
||||
|
||||
## Installation
|
||||
|
||||
```toml
|
||||
[dependencies]
|
||||
notmad = "0.10.0"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
```
|
||||
|
||||
## Quick Start
|
||||
It is supposed to be the main thing the application runs, and everything from it is spawned and managed by it.
|
||||
|
||||
```rust
|
||||
use notmad::{Component, Mad};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
struct WaitServer {}
|
||||
|
||||
struct MyService;
|
||||
#[async_trait]
|
||||
impl Component for WaitServer {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("NeverEndingRun".into())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
||||
let millis_wait = rand::thread_rng().gen_range(50..1000);
|
||||
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
|
||||
impl Component for MyService {
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
println!("Service running...");
|
||||
cancellation.cancelled().await;
|
||||
println!("Service stopped");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -34,133 +31,21 @@ impl Component for MyService {
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
Mad::builder()
|
||||
.add(MyService)
|
||||
.add(WaitServer {})
|
||||
.add(WaitServer {})
|
||||
.add(WaitServer {})
|
||||
.run()
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
## Basic Usage
|
||||
|
||||
### Axum Web Server with Graceful Shutdown
|
||||
|
||||
Here's how to run an Axum server with MAD's graceful shutdown:
|
||||
|
||||
```rust
|
||||
use axum::{Router, routing::get};
|
||||
use notmad::{Component, ComponentInfo};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
struct WebServer {
|
||||
port: u16,
|
||||
}
|
||||
|
||||
impl Component for WebServer {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
format!("WebServer:{}", self.port).into()
|
||||
}
|
||||
|
||||
async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
let app = Router::new().route("/", get(|| async { "Hello, World!" }));
|
||||
|
||||
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", self.port))
|
||||
.await?;
|
||||
|
||||
println!("Listening on http://0.0.0.0:{}", self.port);
|
||||
|
||||
// Run server with graceful shutdown
|
||||
axum::serve(listener, app)
|
||||
.with_graceful_shutdown(async move {
|
||||
cancel.cancelled().await;
|
||||
println!("Shutting down server...");
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Run Multiple Services
|
||||
|
||||
```rust
|
||||
Mad::builder()
|
||||
.add(WebServer { port: 8080 })
|
||||
.add(WebServer { port: 8081 })
|
||||
.run()
|
||||
.await?;
|
||||
```
|
||||
|
||||
### Use Functions as Components
|
||||
|
||||
```rust
|
||||
Mad::builder()
|
||||
.add_fn(|cancel| async move {
|
||||
println!("Running...");
|
||||
cancel.cancelled().await;
|
||||
Ok(())
|
||||
})
|
||||
.run()
|
||||
.await?;
|
||||
```
|
||||
|
||||
## Lifecycle Hooks
|
||||
|
||||
Components support optional setup and cleanup phases:
|
||||
|
||||
```rust
|
||||
impl Component for DatabaseService {
|
||||
async fn setup(&self) -> Result<(), notmad::MadError> {
|
||||
println!("Connecting to database...");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
cancel.cancelled().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn close(&self) -> Result<(), notmad::MadError> {
|
||||
println!("Closing database connection...");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Migration from v0.10
|
||||
|
||||
### Breaking Changes
|
||||
|
||||
1. **`name()` → `info()`**: Returns `ComponentInfo` instead of `Option<String>`
|
||||
```rust
|
||||
// Before
|
||||
fn name(&self) -> Option<String> { Some("my-service".into()) }
|
||||
|
||||
// After
|
||||
fn info(&self) -> ComponentInfo { "my-service".into() }
|
||||
```
|
||||
|
||||
2. **No more `async-trait`**: Remove the dependency and `#[async_trait]` attribute
|
||||
```rust
|
||||
// Before
|
||||
#[async_trait]
|
||||
impl Component for MyService { }
|
||||
|
||||
// After
|
||||
impl Component for MyService { }
|
||||
```
|
||||
|
||||
## Examples
|
||||
|
||||
See [examples directory](crates/mad/examples) for complete working examples.
|
||||
Can be found (here)[crates/mad/examples]
|
||||
|
||||
## License
|
||||
|
||||
MIT - see [LICENSE](LICENSE)
|
||||
|
||||
## Links
|
||||
|
||||
- [Documentation](https://docs.rs/notmad)
|
||||
- [Repository](https://github.com/kjuulh/mad)
|
||||
- [Crates.io](https://crates.io/crates/notmad)
|
||||
- basic
|
||||
- fn
|
||||
- signals
|
||||
- error_log
|
||||
|
||||
@@ -4,15 +4,15 @@ version.workspace = true
|
||||
description = "notmad is a life-cycle manager for long running rust operations"
|
||||
license = "MIT"
|
||||
repository = "https://github.com/kjuulh/mad"
|
||||
authors = ["kjuulh"]
|
||||
edition = "2024"
|
||||
readme = "../../README.md"
|
||||
author = "kjuulh"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-trait = "0.1.81"
|
||||
futures = "0.3.30"
|
||||
futures-util = "0.3.30"
|
||||
rand = "0.10.0"
|
||||
rand = "0.8.5"
|
||||
thiserror = "2.0.0"
|
||||
tokio.workspace = true
|
||||
tokio-util = "0.7.11"
|
||||
|
||||
@@ -1,15 +1,16 @@
|
||||
use notmad::ComponentInfo;
|
||||
use async_trait::async_trait;
|
||||
use rand::Rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Level;
|
||||
|
||||
struct WaitServer {}
|
||||
#[async_trait]
|
||||
impl notmad::Component for WaitServer {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
"WaitServer".into()
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("WaitServer".into())
|
||||
}
|
||||
|
||||
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
||||
|
||||
tracing::debug!("waiting: {}ms", millis_wait);
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
[package]
|
||||
name = "mad-comprehensive-example"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[[bin]]
|
||||
name = "comprehensive"
|
||||
path = "main.rs"
|
||||
|
||||
[dependencies]
|
||||
notmad = { path = "../.." }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tokio-util = "0.7"
|
||||
async-trait = "0.1"
|
||||
anyhow = "1"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
rand = "0.8"
|
||||
@@ -1,326 +0,0 @@
|
||||
//! Comprehensive example demonstrating MAD's full capabilities.
|
||||
//!
|
||||
//! This example shows:
|
||||
//! - Multiple component types (struct, closure, conditional)
|
||||
//! - Component lifecycle (setup, run, close)
|
||||
//! - Error handling and propagation
|
||||
//! - Graceful shutdown with cancellation tokens
|
||||
//! - Concurrent component execution
|
||||
|
||||
use notmad::{Component, ComponentInfo, Mad, MadError};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use tokio::time::{Duration, interval};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
/// A web server component that simulates handling HTTP requests.
|
||||
struct WebServer {
|
||||
port: u16,
|
||||
request_count: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl Component for WebServer {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
format!("web-server-{}", self.port).into()
|
||||
}
|
||||
|
||||
async fn setup(&self) -> Result<(), MadError> {
|
||||
info!("Setting up web server on port {}", self.port);
|
||||
// In a real application, you might:
|
||||
// - Bind to the port
|
||||
// - Set up TLS certificates
|
||||
// - Initialize middleware
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
info!("Web server on port {} is ready", self.port);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
|
||||
info!("Web server on port {} started", self.port);
|
||||
let mut interval = interval(Duration::from_secs(1));
|
||||
|
||||
while !cancellation.is_cancelled() {
|
||||
tokio::select! {
|
||||
_ = cancellation.cancelled() => {
|
||||
info!("Web server on port {} received shutdown signal", self.port);
|
||||
break;
|
||||
}
|
||||
_ = interval.tick() => {
|
||||
// Simulate handling requests
|
||||
let count = self.request_count.fetch_add(1, Ordering::Relaxed);
|
||||
info!("Server on port {} handled request #{}", self.port, count + 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn close(&self) -> Result<(), MadError> {
|
||||
info!("Shutting down web server on port {}", self.port);
|
||||
// In a real application, you might:
|
||||
// - Drain existing connections
|
||||
// - Save server state
|
||||
// - Close database connections
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
let total = self.request_count.load(Ordering::Relaxed);
|
||||
info!(
|
||||
"Web server on port {} shut down. Total requests handled: {}",
|
||||
self.port, total
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A background job processor that simulates processing tasks from a queue.
|
||||
struct JobProcessor {
|
||||
queue_name: String,
|
||||
processing_interval: Duration,
|
||||
}
|
||||
|
||||
impl Component for JobProcessor {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
format!("job-processor-{}", self.queue_name).into()
|
||||
}
|
||||
|
||||
async fn setup(&self) -> Result<(), MadError> {
|
||||
info!("Connecting to queue: {}", self.queue_name);
|
||||
// Simulate connecting to a message queue
|
||||
tokio::time::sleep(Duration::from_millis(150)).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
|
||||
info!("Job processor for {} started", self.queue_name);
|
||||
let mut interval = interval(self.processing_interval);
|
||||
let mut job_count = 0;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancellation.cancelled() => {
|
||||
info!("Job processor for {} stopping...", self.queue_name);
|
||||
break;
|
||||
}
|
||||
_ = interval.tick() => {
|
||||
job_count += 1;
|
||||
info!("Processing job #{} from {}", job_count, self.queue_name);
|
||||
|
||||
// Simulate job processing
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
// Simulate occasional errors (but don't fail the component)
|
||||
if job_count % 10 == 0 {
|
||||
warn!("Job #{} from {} required retry", job_count, self.queue_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"Job processor for {} processed {} jobs",
|
||||
self.queue_name, job_count
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn close(&self) -> Result<(), MadError> {
|
||||
info!("Disconnecting from queue: {}", self.queue_name);
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A health check component that monitors system health.
|
||||
struct HealthChecker {
|
||||
check_interval: Duration,
|
||||
}
|
||||
|
||||
impl Component for HealthChecker {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
"health-checker".into()
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
|
||||
info!("Health checker started");
|
||||
let mut interval = interval(self.check_interval);
|
||||
|
||||
while !cancellation.is_cancelled() {
|
||||
tokio::select! {
|
||||
_ = cancellation.cancelled() => {
|
||||
info!("Health checker stopping...");
|
||||
break;
|
||||
}
|
||||
_ = interval.tick() => {
|
||||
// Simulate health checks
|
||||
let cpu_usage = rand::random::<f32>() * 100.0;
|
||||
let memory_usage = rand::random::<f32>() * 100.0;
|
||||
|
||||
info!("System health: CPU={:.1}%, Memory={:.1}%", cpu_usage, memory_usage);
|
||||
|
||||
if cpu_usage > 90.0 {
|
||||
warn!("High CPU usage detected: {:.1}%", cpu_usage);
|
||||
}
|
||||
if memory_usage > 90.0 {
|
||||
warn!("High memory usage detected: {:.1}%", memory_usage);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A component that will fail after some time to demonstrate error handling.
|
||||
struct FailingComponent {
|
||||
fail_after: Duration,
|
||||
}
|
||||
|
||||
impl Component for FailingComponent {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
"failing-component".into()
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
|
||||
info!(
|
||||
"Failing component started (will fail after {:?})",
|
||||
self.fail_after
|
||||
);
|
||||
|
||||
tokio::select! {
|
||||
_ = cancellation.cancelled() => {
|
||||
info!("Failing component cancelled before failure");
|
||||
Ok(())
|
||||
}
|
||||
_ = tokio::time::sleep(self.fail_after) => {
|
||||
error!("Failing component encountered an error!");
|
||||
Err(anyhow::anyhow!("Simulated component failure").into())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Debug component that logs system status periodically.
|
||||
struct DebugComponent;
|
||||
|
||||
impl Component for DebugComponent {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
"debug-component".into()
|
||||
}
|
||||
|
||||
async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> {
|
||||
info!("Debug mode enabled - verbose logging active");
|
||||
let mut interval = interval(Duration::from_secs(5));
|
||||
|
||||
while !cancel.is_cancelled() {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => break,
|
||||
_ = interval.tick() => {
|
||||
info!("DEBUG: System running normally");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Debug component shutting down");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
// Initialize tracing for logging
|
||||
tracing_subscriber::fmt()
|
||||
.with_target(false)
|
||||
.without_time()
|
||||
.init();
|
||||
|
||||
info!("Starting comprehensive MAD example application");
|
||||
|
||||
// Check if we should enable the failing component
|
||||
let enable_failure_demo = std::env::var("ENABLE_FAILURE").is_ok();
|
||||
|
||||
// Check if we should enable debug mode
|
||||
let debug_mode = std::env::var("DEBUG").is_ok();
|
||||
|
||||
// Shared state for demonstration
|
||||
let request_count = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
// Build and run the application
|
||||
let result = Mad::builder()
|
||||
// Add web servers
|
||||
.add(WebServer {
|
||||
port: 8080,
|
||||
request_count: request_count.clone(),
|
||||
})
|
||||
.add(WebServer {
|
||||
port: 8081,
|
||||
request_count: request_count.clone(),
|
||||
})
|
||||
// Add job processors
|
||||
.add(JobProcessor {
|
||||
queue_name: "high-priority".to_string(),
|
||||
processing_interval: Duration::from_secs(2),
|
||||
})
|
||||
.add(JobProcessor {
|
||||
queue_name: "low-priority".to_string(),
|
||||
processing_interval: Duration::from_secs(5),
|
||||
})
|
||||
// Add health checker
|
||||
.add(HealthChecker {
|
||||
check_interval: Duration::from_secs(3),
|
||||
})
|
||||
// Conditionally add a debug component using add_fn
|
||||
.add_conditional(debug_mode, DebugComponent)
|
||||
// Conditionally add failing component to demonstrate error handling
|
||||
.add_conditional(
|
||||
enable_failure_demo,
|
||||
FailingComponent {
|
||||
fail_after: Duration::from_secs(10),
|
||||
},
|
||||
)
|
||||
// Add a simple metrics reporter using add_fn
|
||||
.add_fn(|cancel: CancellationToken| async move {
|
||||
info!("Metrics reporter started");
|
||||
let mut interval = interval(Duration::from_secs(10));
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
while !cancel.is_cancelled() {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => break,
|
||||
_ = interval.tick() => {
|
||||
let uptime = start.elapsed();
|
||||
info!("Application uptime: {:?}", uptime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Metrics reporter stopped");
|
||||
Ok(())
|
||||
})
|
||||
// Configure graceful shutdown timeout
|
||||
.cancellation(Some(Duration::from_secs(5)))
|
||||
// Run the application
|
||||
.run()
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(()) => {
|
||||
info!("Application shut down successfully");
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Application failed: {}", e);
|
||||
|
||||
// Check if it's an aggregate error with multiple failures
|
||||
if let MadError::AggregateError(ref agg) = e {
|
||||
error!("Multiple component failures detected:");
|
||||
for (i, err) in agg.get_errors().iter().enumerate() {
|
||||
error!(" {}. {}", i + 1, err);
|
||||
}
|
||||
}
|
||||
|
||||
Err(e.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,15 +1,16 @@
|
||||
use notmad::ComponentInfo;
|
||||
use async_trait::async_trait;
|
||||
use rand::Rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Level;
|
||||
|
||||
struct ErrorServer {}
|
||||
#[async_trait]
|
||||
impl notmad::Component for ErrorServer {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
"ErrorServer".into()
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("ErrorServer".into())
|
||||
}
|
||||
|
||||
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
||||
|
||||
tracing::debug!("waiting: {}ms", millis_wait);
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
use notmad::ComponentInfo;
|
||||
use async_trait::async_trait;
|
||||
use rand::Rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Level;
|
||||
|
||||
struct WaitServer {}
|
||||
#[async_trait]
|
||||
impl notmad::Component for WaitServer {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
"WaitServer".into()
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("WaitServer".into())
|
||||
}
|
||||
|
||||
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
[package]
|
||||
name = "mad-multi-service-example"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[[bin]]
|
||||
name = "multi_service"
|
||||
path = "main.rs"
|
||||
|
||||
[dependencies]
|
||||
notmad = { path = "../.." }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tokio-util = "0.7"
|
||||
async-trait = "0.1"
|
||||
anyhow = "1"
|
||||
@@ -1,210 +0,0 @@
|
||||
//! Example demonstrating running multiple services with MAD.
|
||||
//!
|
||||
//! This example shows how to run a web server, queue processor, and
|
||||
//! scheduled task together, with graceful shutdown on Ctrl+C.
|
||||
|
||||
use notmad::{Component, ComponentInfo, Mad, MadError};
|
||||
use tokio::time::{Duration, interval};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
/// A simple web server component.
|
||||
///
|
||||
/// In a real application, this would bind to a port and handle HTTP requests.
|
||||
/// Here we simulate it by periodically logging that we're "handling" requests.
|
||||
struct WebServer {
|
||||
port: u16,
|
||||
}
|
||||
|
||||
impl Component for WebServer {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
format!("WebServer:{}", self.port).into()
|
||||
}
|
||||
|
||||
async fn setup(&self) -> Result<(), MadError> {
|
||||
println!("[{}] Binding to port...", self.info());
|
||||
// Simulate server setup time
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
println!("[{}] Ready to accept connections", self.info());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
|
||||
println!("[{}] Server started", self.info());
|
||||
|
||||
// Simulate handling requests until shutdown
|
||||
let mut request_id = 0;
|
||||
let mut interval = interval(Duration::from_secs(2));
|
||||
|
||||
while !cancellation.is_cancelled() {
|
||||
tokio::select! {
|
||||
_ = cancellation.cancelled() => {
|
||||
println!("[{}] Shutdown signal received", self.info());
|
||||
break;
|
||||
}
|
||||
_ = interval.tick() => {
|
||||
request_id += 1;
|
||||
println!("[{}] Handling request #{}", self.info(), request_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn close(&self) -> Result<(), MadError> {
|
||||
println!("[{}] Closing connections...", self.info());
|
||||
// Simulate graceful connection drain
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
println!("[{}] Server stopped", self.info());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A queue processor that consumes messages from a queue.
|
||||
///
|
||||
/// This simulates processing messages from a message queue like
|
||||
/// RabbitMQ, Kafka, or AWS SQS.
|
||||
struct QueueProcessor {
|
||||
queue_name: String,
|
||||
}
|
||||
|
||||
impl Component for QueueProcessor {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
format!("QueueProcessor:{}", self.queue_name).into()
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
|
||||
println!("[{}] Started processing", self.info());
|
||||
|
||||
let mut message_count = 0;
|
||||
|
||||
// Process messages until shutdown
|
||||
while !cancellation.is_cancelled() {
|
||||
// Simulate waiting for and processing a message
|
||||
tokio::select! {
|
||||
_ = cancellation.cancelled() => {
|
||||
println!("[{}] Stopping message processing", self.info());
|
||||
break;
|
||||
}
|
||||
_ = tokio::time::sleep(Duration::from_secs(1)) => {
|
||||
message_count += 1;
|
||||
println!("[{}] Processed message #{}", self.info(), message_count);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!(
|
||||
"[{}] Processed {} messages total",
|
||||
self.info(),
|
||||
message_count
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A scheduled task that runs periodically.
|
||||
///
|
||||
/// This could be used for tasks like:
|
||||
/// - Cleaning up old data
|
||||
/// - Generating reports
|
||||
/// - Syncing with external systems
|
||||
struct ScheduledTask {
|
||||
task_name: String,
|
||||
interval_secs: u64,
|
||||
}
|
||||
|
||||
impl Component for ScheduledTask {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
format!("ScheduledTask:{}", self.task_name).into()
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
|
||||
println!(
|
||||
"[{}] Scheduled to run every {} seconds",
|
||||
self.info(),
|
||||
self.interval_secs
|
||||
);
|
||||
|
||||
let mut interval = interval(Duration::from_secs(self.interval_secs));
|
||||
let mut run_count = 0;
|
||||
|
||||
while !cancellation.is_cancelled() {
|
||||
tokio::select! {
|
||||
_ = cancellation.cancelled() => {
|
||||
println!("[{}] Scheduler stopping", self.info());
|
||||
break;
|
||||
}
|
||||
_ = interval.tick() => {
|
||||
run_count += 1;
|
||||
println!("[{}] Executing run #{}", self.info(), run_count);
|
||||
|
||||
// Simulate task execution
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
|
||||
println!("[{}] Run #{} completed", self.info(), run_count);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
println!("Starting multi-service application");
|
||||
println!("Press Ctrl+C to trigger graceful shutdown");
|
||||
println!("----------------------------------------");
|
||||
|
||||
// Build the application with multiple services
|
||||
Mad::builder()
|
||||
// Add a web server on port 8080
|
||||
.add(WebServer { port: 8080 })
|
||||
// Add another web server on port 8081 (e.g., admin interface)
|
||||
.add(WebServer { port: 8081 })
|
||||
// Add queue processors for different queues
|
||||
.add(QueueProcessor {
|
||||
queue_name: "orders".to_string(),
|
||||
})
|
||||
.add(QueueProcessor {
|
||||
queue_name: "notifications".to_string(),
|
||||
})
|
||||
// Add scheduled tasks
|
||||
.add(ScheduledTask {
|
||||
task_name: "cleanup".to_string(),
|
||||
interval_secs: 5,
|
||||
})
|
||||
.add(ScheduledTask {
|
||||
task_name: "report_generator".to_string(),
|
||||
interval_secs: 10,
|
||||
})
|
||||
// Add a monitoring component using a closure
|
||||
.add_fn(|cancel| async move {
|
||||
println!("[Monitor] Starting system monitor");
|
||||
let mut interval = interval(Duration::from_secs(15));
|
||||
|
||||
while !cancel.is_cancelled() {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
println!("[Monitor] Stopping monitor");
|
||||
break;
|
||||
}
|
||||
_ = interval.tick() => {
|
||||
println!("[Monitor] All systems operational");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
// Set graceful shutdown timeout to 3 seconds
|
||||
.cancellation(Some(Duration::from_secs(3)))
|
||||
// Run all components
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
println!("----------------------------------------");
|
||||
println!("All services shut down successfully");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,84 +0,0 @@
|
||||
use notmad::ComponentInfo;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
struct NestedErrorComponent {
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl notmad::Component for NestedErrorComponent {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
self.name.clone().into()
|
||||
}
|
||||
|
||||
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
// Simulate a deeply nested error
|
||||
let io_error = std::io::Error::new(
|
||||
std::io::ErrorKind::PermissionDenied,
|
||||
"access denied to /etc/secret",
|
||||
);
|
||||
|
||||
Err(anyhow::Error::from(io_error)
|
||||
.context("failed to read configuration file")
|
||||
.context("unable to initialize database connection pool")
|
||||
.context(format!("component '{}' startup failed", self.name))
|
||||
.into())
|
||||
}
|
||||
}
|
||||
|
||||
struct AnotherFailingComponent;
|
||||
|
||||
impl notmad::Component for AnotherFailingComponent {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
"another-component".into()
|
||||
}
|
||||
|
||||
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
|
||||
Err(anyhow::anyhow!("network timeout after 30s")
|
||||
.context("failed to connect to external API")
|
||||
.context("service health check failed")
|
||||
.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter("mad=debug")
|
||||
.init();
|
||||
|
||||
let result = notmad::Mad::builder()
|
||||
.add(NestedErrorComponent {
|
||||
name: "database-service".into(),
|
||||
})
|
||||
.add(AnotherFailingComponent)
|
||||
.run()
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(()) => println!("Success!"),
|
||||
Err(e) => {
|
||||
eprintln!("\n=== Error occurred ===");
|
||||
eprintln!("{}", e);
|
||||
|
||||
// Also demonstrate how to walk the error chain manually
|
||||
if let notmad::MadError::AggregateError(ref agg) = e {
|
||||
eprintln!("\n=== Detailed error chains ===");
|
||||
for (i, error) in agg.get_errors().iter().enumerate() {
|
||||
eprintln!("\nComponent {} error chain:", i + 1);
|
||||
if let notmad::MadError::Inner(inner) = error {
|
||||
for (j, cause) in inner.chain().enumerate() {
|
||||
eprintln!(" {}. {}", j + 1, cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if let notmad::MadError::Inner(ref inner) = e {
|
||||
eprintln!("\n=== Error chain ===");
|
||||
for (i, cause) in inner.chain().enumerate() {
|
||||
eprintln!(" {}. {}", i + 1, cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,15 +1,16 @@
|
||||
use notmad::ComponentInfo;
|
||||
use async_trait::async_trait;
|
||||
use rand::Rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Level;
|
||||
|
||||
struct WaitServer {}
|
||||
#[async_trait]
|
||||
impl notmad::Component for WaitServer {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
"WaitServer".into()
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("WaitServer".into())
|
||||
}
|
||||
|
||||
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
||||
|
||||
tracing::debug!("waiting: {}ms", millis_wait);
|
||||
@@ -22,9 +23,10 @@ impl notmad::Component for WaitServer {
|
||||
}
|
||||
|
||||
struct RespectCancel {}
|
||||
#[async_trait]
|
||||
impl notmad::Component for RespectCancel {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
"RespectCancel".into()
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("RespectCancel".into())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
@@ -36,12 +38,13 @@ impl notmad::Component for RespectCancel {
|
||||
}
|
||||
|
||||
struct NeverStopServer {}
|
||||
#[async_trait]
|
||||
impl notmad::Component for NeverStopServer {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
"NeverStopServer".into()
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("NeverStopServer".into())
|
||||
}
|
||||
|
||||
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(999999999)).await;
|
||||
|
||||
|
||||
@@ -1,193 +1,51 @@
|
||||
//! # MAD - Lifecycle Manager for Rust Applications
|
||||
//!
|
||||
//! A simple lifecycle manager for long-running Rust applications. Run multiple services
|
||||
//! concurrently with graceful shutdown handling.
|
||||
//!
|
||||
//! ## Quick Start
|
||||
//!
|
||||
//! ```rust,no_run
|
||||
//! use notmad::{Component, Mad};
|
||||
//! use tokio_util::sync::CancellationToken;
|
||||
//!
|
||||
//! struct MyService;
|
||||
//!
|
||||
//! impl Component for MyService {
|
||||
//! async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
//! println!("Running...");
|
||||
//! cancel.cancelled().await;
|
||||
//! println!("Stopped");
|
||||
//! Ok(())
|
||||
//! }
|
||||
//! }
|
||||
//!
|
||||
//! #[tokio::main]
|
||||
//! async fn main() -> anyhow::Result<()> {
|
||||
//! Mad::builder()
|
||||
//! .add(MyService)
|
||||
//! .run()
|
||||
//! .await?;
|
||||
//! Ok(())
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! ## Features
|
||||
//!
|
||||
//! - Run multiple components concurrently
|
||||
//! - Graceful shutdown with cancellation tokens
|
||||
//! - Optional lifecycle hooks: `setup()`, `run()`, `close()`
|
||||
//! - Automatic error aggregation
|
||||
//! - SIGTERM and Ctrl+C signal handling
|
||||
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures_util::StreamExt;
|
||||
use std::{error::Error, fmt::Display, pin::Pin, sync::Arc};
|
||||
use tokio::signal::unix::{SignalKind, signal};
|
||||
use std::{fmt::Display, sync::Arc};
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::waiter::Waiter;
|
||||
|
||||
mod waiter;
|
||||
|
||||
/// Error type for MAD operations.
|
||||
///
|
||||
/// This enum represents all possible errors that can occur during
|
||||
/// the lifecycle of MAD components.
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum MadError {
|
||||
/// Generic error wrapper for anyhow errors.
|
||||
///
|
||||
/// This variant is used when components return errors via the `?` operator
|
||||
/// or when converting from `anyhow::Error`.
|
||||
#[error(transparent)]
|
||||
Inner(anyhow::Error),
|
||||
#[error("component failed: {0}")]
|
||||
Inner(#[source] anyhow::Error),
|
||||
|
||||
/// Error that occurred during the run phase of a component.
|
||||
#[error(transparent)]
|
||||
#[error("component(s) failed: {run}")]
|
||||
RunError { run: anyhow::Error },
|
||||
|
||||
/// Error that occurred during the close phase of a component.
|
||||
#[error("component(s) failed during close")]
|
||||
CloseError {
|
||||
#[source]
|
||||
close: anyhow::Error,
|
||||
},
|
||||
#[error("component(s) failed: {close}")]
|
||||
CloseError { close: anyhow::Error },
|
||||
|
||||
/// Multiple errors from different components.
|
||||
///
|
||||
/// This is used when multiple components fail simultaneously,
|
||||
/// allowing all errors to be reported rather than just the first one.
|
||||
#[error("{0}")]
|
||||
#[error("component(s) failed: {0}")]
|
||||
AggregateError(AggregateError),
|
||||
|
||||
/// Returned when a component doesn't implement the optional setup method.
|
||||
///
|
||||
/// This is not typically an error condition as setup is optional.
|
||||
#[error("setup not defined")]
|
||||
SetupNotDefined,
|
||||
|
||||
/// Returned when a component doesn't implement the optional close method.
|
||||
///
|
||||
/// This is not typically an error condition as close is optional.
|
||||
#[error("close not defined")]
|
||||
CloseNotDefined,
|
||||
}
|
||||
|
||||
impl From<anyhow::Error> for MadError {
|
||||
fn from(value: anyhow::Error) -> Self {
|
||||
Self::Inner(value)
|
||||
}
|
||||
}
|
||||
|
||||
/// Container for multiple errors from different components.
|
||||
///
|
||||
/// When multiple components fail, their errors are collected
|
||||
/// into this struct to provide complete error reporting.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[derive(Debug)]
|
||||
pub struct AggregateError {
|
||||
errors: Vec<MadError>,
|
||||
}
|
||||
|
||||
impl AggregateError {
|
||||
/// Returns a slice of all contained errors.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust,ignore
|
||||
/// match result {
|
||||
/// Err(notmad::MadError::AggregateError(agg)) => {
|
||||
/// for error in agg.get_errors() {
|
||||
/// eprintln!("Component error: {}", error);
|
||||
/// }
|
||||
/// }
|
||||
/// _ => {}
|
||||
/// }
|
||||
/// ```
|
||||
pub fn get_errors(&self) -> &[MadError] {
|
||||
&self.errors
|
||||
}
|
||||
|
||||
pub fn take_errors(self) -> Vec<MadError> {
|
||||
self.errors
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for AggregateError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
if self.errors.is_empty() {
|
||||
return Ok(());
|
||||
f.write_str("MadError::AggregateError: (")?;
|
||||
|
||||
for error in &self.errors {
|
||||
f.write_str(&error.to_string())?;
|
||||
f.write_str(", ")?;
|
||||
}
|
||||
|
||||
if self.errors.len() == 1 {
|
||||
return write!(f, "{}", self.errors[0]);
|
||||
}
|
||||
|
||||
writeln!(f, "{} component errors occurred:", self.errors.len())?;
|
||||
for (i, error) in self.errors.iter().enumerate() {
|
||||
write!(f, "\n[Component {}] {}", i + 1, error)?;
|
||||
|
||||
// Print the error chain for each component error
|
||||
let mut source = error.source();
|
||||
let mut level = 1;
|
||||
while let Some(err) = source {
|
||||
write!(f, "\n {}. {}", level, err)?;
|
||||
source = err.source();
|
||||
level += 1;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
f.write_str(")")
|
||||
}
|
||||
}
|
||||
|
||||
/// The main lifecycle manager for running multiple components.
|
||||
///
|
||||
/// `Mad` orchestrates the lifecycle of multiple components, ensuring they
|
||||
/// start up in order, run concurrently, and shut down gracefully.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use notmad::{Component, Mad};
|
||||
/// use tokio_util::sync::CancellationToken;
|
||||
///
|
||||
/// struct MyComponent;
|
||||
///
|
||||
/// impl Component for MyComponent {
|
||||
/// async fn run(&self, _cancel: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// # async fn example() -> Result<(), notmad::MadError> {
|
||||
/// Mad::builder()
|
||||
/// .add(MyComponent)
|
||||
/// .run()
|
||||
/// .await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub struct Mad {
|
||||
components: Vec<SharedComponent>,
|
||||
components: Vec<Arc<dyn Component + Send + Sync + 'static>>,
|
||||
|
||||
should_cancel: Option<std::time::Duration>,
|
||||
}
|
||||
@@ -198,18 +56,6 @@ struct CompletionResult {
|
||||
}
|
||||
|
||||
impl Mad {
|
||||
/// Creates a new `Mad` builder.
|
||||
///
|
||||
/// This is the entry point for constructing a MAD application.
|
||||
/// Components are added using the builder pattern before calling `run()`.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use notmad::Mad;
|
||||
///
|
||||
/// let mut app = Mad::builder();
|
||||
/// ```
|
||||
pub fn builder() -> Self {
|
||||
Self {
|
||||
components: Vec::default(),
|
||||
@@ -218,119 +64,12 @@ impl Mad {
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds a component to the MAD application.
|
||||
///
|
||||
/// Components will be set up in the order they are added,
|
||||
/// run concurrently, and closed in the order they were added.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `component` - Any type that implements `Component` or `IntoComponent`
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use notmad::{Component, Mad};
|
||||
/// # use tokio_util::sync::CancellationToken;
|
||||
/// # struct MyService;
|
||||
/// # impl Component for MyService {
|
||||
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
|
||||
/// # }
|
||||
///
|
||||
/// Mad::builder()
|
||||
/// .add(MyService)
|
||||
/// .add(MyService);
|
||||
/// ```
|
||||
pub fn add(&mut self, component: impl IntoComponent) -> &mut Self {
|
||||
self.components.push(component.into_component());
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
/// Conditionally adds a component based on a boolean condition.
|
||||
///
|
||||
/// If the condition is false, a waiter component is added instead,
|
||||
/// which simply waits for cancellation without doing any work.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `condition` - If true, adds the component; if false, adds a waiter
|
||||
/// * `component` - The component to add if condition is true
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use notmad::Mad;
|
||||
/// # use notmad::Component;
|
||||
/// # use tokio_util::sync::CancellationToken;
|
||||
/// # struct DebugService;
|
||||
/// # impl Component for DebugService {
|
||||
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
|
||||
/// # }
|
||||
///
|
||||
/// let enable_debug = std::env::var("DEBUG").is_ok();
|
||||
///
|
||||
/// Mad::builder()
|
||||
/// .add_conditional(enable_debug, DebugService);
|
||||
/// ```
|
||||
pub fn add_conditional(&mut self, condition: bool, component: impl IntoComponent) -> &mut Self {
|
||||
if condition {
|
||||
self.components.push(component.into_component());
|
||||
} else {
|
||||
self.components
|
||||
.push(Waiter::new(component.into_component()).into_component())
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
/// Adds a waiter component that does nothing but wait for cancellation.
|
||||
///
|
||||
/// This is useful when you need a placeholder component or want
|
||||
/// the application to keep running without any specific work.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// # async fn example() {
|
||||
/// use notmad::Mad;
|
||||
///
|
||||
/// Mad::builder()
|
||||
/// .add_wait() // Keeps the app running until shutdown signal
|
||||
/// .run()
|
||||
/// .await;
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn add_wait(&mut self) -> &mut Self {
|
||||
self.components.push(Waiter::default().into_component());
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
/// Adds a closure or function as a component.
|
||||
///
|
||||
/// This is a convenient way to add simple components without
|
||||
/// creating a full struct that implements `Component`.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `f` - A closure that takes a `CancellationToken` and returns a future
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use notmad::Mad;
|
||||
/// use tokio_util::sync::CancellationToken;
|
||||
///
|
||||
/// Mad::builder()
|
||||
/// .add_fn(|cancel: CancellationToken| async move {
|
||||
/// while !cancel.is_cancelled() {
|
||||
/// println!("Working...");
|
||||
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
/// }
|
||||
/// Ok(())
|
||||
/// });
|
||||
/// ```
|
||||
pub fn add_fn<F, Fut>(&mut self, f: F) -> &mut Self
|
||||
where
|
||||
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
|
||||
@@ -341,63 +80,12 @@ impl Mad {
|
||||
self.add(comp)
|
||||
}
|
||||
|
||||
/// Configures the cancellation timeout behavior.
|
||||
///
|
||||
/// When a shutdown signal is received, MAD will:
|
||||
/// 1. Send cancellation tokens to all components
|
||||
/// 2. Wait for the specified duration
|
||||
/// 3. Force shutdown if components haven't stopped
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `should_cancel` - Duration to wait after cancellation before forcing shutdown.
|
||||
/// Pass `None` to wait indefinitely.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// # async fn example() {
|
||||
/// use notmad::Mad;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// Mad::builder()
|
||||
/// .cancellation(Some(Duration::from_secs(30))) // 30 second grace period
|
||||
/// .run()
|
||||
/// .await;
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn cancellation(&mut self, should_cancel: Option<std::time::Duration>) -> &mut Self {
|
||||
self.should_cancel = should_cancel;
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
/// Runs all components until completion or shutdown.
|
||||
///
|
||||
/// This method:
|
||||
/// 1. Calls `setup()` on all components (in order)
|
||||
/// 2. Starts all components concurrently
|
||||
/// 3. Waits for shutdown signal (SIGTERM, Ctrl+C) or component failure
|
||||
/// 4. Sends cancellation to all components
|
||||
/// 5. Calls `close()` on all components (in order)
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Ok(())` if all components shut down cleanly
|
||||
/// * `Err(MadError)` if any component fails
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// # use notmad::Mad;
|
||||
/// # async fn example() -> Result<(), notmad::MadError> {
|
||||
/// Mad::builder()
|
||||
/// .add_wait()
|
||||
/// .run()
|
||||
/// .await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub async fn run(&mut self) -> Result<(), MadError> {
|
||||
tracing::info!("running mad setup");
|
||||
|
||||
@@ -412,7 +100,7 @@ impl Mad {
|
||||
(Err(run), Err(close)) => {
|
||||
return Err(MadError::AggregateError(AggregateError {
|
||||
errors: vec![run, close],
|
||||
}));
|
||||
}))
|
||||
}
|
||||
(Ok(_), Ok(_)) => {}
|
||||
(Ok(_), Err(close)) => return Err(close),
|
||||
@@ -426,7 +114,7 @@ impl Mad {
|
||||
tracing::debug!("setting up components");
|
||||
|
||||
for comp in &self.components {
|
||||
tracing::trace!(component = %comp.info(), "mad setting up");
|
||||
tracing::trace!(component = &comp.name(), "mad setting up");
|
||||
|
||||
match comp.setup().await {
|
||||
Ok(_) | Err(MadError::SetupNotDefined) => {}
|
||||
@@ -456,37 +144,16 @@ impl Mad {
|
||||
channels.push(error_rx);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let info = comp.info().clone();
|
||||
let name = comp.name().clone();
|
||||
|
||||
tracing::debug!(component = %info, "mad running");
|
||||
|
||||
let handle = tokio::spawn(async move { comp.run(job_cancellation).await });
|
||||
tracing::debug!(component = name, "mad running");
|
||||
|
||||
tokio::select! {
|
||||
_ = cancellation_token.cancelled() => {
|
||||
error_tx.send(CompletionResult { res: Ok(()) , name: info.name }).await
|
||||
error_tx.send(CompletionResult { res: Ok(()) , name }).await
|
||||
}
|
||||
res = handle => {
|
||||
let res = match res {
|
||||
Ok(res) => res,
|
||||
Err(join) => {
|
||||
match join.source() {
|
||||
Some(error) => {
|
||||
Err(MadError::RunError{run: anyhow::anyhow!("component aborted: {:?}", error)})
|
||||
},
|
||||
None => {
|
||||
if join.is_panic(){
|
||||
Err(MadError::RunError { run: anyhow::anyhow!("component panicked: {}", join) })
|
||||
} else {
|
||||
Err(MadError::RunError { run: anyhow::anyhow!("component faced unknown error: {}", join) })
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
error_tx.send(CompletionResult { res , name: info.name }).await
|
||||
res = comp.run(job_cancellation) => {
|
||||
error_tx.send(CompletionResult { res , name }).await
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -563,7 +230,7 @@ impl Mad {
|
||||
tracing::debug!("closing components");
|
||||
|
||||
for comp in &self.components {
|
||||
tracing::trace!(component = %comp.info(), "mad closing");
|
||||
tracing::trace!(component = &comp.name(), "mad closing");
|
||||
match comp.close().await {
|
||||
Ok(_) | Err(MadError::CloseNotDefined) => {}
|
||||
Err(e) => return Err(e),
|
||||
@@ -581,259 +248,30 @@ async fn signal_unix_terminate() {
|
||||
sigterm.recv().await;
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
pub struct ComponentInfo {
|
||||
name: Option<String>,
|
||||
#[async_trait::async_trait]
|
||||
pub trait Component {
|
||||
fn name(&self) -> Option<String> {
|
||||
None
|
||||
}
|
||||
|
||||
impl Display for ComponentInfo {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(
|
||||
self.name
|
||||
.as_ref()
|
||||
.map(|n| n.as_str())
|
||||
.unwrap_or_else(|| "unknown"),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl ComponentInfo {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn with_name(&mut self, name: impl Into<String>) -> &mut Self {
|
||||
self.name = Some(name.into());
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for ComponentInfo {
|
||||
fn from(value: String) -> Self {
|
||||
Self { name: Some(value) }
|
||||
}
|
||||
}
|
||||
impl From<&str> for ComponentInfo {
|
||||
fn from(value: &str) -> Self {
|
||||
Self {
|
||||
name: Some(value.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait for implementing MAD components.
|
||||
///
|
||||
/// Components represent individual services or tasks that run as part
|
||||
/// of your application. Each component has its own lifecycle with
|
||||
/// optional setup and cleanup phases.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use notmad::{Component, ComponentInfo, MadError};
|
||||
/// use tokio_util::sync::CancellationToken;
|
||||
///
|
||||
/// struct DatabaseConnection {
|
||||
/// url: String,
|
||||
/// }
|
||||
///
|
||||
/// impl Component for DatabaseConnection {
|
||||
/// fn info(&self) -> ComponentInfo {
|
||||
/// "database".into()
|
||||
/// }
|
||||
///
|
||||
/// async fn setup(&self) -> Result<(), MadError> {
|
||||
/// println!("Connecting to database...");
|
||||
/// // Initialize connection pool
|
||||
/// Ok(())
|
||||
/// }
|
||||
///
|
||||
/// async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> {
|
||||
/// // Keep connection alive, handle queries
|
||||
/// cancel.cancelled().await;
|
||||
/// Ok(())
|
||||
/// }
|
||||
///
|
||||
/// async fn close(&self) -> Result<(), MadError> {
|
||||
/// println!("Closing database connection...");
|
||||
/// // Clean up resources
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
pub trait Component: Send + Sync + 'static {
|
||||
/// Returns an optional name for the component.
|
||||
///
|
||||
/// This name is used in logging and error messages to identify
|
||||
/// which component is being processed.
|
||||
///
|
||||
/// # Default
|
||||
///
|
||||
/// Returns `None` if not overridden.
|
||||
fn info(&self) -> ComponentInfo {
|
||||
ComponentInfo::default()
|
||||
}
|
||||
|
||||
/// Optional setup phase called before the component starts running.
|
||||
///
|
||||
/// Use this for initialization tasks like:
|
||||
/// - Establishing database connections
|
||||
/// - Loading configuration
|
||||
/// - Preparing resources
|
||||
///
|
||||
/// # Default
|
||||
///
|
||||
/// Returns `MadError::SetupNotDefined` which is handled gracefully.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// If setup fails with an error other than `SetupNotDefined`,
|
||||
/// the entire application will stop before any components start running.
|
||||
fn setup(&self) -> impl Future<Output = Result<(), MadError>> + Send + '_ {
|
||||
async { Err(MadError::SetupNotDefined) }
|
||||
}
|
||||
|
||||
/// Main execution phase of the component.
|
||||
///
|
||||
/// This method should contain the primary logic of your component.
|
||||
/// It should respect the cancellation token and shut down gracefully
|
||||
/// when cancellation is requested.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `cancellation_token` - Signal for graceful shutdown
|
||||
///
|
||||
/// # Implementation Guidelines
|
||||
///
|
||||
/// - Check `cancellation_token.is_cancelled()` periodically
|
||||
/// - Use `tokio::select!` with `cancellation_token.cancelled()` for async operations
|
||||
/// - Clean up resources before returning
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Any error returned will trigger shutdown of all other components.
|
||||
fn run(
|
||||
&self,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> impl Future<Output = Result<(), MadError>> + Send + '_;
|
||||
|
||||
/// Optional cleanup phase called after the component stops.
|
||||
///
|
||||
/// Use this for cleanup tasks like:
|
||||
/// - Flushing buffers
|
||||
/// - Closing connections
|
||||
/// - Saving state
|
||||
///
|
||||
/// # Default
|
||||
///
|
||||
/// Returns `MadError::CloseNotDefined` which is handled gracefully.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Errors during close are logged but don't prevent other components
|
||||
/// from closing.
|
||||
fn close(&self) -> impl Future<Output = Result<(), MadError>> + Send + '_ {
|
||||
async { Err(MadError::CloseNotDefined) }
|
||||
}
|
||||
}
|
||||
|
||||
trait AsyncComponent: Send + Sync + 'static {
|
||||
fn info_async(&self) -> ComponentInfo;
|
||||
|
||||
fn setup_async(&self) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>>;
|
||||
|
||||
fn run_async(
|
||||
&self,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>>;
|
||||
|
||||
fn close_async(&self) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>>;
|
||||
}
|
||||
|
||||
impl<E: Component> AsyncComponent for E {
|
||||
#[inline(always)]
|
||||
fn info_async(&self) -> ComponentInfo {
|
||||
self.info()
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn setup_async(&self) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>> {
|
||||
Box::pin(self.setup())
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn run_async(
|
||||
&self,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>> {
|
||||
Box::pin(self.run(cancellation_token))
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn close_async(&self) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>> {
|
||||
Box::pin(self.close())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SharedComponent {
|
||||
component: Arc<dyn AsyncComponent + Send + Sync + 'static>,
|
||||
}
|
||||
|
||||
impl SharedComponent {
|
||||
#[inline(always)]
|
||||
pub fn info(&self) -> ComponentInfo {
|
||||
self.component.info_async()
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
async fn setup(&self) -> Result<(), MadError> {
|
||||
self.component.setup_async().await
|
||||
Err(MadError::SetupNotDefined)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
|
||||
self.component.run_async(cancellation_token).await
|
||||
}
|
||||
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError>;
|
||||
|
||||
#[inline(always)]
|
||||
async fn close(&self) -> Result<(), MadError> {
|
||||
self.component.close_async().await
|
||||
Err(MadError::CloseNotDefined)
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait for converting types into components.
|
||||
///
|
||||
/// This trait is automatically implemented for all types that implement
|
||||
/// `Component + Send + Sync + 'static`, allowing them to be added to MAD
|
||||
/// directly without manual conversion.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use notmad::{Component, IntoComponent, Mad};
|
||||
/// # use tokio_util::sync::CancellationToken;
|
||||
///
|
||||
/// struct MyService;
|
||||
///
|
||||
/// # impl Component for MyService {
|
||||
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
|
||||
/// # }
|
||||
///
|
||||
/// // MyService automatically implements IntoComponent
|
||||
/// Mad::builder()
|
||||
/// .add(MyService); // Works directly
|
||||
/// ```
|
||||
pub trait IntoComponent {
|
||||
/// Converts self into an Arc-wrapped component.
|
||||
fn into_component(self) -> SharedComponent;
|
||||
fn into_component(self) -> Arc<dyn Component + Send + Sync + 'static>;
|
||||
}
|
||||
|
||||
impl<T: Component> IntoComponent for T {
|
||||
fn into_component(self) -> SharedComponent {
|
||||
SharedComponent {
|
||||
component: Arc::new(self),
|
||||
}
|
||||
impl<T: Component + Send + Sync + 'static> IntoComponent for T {
|
||||
fn into_component(self) -> Arc<dyn Component + Send + Sync + 'static> {
|
||||
Arc::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -857,6 +295,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<F, Fut> Component for ClosureComponent<F, Fut>
|
||||
where
|
||||
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
|
||||
@@ -866,130 +305,3 @@ where
|
||||
self.execute(cancellation_token).await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_error_chaining_display() {
|
||||
// Test single error with context chain
|
||||
let base_error = std::io::Error::new(std::io::ErrorKind::NotFound, "file not found");
|
||||
let error = anyhow::Error::from(base_error)
|
||||
.context("failed to read configuration")
|
||||
.context("unable to initialize database")
|
||||
.context("service startup failed");
|
||||
|
||||
let mad_error = MadError::Inner(error);
|
||||
let display = format!("{}", mad_error);
|
||||
|
||||
// Should display the top-level error message
|
||||
assert!(display.contains("service startup failed"));
|
||||
|
||||
// Test error chain iteration
|
||||
if let MadError::Inner(ref e) = mad_error {
|
||||
let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
|
||||
assert_eq!(chain.len(), 4);
|
||||
assert_eq!(chain[0], "service startup failed");
|
||||
assert_eq!(chain[1], "unable to initialize database");
|
||||
assert_eq!(chain[2], "failed to read configuration");
|
||||
assert_eq!(chain[3], "file not found");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_aggregate_error_display() {
|
||||
let error1 = MadError::Inner(
|
||||
anyhow::anyhow!("database connection failed")
|
||||
.context("failed to connect to PostgreSQL"),
|
||||
);
|
||||
|
||||
let error2 = MadError::Inner(
|
||||
anyhow::anyhow!("port already in use")
|
||||
.context("failed to bind to port 8080")
|
||||
.context("web server initialization failed"),
|
||||
);
|
||||
|
||||
let aggregate = MadError::AggregateError(AggregateError {
|
||||
errors: vec![error1, error2],
|
||||
});
|
||||
|
||||
let display = format!("{}", aggregate);
|
||||
|
||||
// Check that it shows multiple errors
|
||||
assert!(display.contains("2 component errors occurred"));
|
||||
assert!(display.contains("[Component 1]"));
|
||||
assert!(display.contains("[Component 2]"));
|
||||
|
||||
// Check that context chains are displayed
|
||||
assert!(display.contains("failed to connect to PostgreSQL"));
|
||||
assert!(display.contains("database connection failed"));
|
||||
assert!(display.contains("web server initialization failed"));
|
||||
assert!(display.contains("failed to bind to port 8080"));
|
||||
assert!(display.contains("port already in use"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_single_error_aggregate() {
|
||||
let error = MadError::Inner(anyhow::anyhow!("single error"));
|
||||
let aggregate = AggregateError {
|
||||
errors: vec![error],
|
||||
};
|
||||
|
||||
let display = format!("{}", aggregate);
|
||||
// Single error should be displayed directly
|
||||
assert!(display.contains("single error"));
|
||||
assert!(!display.contains("component errors occurred"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_error_source_chain() {
|
||||
let error = MadError::Inner(
|
||||
anyhow::anyhow!("root cause")
|
||||
.context("middle layer")
|
||||
.context("top layer"),
|
||||
);
|
||||
|
||||
// Test that we can access the error chain
|
||||
if let MadError::Inner(ref e) = error {
|
||||
let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
|
||||
assert_eq!(chain.len(), 3);
|
||||
assert_eq!(chain[0], "top layer");
|
||||
assert_eq!(chain[1], "middle layer");
|
||||
assert_eq!(chain[2], "root cause");
|
||||
} else {
|
||||
panic!("Expected MadError::Inner");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_component_error_propagation() {
|
||||
struct FailingComponent;
|
||||
|
||||
impl Component for FailingComponent {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
"test-component".into()
|
||||
}
|
||||
|
||||
async fn run(&self, _cancel: CancellationToken) -> Result<(), MadError> {
|
||||
Err(anyhow::anyhow!("IO error")
|
||||
.context("failed to open file")
|
||||
.context("component initialization failed")
|
||||
.into())
|
||||
}
|
||||
}
|
||||
|
||||
let result = Mad::builder()
|
||||
.add(FailingComponent)
|
||||
.cancellation(Some(std::time::Duration::from_millis(100)))
|
||||
.run()
|
||||
.await;
|
||||
|
||||
assert!(result.is_err());
|
||||
let error = result.unwrap_err();
|
||||
|
||||
// Check error display
|
||||
let display = format!("{}", error);
|
||||
assert!(display.contains("component initialization failed"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,79 +0,0 @@
|
||||
//! Waiter components for MAD.
|
||||
//!
|
||||
//! This module provides waiter components that simply wait for cancellation
|
||||
//! without performing any work. Useful for keeping the application alive
|
||||
//! or as placeholders in conditional component loading.
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::{Component, ComponentInfo, IntoComponent, MadError, SharedComponent};
|
||||
|
||||
/// A default waiter component that panics if run.
|
||||
///
|
||||
/// This is used internally as a placeholder that should never
|
||||
/// actually be executed.
|
||||
pub struct DefaultWaiter;
|
||||
impl Component for DefaultWaiter {
|
||||
async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), MadError> {
|
||||
panic!("should never be called");
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper component that waits for cancellation.
|
||||
///
|
||||
/// Instead of running the wrapped component's logic, this simply
|
||||
/// waits for the cancellation token. This is useful for conditionally
|
||||
/// disabling components while keeping the same structure.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust,ignore
|
||||
/// use mad::Waiter;
|
||||
///
|
||||
/// // Instead of running the service, just wait
|
||||
/// let waiter = Waiter::new(service.into_component());
|
||||
/// ```
|
||||
pub struct Waiter {
|
||||
comp: SharedComponent,
|
||||
}
|
||||
|
||||
impl Default for Waiter {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
comp: DefaultWaiter {}.into_component(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Waiter {
|
||||
/// Creates a new waiter that wraps the given component.
|
||||
///
|
||||
/// The wrapped component's name will be used (prefixed with "waiter/"),
|
||||
/// but its run method will not be called.
|
||||
pub fn new(c: SharedComponent) -> Self {
|
||||
Self { comp: c }
|
||||
}
|
||||
}
|
||||
|
||||
impl Component for Waiter {
|
||||
/// Returns the name of the waiter, prefixed with "waiter/".
|
||||
///
|
||||
/// If the wrapped component has a name, it will be "waiter/{name}".
|
||||
/// Otherwise, returns "waiter".
|
||||
fn info(&self) -> ComponentInfo {
|
||||
match &self.comp.info().name {
|
||||
Some(name) => format!("waiter/{name}").into(),
|
||||
None => "waiter".into(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits for cancellation without performing any work.
|
||||
///
|
||||
/// This method simply waits for the cancellation token to be triggered,
|
||||
/// then returns successfully.
|
||||
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
|
||||
cancellation_token.cancelled().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use notmad::{Component, ComponentInfo, Mad, MadError};
|
||||
use async_trait::async_trait;
|
||||
use notmad::{Component, Mad};
|
||||
use rand::Rng;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -8,12 +9,13 @@ use tracing_test::traced_test;
|
||||
|
||||
struct NeverEndingRun {}
|
||||
|
||||
#[async_trait]
|
||||
impl Component for NeverEndingRun {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
"NeverEndingRun".into()
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("NeverEndingRun".into())
|
||||
}
|
||||
|
||||
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
let millis_wait = rand::thread_rng().gen_range(50..1000);
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
@@ -135,44 +137,3 @@ async fn test_can_shutdown_gracefully() -> anyhow::Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn test_component_panics_shutdowns_cleanly() -> anyhow::Result<()> {
|
||||
let res = Mad::builder()
|
||||
.add_fn({
|
||||
move |_cancel| async move {
|
||||
panic!("my inner panic");
|
||||
}
|
||||
})
|
||||
.add_fn(|cancel| async move {
|
||||
cancel.cancelled().await;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.run()
|
||||
.await;
|
||||
|
||||
let err_content = res.unwrap_err().to_string();
|
||||
assert!(err_content.contains("component panicked"));
|
||||
assert!(err_content.contains("my inner panic"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_can_easily_transform_error() -> anyhow::Result<()> {
|
||||
fn fallible() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn inner() -> Result<(), MadError> {
|
||||
fallible()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
inner()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
# yaml-language-server: $schema=https://git.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
|
||||
# yaml-language-server: $schema=https://git.front.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
|
||||
|
||||
base: "git@git.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
|
||||
base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
|
||||
|
||||
vars:
|
||||
service: "mad"
|
||||
registry: kasperhermansen
|
||||
rust:
|
||||
publish: {}
|
||||
|
||||
please:
|
||||
project:
|
||||
@@ -14,6 +12,6 @@ please:
|
||||
repository: "mad"
|
||||
branch: main
|
||||
settings:
|
||||
api_url: "https://git.kjuulh.io"
|
||||
api_url: "https://git.front.kjuulh.io"
|
||||
actions:
|
||||
rust:
|
||||
|
||||
Reference in New Issue
Block a user