1 Commits

Author SHA1 Message Date
cuddle-please
bdcd04716b chore(release): 0.2.1
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2024-08-07 10:09:47 +00:00
19 changed files with 364 additions and 2633 deletions

View File

@@ -6,126 +6,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.10.0] - 2025-11-15
### Added
- implement take errors
## [0.9.0] - 2025-11-15
### Added
- mad not properly surfaces panics
- add publish
- add readme
### Fixed
- *(deps)* update all dependencies (#38)
### Other
- *(deps)* update rust crate tracing-subscriber to v0.3.20 (#37)
## [0.8.1] - 2025-08-09
### Other
- error logging
## [0.8.0] - 2025-08-08
### Added
- add docs
- update readme
## [0.7.5] - 2025-07-24
### Added
- print big inner
### Other
- more error correction
- correct error test to not be as verbose
## [0.7.4] - 2025-07-24
### Added
- cleanup aggregate error for single error
## [0.7.3] - 2025-07-24
### Added
- automatic conversion from anyhow::Error and access to aggregate errors
### Fixed
- *(deps)* update all dependencies (#30)
## [0.7.2] - 2025-06-25
### Added
- add wait
- add conditional, allows adding or waiting for close
### Fixed
- *(deps)* update rust crate async-trait to v0.1.86 (#28)
- *(deps)* update rust crate rand to 0.9.0 (#27)
- *(deps)* update rust crate thiserror to v2.0.11 (#26)
- *(deps)* update all dependencies (#25)
- *(deps)* update rust crate async-trait to v0.1.84 (#24)
- *(deps)* update rust crate thiserror to v2.0.9 (#22)
- *(deps)* update rust crate thiserror to v2.0.8 (#21)
- *(deps)* update rust crate thiserror to v2.0.7 (#20)
- *(deps)* update rust crate thiserror to v2.0.6 (#19)
- *(deps)* update rust crate thiserror to v2.0.5 (#18)
- *(deps)* update rust crate tokio-util to v0.7.13 (#17)
### Other
- chore
- *(deps)* update all dependencies (#29)
- *(deps)* update rust crate anyhow to v1.0.95 (#23)
- *(deps)* update all dependencies (#16)
- *(deps)* update rust crate tracing-subscriber to v0.3.19 (#15)
- *(deps)* update rust crate tracing to v0.1.41 (#13)
## [0.7.1] - 2024-11-24
### Fixed
- make sure to close on final
## [0.7.0] - 2024-11-24
### Added
- actually bubble up errors
### Fixed
- *(deps)* update rust crate thiserror to v2 (#9)
## [0.6.0] - 2024-11-23
### Added
- adding test to make sure we can gracefully shutdown
- make sure to close down properly
## [0.5.0] - 2024-11-19
### Added
- update name
- respect sigterm
- include author
- update with rename
### Docs
- add examples
## [0.4.0] - 2024-08-07
### Added
- add correction
- add small docs
## [0.3.0] - 2024-08-07
### Added
- add add_fn to execute immediate lambdas
## [0.2.1] - 2024-08-07
### Docs

757
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -3,9 +3,10 @@ members = ["crates/*"]
resolver = "2"
[workspace.package]
version = "0.11.0"
version = "0.2.1"
[workspace.dependencies]
mad = { path = "crates/mad" }
anyhow = { version = "1.0.71" }
tokio = { version = "1", features = ["full"] }

169
README.md
View File

@@ -1,32 +1,29 @@
# MAD - Lifecycle Manager for Rust Applications
# MAD
[![Crates.io](https://img.shields.io/crates/v/notmad.svg)](https://crates.io/crates/notmad)
[![Documentation](https://docs.rs/notmad/badge.svg)](https://docs.rs/notmad)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
Mad is a life-cycle manager for long running rust operations.
A simple lifecycle manager for long-running Rust applications. Run multiple services concurrently with graceful shutdown handling.
- Webservers
- Queue bindings
- gRPC servers etc
- Cron runners
## Installation
```toml
[dependencies]
notmad = "0.10.0"
tokio = { version = "1", features = ["full"] }
```
## Quick Start
It is supposed to be the main thing the application runs, and everything from it is spawned and managed by it.
```rust
use notmad::{Component, Mad};
use tokio_util::sync::CancellationToken;
struct WaitServer {}
struct MyService;
#[async_trait]
impl Component for WaitServer {
fn name(&self) -> Option<String> {
Some("NeverEndingRun".into())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
let millis_wait = rand::thread_rng().gen_range(50..1000);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
impl Component for MyService {
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
println!("Service running...");
cancellation.cancelled().await;
println!("Service stopped");
Ok(())
}
}
@@ -34,133 +31,13 @@ impl Component for MyService {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
Mad::builder()
.add(MyService)
.add(WaitServer {})
.add(WaitServer {})
.add(WaitServer {})
.run()
.await?;
Ok(())
}
```
## Basic Usage
### Axum Web Server with Graceful Shutdown
Here's how to run an Axum server with MAD's graceful shutdown:
```rust
use axum::{Router, routing::get};
use notmad::{Component, ComponentInfo};
use tokio_util::sync::CancellationToken;
struct WebServer {
port: u16,
}
impl Component for WebServer {
fn info(&self) -> ComponentInfo {
format!("WebServer:{}", self.port).into()
}
async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
let app = Router::new().route("/", get(|| async { "Hello, World!" }));
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", self.port))
.await?;
println!("Listening on http://0.0.0.0:{}", self.port);
// Run server with graceful shutdown
axum::serve(listener, app)
.with_graceful_shutdown(async move {
cancel.cancelled().await;
println!("Shutting down server...");
})
.await?;
Ok(())
}
}
```
### Run Multiple Services
```rust
Mad::builder()
.add(WebServer { port: 8080 })
.add(WebServer { port: 8081 })
.run()
.await?;
```
### Use Functions as Components
```rust
Mad::builder()
.add_fn(|cancel| async move {
println!("Running...");
cancel.cancelled().await;
Ok(())
})
.run()
.await?;
```
## Lifecycle Hooks
Components support optional setup and cleanup phases:
```rust
impl Component for DatabaseService {
async fn setup(&self) -> Result<(), notmad::MadError> {
println!("Connecting to database...");
Ok(())
}
async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
cancel.cancelled().await;
Ok(())
}
async fn close(&self) -> Result<(), notmad::MadError> {
println!("Closing database connection...");
Ok(())
}
}
```
## Migration from v0.10
### Breaking Changes
1. **`name()``info()`**: Returns `ComponentInfo` instead of `Option<String>`
```rust
// Before
fn name(&self) -> Option<String> { Some("my-service".into()) }
// After
fn info(&self) -> ComponentInfo { "my-service".into() }
```
2. **No more `async-trait`**: Remove the dependency and `#[async_trait]` attribute
```rust
// Before
#[async_trait]
impl Component for MyService { }
// After
impl Component for MyService { }
```
## Examples
See [examples directory](crates/mad/examples) for complete working examples.
## License
MIT - see [LICENSE](LICENSE)
## Links
- [Documentation](https://docs.rs/notmad)
- [Repository](https://github.com/kjuulh/mad)
- [Crates.io](https://crates.io/crates/notmad)

View File

@@ -1,23 +1,18 @@
[package]
name = "notmad"
name = "mad"
version.workspace = true
description = "notmad is a life-cycle manager for long running rust operations"
license = "MIT"
repository = "https://github.com/kjuulh/mad"
authors = ["kjuulh"]
edition = "2024"
readme = "../../README.md"
edition = "2021"
[dependencies]
anyhow.workspace = true
async-trait = "0.1.81"
futures = "0.3.30"
futures-util = "0.3.30"
rand = "0.10.0"
thiserror = "2.0.0"
rand = "0.8.5"
thiserror = "1.0.63"
tokio.workspace = true
tokio-util = "0.7.11"
tracing.workspace = true
[dev-dependencies]
tracing-subscriber = "0.3.18"
tracing-test = { version = "0.2.5", features = ["no-env-filter"] }

View File

@@ -1,39 +0,0 @@
use notmad::ComponentInfo;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct WaitServer {}
impl notmad::Component for WaitServer {
fn info(&self) -> ComponentInfo {
"WaitServer".into()
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.init();
notmad::Mad::builder()
.add(WaitServer {})
.add(WaitServer {})
.add(WaitServer {})
.add(WaitServer {})
.run()
.await?;
Ok(())
}

View File

@@ -1,18 +0,0 @@
[package]
name = "mad-comprehensive-example"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "comprehensive"
path = "main.rs"
[dependencies]
notmad = { path = "../.." }
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
async-trait = "0.1"
anyhow = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
rand = "0.8"

View File

@@ -1,326 +0,0 @@
//! Comprehensive example demonstrating MAD's full capabilities.
//!
//! This example shows:
//! - Multiple component types (struct, closure, conditional)
//! - Component lifecycle (setup, run, close)
//! - Error handling and propagation
//! - Graceful shutdown with cancellation tokens
//! - Concurrent component execution
use notmad::{Component, ComponentInfo, Mad, MadError};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::time::{Duration, interval};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
/// A web server component that simulates handling HTTP requests.
struct WebServer {
port: u16,
request_count: Arc<AtomicUsize>,
}
impl Component for WebServer {
fn info(&self) -> ComponentInfo {
format!("web-server-{}", self.port).into()
}
async fn setup(&self) -> Result<(), MadError> {
info!("Setting up web server on port {}", self.port);
// In a real application, you might:
// - Bind to the port
// - Set up TLS certificates
// - Initialize middleware
tokio::time::sleep(Duration::from_millis(100)).await;
info!("Web server on port {} is ready", self.port);
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!("Web server on port {} started", self.port);
let mut interval = interval(Duration::from_secs(1));
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
info!("Web server on port {} received shutdown signal", self.port);
break;
}
_ = interval.tick() => {
// Simulate handling requests
let count = self.request_count.fetch_add(1, Ordering::Relaxed);
info!("Server on port {} handled request #{}", self.port, count + 1);
}
}
}
Ok(())
}
async fn close(&self) -> Result<(), MadError> {
info!("Shutting down web server on port {}", self.port);
// In a real application, you might:
// - Drain existing connections
// - Save server state
// - Close database connections
tokio::time::sleep(Duration::from_millis(200)).await;
let total = self.request_count.load(Ordering::Relaxed);
info!(
"Web server on port {} shut down. Total requests handled: {}",
self.port, total
);
Ok(())
}
}
/// A background job processor that simulates processing tasks from a queue.
struct JobProcessor {
queue_name: String,
processing_interval: Duration,
}
impl Component for JobProcessor {
fn info(&self) -> ComponentInfo {
format!("job-processor-{}", self.queue_name).into()
}
async fn setup(&self) -> Result<(), MadError> {
info!("Connecting to queue: {}", self.queue_name);
// Simulate connecting to a message queue
tokio::time::sleep(Duration::from_millis(150)).await;
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!("Job processor for {} started", self.queue_name);
let mut interval = interval(self.processing_interval);
let mut job_count = 0;
loop {
tokio::select! {
_ = cancellation.cancelled() => {
info!("Job processor for {} stopping...", self.queue_name);
break;
}
_ = interval.tick() => {
job_count += 1;
info!("Processing job #{} from {}", job_count, self.queue_name);
// Simulate job processing
tokio::time::sleep(Duration::from_millis(100)).await;
// Simulate occasional errors (but don't fail the component)
if job_count % 10 == 0 {
warn!("Job #{} from {} required retry", job_count, self.queue_name);
}
}
}
}
info!(
"Job processor for {} processed {} jobs",
self.queue_name, job_count
);
Ok(())
}
async fn close(&self) -> Result<(), MadError> {
info!("Disconnecting from queue: {}", self.queue_name);
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
}
/// A health check component that monitors system health.
struct HealthChecker {
check_interval: Duration,
}
impl Component for HealthChecker {
fn info(&self) -> ComponentInfo {
"health-checker".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!("Health checker started");
let mut interval = interval(self.check_interval);
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
info!("Health checker stopping...");
break;
}
_ = interval.tick() => {
// Simulate health checks
let cpu_usage = rand::random::<f32>() * 100.0;
let memory_usage = rand::random::<f32>() * 100.0;
info!("System health: CPU={:.1}%, Memory={:.1}%", cpu_usage, memory_usage);
if cpu_usage > 90.0 {
warn!("High CPU usage detected: {:.1}%", cpu_usage);
}
if memory_usage > 90.0 {
warn!("High memory usage detected: {:.1}%", memory_usage);
}
}
}
}
Ok(())
}
}
/// A component that will fail after some time to demonstrate error handling.
struct FailingComponent {
fail_after: Duration,
}
impl Component for FailingComponent {
fn info(&self) -> ComponentInfo {
"failing-component".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!(
"Failing component started (will fail after {:?})",
self.fail_after
);
tokio::select! {
_ = cancellation.cancelled() => {
info!("Failing component cancelled before failure");
Ok(())
}
_ = tokio::time::sleep(self.fail_after) => {
error!("Failing component encountered an error!");
Err(anyhow::anyhow!("Simulated component failure").into())
}
}
}
}
/// Debug component that logs system status periodically.
struct DebugComponent;
impl Component for DebugComponent {
fn info(&self) -> ComponentInfo {
"debug-component".into()
}
async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> {
info!("Debug mode enabled - verbose logging active");
let mut interval = interval(Duration::from_secs(5));
while !cancel.is_cancelled() {
tokio::select! {
_ = cancel.cancelled() => break,
_ = interval.tick() => {
info!("DEBUG: System running normally");
}
}
}
info!("Debug component shutting down");
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize tracing for logging
tracing_subscriber::fmt()
.with_target(false)
.without_time()
.init();
info!("Starting comprehensive MAD example application");
// Check if we should enable the failing component
let enable_failure_demo = std::env::var("ENABLE_FAILURE").is_ok();
// Check if we should enable debug mode
let debug_mode = std::env::var("DEBUG").is_ok();
// Shared state for demonstration
let request_count = Arc::new(AtomicUsize::new(0));
// Build and run the application
let result = Mad::builder()
// Add web servers
.add(WebServer {
port: 8080,
request_count: request_count.clone(),
})
.add(WebServer {
port: 8081,
request_count: request_count.clone(),
})
// Add job processors
.add(JobProcessor {
queue_name: "high-priority".to_string(),
processing_interval: Duration::from_secs(2),
})
.add(JobProcessor {
queue_name: "low-priority".to_string(),
processing_interval: Duration::from_secs(5),
})
// Add health checker
.add(HealthChecker {
check_interval: Duration::from_secs(3),
})
// Conditionally add a debug component using add_fn
.add_conditional(debug_mode, DebugComponent)
// Conditionally add failing component to demonstrate error handling
.add_conditional(
enable_failure_demo,
FailingComponent {
fail_after: Duration::from_secs(10),
},
)
// Add a simple metrics reporter using add_fn
.add_fn(|cancel: CancellationToken| async move {
info!("Metrics reporter started");
let mut interval = interval(Duration::from_secs(10));
let start = std::time::Instant::now();
while !cancel.is_cancelled() {
tokio::select! {
_ = cancel.cancelled() => break,
_ = interval.tick() => {
let uptime = start.elapsed();
info!("Application uptime: {:?}", uptime);
}
}
}
info!("Metrics reporter stopped");
Ok(())
})
// Configure graceful shutdown timeout
.cancellation(Some(Duration::from_secs(5)))
// Run the application
.run()
.await;
match result {
Ok(()) => {
info!("Application shut down successfully");
Ok(())
}
Err(e) => {
error!("Application failed: {}", e);
// Check if it's an aggregate error with multiple failures
if let MadError::AggregateError(ref agg) = e {
error!("Multiple component failures detected:");
for (i, err) in agg.get_errors().iter().enumerate() {
error!(" {}. {}", i + 1, err);
}
}
Err(e.into())
}
}
}

View File

@@ -1,41 +0,0 @@
use notmad::ComponentInfo;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct ErrorServer {}
impl notmad::Component for ErrorServer {
fn info(&self) -> ComponentInfo {
"ErrorServer".into()
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Err(notmad::MadError::Inner(anyhow::anyhow!("expected error")))
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.init();
// Do note that only the first server which returns an error is guaranteed to be handled. This is because if servers don't respect cancellation, they will be dropped
notmad::Mad::builder()
.add(ErrorServer {})
.add(ErrorServer {})
.add(ErrorServer {})
.add(ErrorServer {})
.run()
.await?;
Ok(())
}

View File

@@ -1,66 +0,0 @@
use notmad::ComponentInfo;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct WaitServer {}
impl notmad::Component for WaitServer {
fn info(&self) -> ComponentInfo {
"WaitServer".into()
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.init();
let item = "some item".to_string();
notmad::Mad::builder()
.add(WaitServer {})
.add_fn(|_cancel| async move {
let millis_wait = 50;
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Ok(())
})
.add_fn(move |_cancel| {
// I am an actual closure
let item = item.clone();
async move {
let _item = item;
let millis_wait = 50;
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Ok(())
}
})
.run()
.await?;
Ok(())
}

View File

@@ -1,15 +0,0 @@
[package]
name = "mad-multi-service-example"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "multi_service"
path = "main.rs"
[dependencies]
notmad = { path = "../.." }
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
async-trait = "0.1"
anyhow = "1"

View File

@@ -1,210 +0,0 @@
//! Example demonstrating running multiple services with MAD.
//!
//! This example shows how to run a web server, queue processor, and
//! scheduled task together, with graceful shutdown on Ctrl+C.
use notmad::{Component, ComponentInfo, Mad, MadError};
use tokio::time::{Duration, interval};
use tokio_util::sync::CancellationToken;
/// A simple web server component.
///
/// In a real application, this would bind to a port and handle HTTP requests.
/// Here we simulate it by periodically logging that we're "handling" requests.
struct WebServer {
port: u16,
}
impl Component for WebServer {
fn info(&self) -> ComponentInfo {
format!("WebServer:{}", self.port).into()
}
async fn setup(&self) -> Result<(), MadError> {
println!("[{}] Binding to port...", self.info());
// Simulate server setup time
tokio::time::sleep(Duration::from_millis(100)).await;
println!("[{}] Ready to accept connections", self.info());
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!("[{}] Server started", self.info());
// Simulate handling requests until shutdown
let mut request_id = 0;
let mut interval = interval(Duration::from_secs(2));
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Shutdown signal received", self.info());
break;
}
_ = interval.tick() => {
request_id += 1;
println!("[{}] Handling request #{}", self.info(), request_id);
}
}
}
Ok(())
}
async fn close(&self) -> Result<(), MadError> {
println!("[{}] Closing connections...", self.info());
// Simulate graceful connection drain
tokio::time::sleep(Duration::from_millis(200)).await;
println!("[{}] Server stopped", self.info());
Ok(())
}
}
/// A queue processor that consumes messages from a queue.
///
/// This simulates processing messages from a message queue like
/// RabbitMQ, Kafka, or AWS SQS.
struct QueueProcessor {
queue_name: String,
}
impl Component for QueueProcessor {
fn info(&self) -> ComponentInfo {
format!("QueueProcessor:{}", self.queue_name).into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!("[{}] Started processing", self.info());
let mut message_count = 0;
// Process messages until shutdown
while !cancellation.is_cancelled() {
// Simulate waiting for and processing a message
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Stopping message processing", self.info());
break;
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
message_count += 1;
println!("[{}] Processed message #{}", self.info(), message_count);
}
}
}
println!(
"[{}] Processed {} messages total",
self.info(),
message_count
);
Ok(())
}
}
/// A scheduled task that runs periodically.
///
/// This could be used for tasks like:
/// - Cleaning up old data
/// - Generating reports
/// - Syncing with external systems
struct ScheduledTask {
task_name: String,
interval_secs: u64,
}
impl Component for ScheduledTask {
fn info(&self) -> ComponentInfo {
format!("ScheduledTask:{}", self.task_name).into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!(
"[{}] Scheduled to run every {} seconds",
self.info(),
self.interval_secs
);
let mut interval = interval(Duration::from_secs(self.interval_secs));
let mut run_count = 0;
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Scheduler stopping", self.info());
break;
}
_ = interval.tick() => {
run_count += 1;
println!("[{}] Executing run #{}", self.info(), run_count);
// Simulate task execution
tokio::time::sleep(Duration::from_millis(500)).await;
println!("[{}] Run #{} completed", self.info(), run_count);
}
}
}
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Starting multi-service application");
println!("Press Ctrl+C to trigger graceful shutdown");
println!("----------------------------------------");
// Build the application with multiple services
Mad::builder()
// Add a web server on port 8080
.add(WebServer { port: 8080 })
// Add another web server on port 8081 (e.g., admin interface)
.add(WebServer { port: 8081 })
// Add queue processors for different queues
.add(QueueProcessor {
queue_name: "orders".to_string(),
})
.add(QueueProcessor {
queue_name: "notifications".to_string(),
})
// Add scheduled tasks
.add(ScheduledTask {
task_name: "cleanup".to_string(),
interval_secs: 5,
})
.add(ScheduledTask {
task_name: "report_generator".to_string(),
interval_secs: 10,
})
// Add a monitoring component using a closure
.add_fn(|cancel| async move {
println!("[Monitor] Starting system monitor");
let mut interval = interval(Duration::from_secs(15));
while !cancel.is_cancelled() {
tokio::select! {
_ = cancel.cancelled() => {
println!("[Monitor] Stopping monitor");
break;
}
_ = interval.tick() => {
println!("[Monitor] All systems operational");
}
}
}
Ok(())
})
// Set graceful shutdown timeout to 3 seconds
.cancellation(Some(Duration::from_secs(3)))
// Run all components
.run()
.await?;
println!("----------------------------------------");
println!("All services shut down successfully");
Ok(())
}

View File

@@ -1,84 +0,0 @@
use notmad::ComponentInfo;
use tokio_util::sync::CancellationToken;
struct NestedErrorComponent {
name: String,
}
impl notmad::Component for NestedErrorComponent {
fn info(&self) -> ComponentInfo {
self.name.clone().into()
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
// Simulate a deeply nested error
let io_error = std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
"access denied to /etc/secret",
);
Err(anyhow::Error::from(io_error)
.context("failed to read configuration file")
.context("unable to initialize database connection pool")
.context(format!("component '{}' startup failed", self.name))
.into())
}
}
struct AnotherFailingComponent;
impl notmad::Component for AnotherFailingComponent {
fn info(&self) -> ComponentInfo {
"another-component".into()
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Err(anyhow::anyhow!("network timeout after 30s")
.context("failed to connect to external API")
.context("service health check failed")
.into())
}
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter("mad=debug")
.init();
let result = notmad::Mad::builder()
.add(NestedErrorComponent {
name: "database-service".into(),
})
.add(AnotherFailingComponent)
.run()
.await;
match result {
Ok(()) => println!("Success!"),
Err(e) => {
eprintln!("\n=== Error occurred ===");
eprintln!("{}", e);
// Also demonstrate how to walk the error chain manually
if let notmad::MadError::AggregateError(ref agg) = e {
eprintln!("\n=== Detailed error chains ===");
for (i, error) in agg.get_errors().iter().enumerate() {
eprintln!("\nComponent {} error chain:", i + 1);
if let notmad::MadError::Inner(inner) = error {
for (j, cause) in inner.chain().enumerate() {
eprintln!(" {}. {}", j + 1, cause);
}
}
}
} else if let notmad::MadError::Inner(ref inner) = e {
eprintln!("\n=== Error chain ===");
for (i, cause) in inner.chain().enumerate() {
eprintln!(" {}. {}", i + 1, cause);
}
}
}
}
}

View File

@@ -1,66 +0,0 @@
use notmad::ComponentInfo;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct WaitServer {}
impl notmad::Component for WaitServer {
fn info(&self) -> ComponentInfo {
"WaitServer".into()
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Ok(())
}
}
struct RespectCancel {}
impl notmad::Component for RespectCancel {
fn info(&self) -> ComponentInfo {
"RespectCancel".into()
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
cancellation.cancelled().await;
tracing::debug!("stopping because job is cancelled");
Ok(())
}
}
struct NeverStopServer {}
impl notmad::Component for NeverStopServer {
fn info(&self) -> ComponentInfo {
"NeverStopServer".into()
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(999999999)).await;
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.init();
notmad::Mad::builder()
.add(WaitServer {})
.add(NeverStopServer {})
.add(RespectCancel {})
.run()
.await?;
Ok(())
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,79 +0,0 @@
//! Waiter components for MAD.
//!
//! This module provides waiter components that simply wait for cancellation
//! without performing any work. Useful for keeping the application alive
//! or as placeholders in conditional component loading.
use tokio_util::sync::CancellationToken;
use crate::{Component, ComponentInfo, IntoComponent, MadError, SharedComponent};
/// A default waiter component that panics if run.
///
/// This is used internally as a placeholder that should never
/// actually be executed.
pub struct DefaultWaiter;
impl Component for DefaultWaiter {
async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), MadError> {
panic!("should never be called");
}
}
/// A wrapper component that waits for cancellation.
///
/// Instead of running the wrapped component's logic, this simply
/// waits for the cancellation token. This is useful for conditionally
/// disabling components while keeping the same structure.
///
/// # Example
///
/// ```rust,ignore
/// use mad::Waiter;
///
/// // Instead of running the service, just wait
/// let waiter = Waiter::new(service.into_component());
/// ```
pub struct Waiter {
comp: SharedComponent,
}
impl Default for Waiter {
fn default() -> Self {
Self {
comp: DefaultWaiter {}.into_component(),
}
}
}
impl Waiter {
/// Creates a new waiter that wraps the given component.
///
/// The wrapped component's name will be used (prefixed with "waiter/"),
/// but its run method will not be called.
pub fn new(c: SharedComponent) -> Self {
Self { comp: c }
}
}
impl Component for Waiter {
/// Returns the name of the waiter, prefixed with "waiter/".
///
/// If the wrapped component has a name, it will be "waiter/{name}".
/// Otherwise, returns "waiter".
fn info(&self) -> ComponentInfo {
match &self.comp.info().name {
Some(name) => format!("waiter/{name}").into(),
None => "waiter".into(),
}
}
/// Waits for cancellation without performing any work.
///
/// This method simply waits for the cancellation token to be triggered,
/// then returns successfully.
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
cancellation_token.cancelled().await;
Ok(())
}
}

View File

@@ -1,19 +1,18 @@
use std::sync::Arc;
use notmad::{Component, ComponentInfo, Mad, MadError};
use async_trait::async_trait;
use mad::{Component, Mad};
use rand::Rng;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use tracing_test::traced_test;
struct NeverEndingRun {}
#[async_trait]
impl Component for NeverEndingRun {
fn info(&self) -> ComponentInfo {
"NeverEndingRun".into()
fn name(&self) -> Option<String> {
Some("NeverEndingRun".into())
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
let millis_wait = rand::thread_rng().gen_range(50..1000);
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
@@ -87,92 +86,3 @@ async fn test_can_run_components() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test]
#[traced_test]
async fn test_can_shutdown_gracefully() -> anyhow::Result<()> {
let check = Arc::new(Mutex::new(None));
Mad::builder()
.add_fn({
let check = check.clone();
move |cancel| {
let check = check.clone();
async move {
let start = std::time::SystemTime::now();
tracing::info!("waiting for cancel");
cancel.cancelled().await;
tracing::info!("submitting check");
let mut check = check.lock().await;
let elapsed = start.elapsed().expect("to be able to get elapsed");
*check = Some(elapsed);
tracing::info!("check submitted");
Ok(())
}
}
})
.add_fn(|_| async move {
tracing::info!("starting sleep");
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
tracing::info!("sleep ended");
Ok(())
})
.run()
.await?;
let check = check
.lock()
.await
.expect("to be able to get a duration from cancel");
// We default wait 100 ms for graceful shutdown, and we explicitly wait 100ms in the sleep routine
tracing::info!("check millis: {}", check.as_millis());
assert!(check.as_millis() < 250);
Ok(())
}
#[tokio::test]
#[traced_test]
async fn test_component_panics_shutdowns_cleanly() -> anyhow::Result<()> {
let res = Mad::builder()
.add_fn({
move |_cancel| async move {
panic!("my inner panic");
}
})
.add_fn(|cancel| async move {
cancel.cancelled().await;
Ok(())
})
.run()
.await;
let err_content = res.unwrap_err().to_string();
assert!(err_content.contains("component panicked"));
assert!(err_content.contains("my inner panic"));
Ok(())
}
#[test]
fn test_can_easily_transform_error() -> anyhow::Result<()> {
fn fallible() -> anyhow::Result<()> {
Ok(())
}
fn inner() -> Result<(), MadError> {
fallible()?;
Ok(())
}
inner()?;
Ok(())
}

View File

@@ -1,12 +1,10 @@
# yaml-language-server: $schema=https://git.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
# yaml-language-server: $schema=https://git.front.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
base: "git@git.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
vars:
service: "mad"
registry: kasperhermansen
rust:
publish: {}
please:
project:
@@ -14,6 +12,6 @@ please:
repository: "mad"
branch: main
settings:
api_url: "https://git.kjuulh.io"
api_url: "https://git.front.kjuulh.io"
actions:
rust:

View File

@@ -1,3 +0,0 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
}