BREAKING: name() -> info() and removed async_trait
Some checks failed
continuous-integration/drone/push Build encountered an error

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2026-02-05 23:20:56 +01:00
parent f0c90edce9
commit 689bfd1325
11 changed files with 240 additions and 244 deletions

210
README.md
View File

@@ -4,165 +4,163 @@
[![Documentation](https://docs.rs/notmad/badge.svg)](https://docs.rs/notmad) [![Documentation](https://docs.rs/notmad/badge.svg)](https://docs.rs/notmad)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
## Overview A simple lifecycle manager for long-running Rust applications. Run multiple services concurrently with graceful shutdown handling.
MAD is a robust lifecycle manager designed for long-running Rust operations. It provides a simple, composable way to manage multiple concurrent services within your application, handling graceful startup and shutdown automatically.
### Perfect for:
- 🌐 Web servers
- 📨 Queue consumers and message processors
- 🔌 gRPC servers
- ⏰ Cron job runners
- 🔄 Background workers
- 📡 Any long-running async operations
## Features
- **Component-based architecture** - Build your application from composable, reusable components
- **Graceful shutdown** - Automatic handling of shutdown signals with proper cleanup
- **Concurrent execution** - Run multiple components in parallel with tokio
- **Error handling** - Built-in error propagation and logging
- **Cancellation tokens** - Coordinate shutdown across all components
- **Minimal boilerplate** - Focus on your business logic, not lifecycle management
## Installation ## Installation
Add MAD to your `Cargo.toml`:
```toml ```toml
[dependencies] [dependencies]
notmad = "0.7.5" notmad = "0.10.0"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
``` ```
## Quick Start ## Quick Start
Here's a simple example of a component that simulates a long-running server:
```rust ```rust
use mad::{Component, Mad}; use notmad::{Component, Mad};
use async_trait::async_trait;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
// Define your component struct MyService;
struct WebServer {
port: u16,
}
#[async_trait]
impl Component for WebServer {
fn name(&self) -> Option<String> {
Some(format!("WebServer on port {}", self.port))
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
println!("Starting web server on port {}", self.port);
// Your server logic here
// The cancellation token will be triggered on shutdown
tokio::select! {
_ = cancellation.cancelled() => {
println!("Shutting down web server");
}
_ = self.serve() => {
println!("Server stopped");
}
}
impl Component for MyService {
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
println!("Service running...");
cancellation.cancelled().await;
println!("Service stopped");
Ok(()) Ok(())
} }
} }
impl WebServer {
async fn serve(&self) {
// Simulate a running server
tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
}
}
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
// Build and run your application
Mad::builder() Mad::builder()
.add(WebServer { port: 8080 }) .add(MyService)
.add(WebServer { port: 8081 }) // You can add multiple instances
.run() .run()
.await?; .await?;
Ok(()) Ok(())
} }
``` ```
## Advanced Usage ## Basic Usage
### Custom Components ### Axum Web Server with Graceful Shutdown
Components can be anything that implements the `Component` trait: Here's how to run an Axum server with MAD's graceful shutdown:
```rust ```rust
use mad::{Component, Mad}; use axum::{Router, routing::get};
use async_trait::async_trait; use notmad::{Component, ComponentInfo};
use tokio_util::sync::CancellationToken;
struct QueueProcessor { struct WebServer {
queue_name: String, port: u16,
} }
#[async_trait] impl Component for WebServer {
impl Component for QueueProcessor { fn info(&self) -> ComponentInfo {
fn name(&self) -> Option<String> { format!("WebServer:{}", self.port).into()
Some(format!("QueueProcessor-{}", self.queue_name))
} }
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> { async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
while !cancellation.is_cancelled() { let app = Router::new().route("/", get(|| async { "Hello, World!" }));
// Process messages from queue
self.process_next_message().await?; let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", self.port))
} .await?;
println!("Listening on http://0.0.0.0:{}", self.port);
// Run server with graceful shutdown
axum::serve(listener, app)
.with_graceful_shutdown(async move {
cancel.cancelled().await;
println!("Shutting down server...");
})
.await?;
Ok(()) Ok(())
} }
} }
``` ```
### Error Handling ### Run Multiple Services
MAD provides comprehensive error handling through the `MadError` type with automatic conversion from `anyhow::Error`:
```rust ```rust
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> { Mad::builder()
// Errors automatically convert from anyhow::Error to MadError .add(WebServer { port: 8080 })
database_operation().await?; .add(WebServer { port: 8081 })
.run()
.await?;
```
// Or return explicit errors ### Use Functions as Components
if some_condition {
return Err(anyhow::anyhow!("Something went wrong").into()); ```rust
Mad::builder()
.add_fn(|cancel| async move {
println!("Running...");
cancel.cancelled().await;
Ok(())
})
.run()
.await?;
```
## Lifecycle Hooks
Components support optional setup and cleanup phases:
```rust
impl Component for DatabaseService {
async fn setup(&self) -> Result<(), notmad::MadError> {
println!("Connecting to database...");
Ok(())
} }
Ok(()) async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
cancel.cancelled().await;
Ok(())
}
async fn close(&self) -> Result<(), notmad::MadError> {
println!("Closing database connection...");
Ok(())
}
} }
``` ```
## Migration from v0.9
### Breaking Changes
1. **`name()``info()`**: Returns `ComponentInfo` instead of `Option<String>`
```rust
// Before
fn name(&self) -> Option<String> { Some("my-service".into()) }
// After
fn info(&self) -> ComponentInfo { "my-service".into() }
```
2. **No more `async-trait`**: Remove the dependency and `#[async_trait]` attribute
```rust
// Before
#[async_trait]
impl Component for MyService { }
// After
impl Component for MyService { }
```
## Examples ## Examples
Check out the [examples directory](crates/mad/examples) for more detailed examples: See [examples directory](crates/mad/examples) for complete working examples.
- **basic** - Simple component lifecycle
- **fn** - Using functions as components
- **signals** - Handling system signals
- **error_log** - Error handling and logging
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
## License ## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. MIT - see [LICENSE](LICENSE)
## Author ## Links
Created and maintained by [kjuulh](https://github.com/kjuulh) - [Documentation](https://docs.rs/notmad)
- [Repository](https://github.com/kjuulh/mad)
## Repository - [Crates.io](https://crates.io/crates/notmad)
Find the source code at [https://github.com/kjuulh/mad](https://github.com/kjuulh/mad)

View File

@@ -1,11 +1,12 @@
use notmad::ComponentInfo;
use rand::Rng; use rand::Rng;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::Level; use tracing::Level;
struct WaitServer {} struct WaitServer {}
impl notmad::Component for WaitServer { impl notmad::Component for WaitServer {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some("WaitServer".into()) "WaitServer".into()
} }
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> { async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {

View File

@@ -7,7 +7,7 @@
//! - Graceful shutdown with cancellation tokens //! - Graceful shutdown with cancellation tokens
//! - Concurrent component execution //! - Concurrent component execution
use notmad::{Component, Mad, MadError}; use notmad::{Component, ComponentInfo, Mad, MadError};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::time::{Duration, interval}; use tokio::time::{Duration, interval};
@@ -21,8 +21,8 @@ struct WebServer {
} }
impl Component for WebServer { impl Component for WebServer {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some(format!("web-server-{}", self.port)) format!("web-server-{}", self.port).into()
} }
async fn setup(&self) -> Result<(), MadError> { async fn setup(&self) -> Result<(), MadError> {
@@ -80,8 +80,8 @@ struct JobProcessor {
} }
impl Component for JobProcessor { impl Component for JobProcessor {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some(format!("job-processor-{}", self.queue_name)) format!("job-processor-{}", self.queue_name).into()
} }
async fn setup(&self) -> Result<(), MadError> { async fn setup(&self) -> Result<(), MadError> {
@@ -137,8 +137,8 @@ struct HealthChecker {
} }
impl Component for HealthChecker { impl Component for HealthChecker {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some("health-checker".to_string()) "health-checker".into()
} }
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> { async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
@@ -178,8 +178,8 @@ struct FailingComponent {
} }
impl Component for FailingComponent { impl Component for FailingComponent {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some("failing-component".to_string()) "failing-component".into()
} }
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> { async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
@@ -205,8 +205,8 @@ impl Component for FailingComponent {
struct DebugComponent; struct DebugComponent;
impl Component for DebugComponent { impl Component for DebugComponent {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some("debug-component".to_string()) "debug-component".into()
} }
async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> { async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> {

View File

@@ -1,11 +1,12 @@
use notmad::ComponentInfo;
use rand::Rng; use rand::Rng;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::Level; use tracing::Level;
struct ErrorServer {} struct ErrorServer {}
impl notmad::Component for ErrorServer { impl notmad::Component for ErrorServer {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some("ErrorServer".into()) "ErrorServer".into()
} }
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> { async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {

View File

@@ -1,11 +1,12 @@
use notmad::ComponentInfo;
use rand::Rng; use rand::Rng;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::Level; use tracing::Level;
struct WaitServer {} struct WaitServer {}
impl notmad::Component for WaitServer { impl notmad::Component for WaitServer {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some("WaitServer".into()) "WaitServer".into()
} }
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> { async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {

View File

@@ -3,7 +3,7 @@
//! This example shows how to run a web server, queue processor, and //! This example shows how to run a web server, queue processor, and
//! scheduled task together, with graceful shutdown on Ctrl+C. //! scheduled task together, with graceful shutdown on Ctrl+C.
use notmad::{Component, Mad, MadError}; use notmad::{Component, ComponentInfo, Mad, MadError};
use tokio::time::{Duration, interval}; use tokio::time::{Duration, interval};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@@ -16,20 +16,20 @@ struct WebServer {
} }
impl Component for WebServer { impl Component for WebServer {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some(format!("WebServer:{}", self.port)) format!("WebServer:{}", self.port).into()
} }
async fn setup(&self) -> Result<(), MadError> { async fn setup(&self) -> Result<(), MadError> {
println!("[{}] Binding to port...", self.name().unwrap()); println!("[{}] Binding to port...", self.info());
// Simulate server setup time // Simulate server setup time
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
println!("[{}] Ready to accept connections", self.name().unwrap()); println!("[{}] Ready to accept connections", self.info());
Ok(()) Ok(())
} }
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> { async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!("[{}] Server started", self.name().unwrap()); println!("[{}] Server started", self.info());
// Simulate handling requests until shutdown // Simulate handling requests until shutdown
let mut request_id = 0; let mut request_id = 0;
@@ -38,12 +38,12 @@ impl Component for WebServer {
while !cancellation.is_cancelled() { while !cancellation.is_cancelled() {
tokio::select! { tokio::select! {
_ = cancellation.cancelled() => { _ = cancellation.cancelled() => {
println!("[{}] Shutdown signal received", self.name().unwrap()); println!("[{}] Shutdown signal received", self.info());
break; break;
} }
_ = interval.tick() => { _ = interval.tick() => {
request_id += 1; request_id += 1;
println!("[{}] Handling request #{}", self.name().unwrap(), request_id); println!("[{}] Handling request #{}", self.info(), request_id);
} }
} }
} }
@@ -52,10 +52,10 @@ impl Component for WebServer {
} }
async fn close(&self) -> Result<(), MadError> { async fn close(&self) -> Result<(), MadError> {
println!("[{}] Closing connections...", self.name().unwrap()); println!("[{}] Closing connections...", self.info());
// Simulate graceful connection drain // Simulate graceful connection drain
tokio::time::sleep(Duration::from_millis(200)).await; tokio::time::sleep(Duration::from_millis(200)).await;
println!("[{}] Server stopped", self.name().unwrap()); println!("[{}] Server stopped", self.info());
Ok(()) Ok(())
} }
} }
@@ -69,12 +69,12 @@ struct QueueProcessor {
} }
impl Component for QueueProcessor { impl Component for QueueProcessor {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some(format!("QueueProcessor:{}", self.queue_name)) format!("QueueProcessor:{}", self.queue_name).into()
} }
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> { async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!("[{}] Started processing", self.name().unwrap()); println!("[{}] Started processing", self.info());
let mut message_count = 0; let mut message_count = 0;
@@ -83,19 +83,19 @@ impl Component for QueueProcessor {
// Simulate waiting for and processing a message // Simulate waiting for and processing a message
tokio::select! { tokio::select! {
_ = cancellation.cancelled() => { _ = cancellation.cancelled() => {
println!("[{}] Stopping message processing", self.name().unwrap()); println!("[{}] Stopping message processing", self.info());
break; break;
} }
_ = tokio::time::sleep(Duration::from_secs(1)) => { _ = tokio::time::sleep(Duration::from_secs(1)) => {
message_count += 1; message_count += 1;
println!("[{}] Processed message #{}", self.name().unwrap(), message_count); println!("[{}] Processed message #{}", self.info(), message_count);
} }
} }
} }
println!( println!(
"[{}] Processed {} messages total", "[{}] Processed {} messages total",
self.name().unwrap(), self.info(),
message_count message_count
); );
Ok(()) Ok(())
@@ -114,14 +114,14 @@ struct ScheduledTask {
} }
impl Component for ScheduledTask { impl Component for ScheduledTask {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some(format!("ScheduledTask:{}", self.task_name)) format!("ScheduledTask:{}", self.task_name).into()
} }
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> { async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!( println!(
"[{}] Scheduled to run every {} seconds", "[{}] Scheduled to run every {} seconds",
self.name().unwrap(), self.info(),
self.interval_secs self.interval_secs
); );
@@ -131,17 +131,17 @@ impl Component for ScheduledTask {
while !cancellation.is_cancelled() { while !cancellation.is_cancelled() {
tokio::select! { tokio::select! {
_ = cancellation.cancelled() => { _ = cancellation.cancelled() => {
println!("[{}] Scheduler stopping", self.name().unwrap()); println!("[{}] Scheduler stopping", self.info());
break; break;
} }
_ = interval.tick() => { _ = interval.tick() => {
run_count += 1; run_count += 1;
println!("[{}] Executing run #{}", self.name().unwrap(), run_count); println!("[{}] Executing run #{}", self.info(), run_count);
// Simulate task execution // Simulate task execution
tokio::time::sleep(Duration::from_millis(500)).await; tokio::time::sleep(Duration::from_millis(500)).await;
println!("[{}] Run #{} completed", self.name().unwrap(), run_count); println!("[{}] Run #{} completed", self.info(), run_count);
} }
} }
} }

View File

@@ -1,3 +1,4 @@
use notmad::ComponentInfo;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
struct NestedErrorComponent { struct NestedErrorComponent {
@@ -5,8 +6,8 @@ struct NestedErrorComponent {
} }
impl notmad::Component for NestedErrorComponent { impl notmad::Component for NestedErrorComponent {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some(self.name.clone()) self.name.clone().into()
} }
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> { async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
@@ -27,8 +28,8 @@ impl notmad::Component for NestedErrorComponent {
struct AnotherFailingComponent; struct AnotherFailingComponent;
impl notmad::Component for AnotherFailingComponent { impl notmad::Component for AnotherFailingComponent {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some("another-component".into()) "another-component".into()
} }
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> { async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {

View File

@@ -1,11 +1,12 @@
use notmad::ComponentInfo;
use rand::Rng; use rand::Rng;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::Level; use tracing::Level;
struct WaitServer {} struct WaitServer {}
impl notmad::Component for WaitServer { impl notmad::Component for WaitServer {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some("WaitServer".into()) "WaitServer".into()
} }
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> { async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
@@ -22,8 +23,8 @@ impl notmad::Component for WaitServer {
struct RespectCancel {} struct RespectCancel {}
impl notmad::Component for RespectCancel { impl notmad::Component for RespectCancel {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some("RespectCancel".into()) "RespectCancel".into()
} }
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> { async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
@@ -36,8 +37,8 @@ impl notmad::Component for RespectCancel {
struct NeverStopServer {} struct NeverStopServer {}
impl notmad::Component for NeverStopServer { impl notmad::Component for NeverStopServer {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some("NeverStopServer".into()) "NeverStopServer".into()
} }
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> { async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {

View File

@@ -1,42 +1,21 @@
//! # MAD - Lifecycle Manager for Rust Applications //! # MAD - Lifecycle Manager for Rust Applications
//! //!
//! MAD is a robust lifecycle manager designed for long-running Rust operations. It provides //! A simple lifecycle manager for long-running Rust applications. Run multiple services
//! a simple, composable way to manage multiple concurrent services within your application, //! concurrently with graceful shutdown handling.
//! handling graceful startup and shutdown automatically.
//!
//! ## Overview
//!
//! MAD helps you build applications composed of multiple long-running components that need
//! to be orchestrated together. It handles:
//!
//! - **Concurrent execution** of multiple components
//! - **Graceful shutdown** with cancellation tokens
//! - **Error aggregation** from multiple components
//! - **Lifecycle management** with setup, run, and close phases
//! //!
//! ## Quick Start //! ## Quick Start
//! //!
//! ```rust,no_run //! ```rust,no_run
//! use notmad::{Component, Mad}; //! use notmad::{Component, Mad};
//! use async_trait::async_trait;
//! use tokio_util::sync::CancellationToken; //! use tokio_util::sync::CancellationToken;
//! //!
//! struct MyService { //! struct MyService;
//! name: String,
//! }
//! //!
//! #[async_trait]
//! impl Component for MyService { //! impl Component for MyService {
//! fn name(&self) -> Option<String> { //! async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
//! Some(self.name.clone()) //! println!("Running...");
//! } //! cancel.cancelled().await;
//! //! println!("Stopped");
//! async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
//! // Your service logic here
//! while !cancellation.is_cancelled() {
//! // Do work...
//! tokio::time::sleep(std::time::Duration::from_secs(1)).await;
//! }
//! Ok(()) //! Ok(())
//! } //! }
//! } //! }
@@ -44,36 +23,20 @@
//! #[tokio::main] //! #[tokio::main]
//! async fn main() -> anyhow::Result<()> { //! async fn main() -> anyhow::Result<()> {
//! Mad::builder() //! Mad::builder()
//! .add(MyService { name: "service-1".into() }) //! .add(MyService)
//! .add(MyService { name: "service-2".into() })
//! .run() //! .run()
//! .await?; //! .await?;
//! Ok(()) //! Ok(())
//! } //! }
//! ``` //! ```
//! //!
//! ## Component Lifecycle //! ## Features
//! //!
//! Components go through three phases: //! - Run multiple components concurrently
//! //! - Graceful shutdown with cancellation tokens
//! 1. **Setup**: Optional initialization phase before components start running //! - Optional lifecycle hooks: `setup()`, `run()`, `close()`
//! 2. **Run**: Main execution phase where components perform their work //! - Automatic error aggregation
//! 3. **Close**: Optional cleanup phase after components stop //! - SIGTERM and Ctrl+C signal handling
//!
//! ## Error Handling
//!
//! MAD provides comprehensive error handling through [`MadError`], which can:
//! - Wrap errors from individual components
//! - Aggregate multiple errors when several components fail
//! - Automatically convert from `anyhow::Error`
//!
//! ## Shutdown Behavior
//!
//! MAD handles shutdown gracefully:
//! - Responds to SIGTERM and Ctrl+C signals
//! - Propagates cancellation tokens to all components
//! - Waits for components to finish cleanup
//! - Configurable cancellation timeout
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures_util::StreamExt; use futures_util::StreamExt;
@@ -205,12 +168,10 @@ impl Display for AggregateError {
/// ///
/// ```rust /// ```rust
/// use notmad::{Component, Mad}; /// use notmad::{Component, Mad};
/// use async_trait::async_trait;
/// use tokio_util::sync::CancellationToken; /// use tokio_util::sync::CancellationToken;
/// ///
/// struct MyComponent; /// struct MyComponent;
/// ///
/// #[async_trait]
/// impl Component for MyComponent { /// impl Component for MyComponent {
/// async fn run(&self, _cancel: CancellationToken) -> Result<(), notmad::MadError> { /// async fn run(&self, _cancel: CancellationToken) -> Result<(), notmad::MadError> {
/// Ok(()) /// Ok(())
@@ -270,10 +231,8 @@ impl Mad {
/// ///
/// ```rust /// ```rust
/// use notmad::{Component, Mad}; /// use notmad::{Component, Mad};
/// # use async_trait::async_trait;
/// # use tokio_util::sync::CancellationToken; /// # use tokio_util::sync::CancellationToken;
/// # struct MyService; /// # struct MyService;
/// # #[async_trait]
/// # impl Component for MyService { /// # impl Component for MyService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) } /// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # } /// # }
@@ -303,10 +262,8 @@ impl Mad {
/// ```rust /// ```rust
/// use notmad::Mad; /// use notmad::Mad;
/// # use notmad::Component; /// # use notmad::Component;
/// # use async_trait::async_trait;
/// # use tokio_util::sync::CancellationToken; /// # use tokio_util::sync::CancellationToken;
/// # struct DebugService; /// # struct DebugService;
/// # #[async_trait]
/// # impl Component for DebugService { /// # impl Component for DebugService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) } /// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # } /// # }
@@ -469,7 +426,7 @@ impl Mad {
tracing::debug!("setting up components"); tracing::debug!("setting up components");
for comp in &self.components { for comp in &self.components {
tracing::trace!(component = &comp.name(), "mad setting up"); tracing::trace!(component = %comp.info(), "mad setting up");
match comp.setup().await { match comp.setup().await {
Ok(_) | Err(MadError::SetupNotDefined) => {} Ok(_) | Err(MadError::SetupNotDefined) => {}
@@ -499,15 +456,15 @@ impl Mad {
channels.push(error_rx); channels.push(error_rx);
tokio::spawn(async move { tokio::spawn(async move {
let name = comp.name().clone(); let info = comp.info().clone();
tracing::debug!(component = name, "mad running"); tracing::debug!(component = %info, "mad running");
let handle = tokio::spawn(async move { comp.run(job_cancellation).await }); let handle = tokio::spawn(async move { comp.run(job_cancellation).await });
tokio::select! { tokio::select! {
_ = cancellation_token.cancelled() => { _ = cancellation_token.cancelled() => {
error_tx.send(CompletionResult { res: Ok(()) , name }).await error_tx.send(CompletionResult { res: Ok(()) , name: info.name }).await
} }
res = handle => { res = handle => {
let res = match res { let res = match res {
@@ -529,7 +486,7 @@ impl Mad {
}; };
error_tx.send(CompletionResult { res , name }).await error_tx.send(CompletionResult { res , name: info.name }).await
} }
} }
}); });
@@ -606,7 +563,7 @@ impl Mad {
tracing::debug!("closing components"); tracing::debug!("closing components");
for comp in &self.components { for comp in &self.components {
tracing::trace!(component = &comp.name(), "mad closing"); tracing::trace!(component = %comp.info(), "mad closing");
match comp.close().await { match comp.close().await {
Ok(_) | Err(MadError::CloseNotDefined) => {} Ok(_) | Err(MadError::CloseNotDefined) => {}
Err(e) => return Err(e), Err(e) => return Err(e),
@@ -624,6 +581,46 @@ async fn signal_unix_terminate() {
sigterm.recv().await; sigterm.recv().await;
} }
#[derive(Default, Clone)]
pub struct ComponentInfo {
name: Option<String>,
}
impl Display for ComponentInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(
self.name
.as_ref()
.map(|n| n.as_str())
.unwrap_or_else(|| "unknown"),
)
}
}
impl ComponentInfo {
pub fn new() -> Self {
Self::default()
}
pub fn with_name(&mut self, name: impl Into<String>) -> &mut Self {
self.name = Some(name.into());
self
}
}
impl From<String> for ComponentInfo {
fn from(value: String) -> Self {
Self { name: Some(value) }
}
}
impl From<&str> for ComponentInfo {
fn from(value: &str) -> Self {
Self {
name: Some(value.into()),
}
}
}
/// Trait for implementing MAD components. /// Trait for implementing MAD components.
/// ///
/// Components represent individual services or tasks that run as part /// Components represent individual services or tasks that run as part
@@ -633,18 +630,16 @@ async fn signal_unix_terminate() {
/// # Example /// # Example
/// ///
/// ```rust /// ```rust
/// use notmad::{Component, MadError}; /// use notmad::{Component, ComponentInfo, MadError};
/// use async_trait::async_trait;
/// use tokio_util::sync::CancellationToken; /// use tokio_util::sync::CancellationToken;
/// ///
/// struct DatabaseConnection { /// struct DatabaseConnection {
/// url: String, /// url: String,
/// } /// }
/// ///
/// #[async_trait]
/// impl Component for DatabaseConnection { /// impl Component for DatabaseConnection {
/// fn name(&self) -> Option<String> { /// fn info(&self) -> ComponentInfo {
/// Some("database".to_string()) /// "database".into()
/// } /// }
/// ///
/// async fn setup(&self) -> Result<(), MadError> { /// async fn setup(&self) -> Result<(), MadError> {
@@ -675,8 +670,8 @@ pub trait Component: Send + Sync + 'static {
/// # Default /// # Default
/// ///
/// Returns `None` if not overridden. /// Returns `None` if not overridden.
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
None ComponentInfo::default()
} }
/// Optional setup phase called before the component starts running. /// Optional setup phase called before the component starts running.
@@ -743,7 +738,7 @@ pub trait Component: Send + Sync + 'static {
} }
trait AsyncComponent: Send + Sync + 'static { trait AsyncComponent: Send + Sync + 'static {
fn name_async(&self) -> Option<String>; fn info_async(&self) -> ComponentInfo;
fn setup_async(&self) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>>; fn setup_async(&self) -> Pin<Box<dyn Future<Output = Result<(), MadError>> + Send + '_>>;
@@ -757,8 +752,8 @@ trait AsyncComponent: Send + Sync + 'static {
impl<E: Component> AsyncComponent for E { impl<E: Component> AsyncComponent for E {
#[inline(always)] #[inline(always)]
fn name_async(&self) -> Option<String> { fn info_async(&self) -> ComponentInfo {
self.name() self.info()
} }
#[inline(always)] #[inline(always)]
@@ -787,8 +782,8 @@ pub struct SharedComponent {
impl SharedComponent { impl SharedComponent {
#[inline(always)] #[inline(always)]
pub fn name(&self) -> Option<String> { pub fn info(&self) -> ComponentInfo {
self.component.name_async() self.component.info_async()
} }
#[inline(always)] #[inline(always)]
@@ -817,12 +812,10 @@ impl SharedComponent {
/// ///
/// ```rust /// ```rust
/// use notmad::{Component, IntoComponent, Mad}; /// use notmad::{Component, IntoComponent, Mad};
/// # use async_trait::async_trait;
/// # use tokio_util::sync::CancellationToken; /// # use tokio_util::sync::CancellationToken;
/// ///
/// struct MyService; /// struct MyService;
/// ///
/// # #[async_trait]
/// # impl Component for MyService { /// # impl Component for MyService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) } /// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # } /// # }
@@ -974,8 +967,8 @@ mod tests {
struct FailingComponent; struct FailingComponent;
impl Component for FailingComponent { impl Component for FailingComponent {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some("test-component".to_string()) "test-component".into()
} }
async fn run(&self, _cancel: CancellationToken) -> Result<(), MadError> { async fn run(&self, _cancel: CancellationToken) -> Result<(), MadError> {

View File

@@ -6,7 +6,7 @@
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::{Component, IntoComponent, MadError, SharedComponent}; use crate::{Component, ComponentInfo, IntoComponent, MadError, SharedComponent};
/// A default waiter component that panics if run. /// A default waiter component that panics if run.
/// ///
@@ -60,10 +60,10 @@ impl Component for Waiter {
/// ///
/// If the wrapped component has a name, it will be "waiter/{name}". /// If the wrapped component has a name, it will be "waiter/{name}".
/// Otherwise, returns "waiter". /// Otherwise, returns "waiter".
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
match self.comp.name() { match &self.comp.info().name {
Some(name) => Some(format!("waiter/{name}")), Some(name) => format!("waiter/{name}").into(),
None => Some("waiter".into()), None => "waiter".into(),
} }
} }

View File

@@ -1,6 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use notmad::{Component, Mad, MadError}; use notmad::{Component, ComponentInfo, Mad, MadError};
use rand::Rng; use rand::Rng;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@@ -9,8 +9,8 @@ use tracing_test::traced_test;
struct NeverEndingRun {} struct NeverEndingRun {}
impl Component for NeverEndingRun { impl Component for NeverEndingRun {
fn name(&self) -> Option<String> { fn info(&self) -> ComponentInfo {
Some("NeverEndingRun".into()) "NeverEndingRun".into()
} }
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> { async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {