Compare commits
75 Commits
v0.2.1
...
3122e4495e
| Author | SHA1 | Date | |
|---|---|---|---|
| 3122e4495e | |||
| ebe5110f6f | |||
| b0f8295a81 | |||
| e9e542b506 | |||
| 92c0c22319 | |||
|
ea46f0f2ac
|
|||
|
689bfd1325
|
|||
|
f0c90edce9
|
|||
| 5e60a272f7 | |||
| 34d609937d | |||
| 094b14c945 | |||
| f6d4f846fc | |||
| 7f3139f4f9 | |||
| 494ec05874 | |||
|
8c0128612f
|
|||
| cbe049b6a2 | |||
|
2d6b14ad77
|
|||
| f777ec9b1e | |||
| 2415088792 | |||
| a35d15edc2 | |||
| 613947ac88 | |||
| 145e067454 | |||
|
82de5b260f
|
|||
| eb360e565c | |||
|
c18c8a885c
|
|||
|
762da1e672
|
|||
| 3bc512ab48 | |||
|
7d8071d41b
|
|||
|
1cc4138ec7
|
|||
|
00517daaaa
|
|||
| b941dc9a76 | |||
|
5da3c83c12
|
|||
| a16bee8e37 | |||
|
a61f00a79d
|
|||
| 2bd9bd7b8e | |||
| c79ff2fde0 | |||
| c29a84d15e | |||
| 01274c1364 | |||
| 9c3f2cb7f7 | |||
| 9489f1a5a8 | |||
| f6aba7fac6 | |||
| 772366e267 | |||
| 1e08ee3dbb | |||
| 78f0c4057a | |||
| cf5d5268f6 | |||
| ce2479f6ca | |||
| 82d4699bca | |||
| 5ab7cae1fe | |||
| f049750e4c | |||
| 0b5f19fc77 | |||
| 14eabdbe82 | |||
| ea568449fe | |||
| 6ec3a6031e | |||
| 0f8fd2343e | |||
| 12c00941b5 | |||
| 72755f9cf1 | |||
|
ae0b8b703e
|
|||
| 3c3f638004 | |||
|
ea5287152c
|
|||
| 14371cdfd7 | |||
| 3a1b1673ef | |||
| 89cbae24d0 | |||
|
7c1b317d08
|
|||
|
1fec4e3708
|
|||
| 0eb24aa937 | |||
|
5c88cdd3e3
|
|||
|
d51716893f
|
|||
|
ff350f9193
|
|||
|
3d774f6d9c
|
|||
|
b78423377c
|
|||
| 1446f4c3cf | |||
|
8a80480d94
|
|||
|
b7b2992730
|
|||
| 61cbec0477 | |||
|
10e2739b6e
|
120
CHANGELOG.md
120
CHANGELOG.md
@@ -6,6 +6,126 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [0.10.0] - 2025-11-15
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- implement take errors
|
||||||
|
|
||||||
|
## [0.9.0] - 2025-11-15
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- mad not properly surfaces panics
|
||||||
|
- add publish
|
||||||
|
- add readme
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
- *(deps)* update all dependencies (#38)
|
||||||
|
|
||||||
|
### Other
|
||||||
|
- *(deps)* update rust crate tracing-subscriber to v0.3.20 (#37)
|
||||||
|
|
||||||
|
## [0.8.1] - 2025-08-09
|
||||||
|
|
||||||
|
### Other
|
||||||
|
- error logging
|
||||||
|
|
||||||
|
## [0.8.0] - 2025-08-08
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- add docs
|
||||||
|
- update readme
|
||||||
|
|
||||||
|
## [0.7.5] - 2025-07-24
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- print big inner
|
||||||
|
|
||||||
|
### Other
|
||||||
|
- more error correction
|
||||||
|
- correct error test to not be as verbose
|
||||||
|
|
||||||
|
## [0.7.4] - 2025-07-24
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- cleanup aggregate error for single error
|
||||||
|
|
||||||
|
## [0.7.3] - 2025-07-24
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- automatic conversion from anyhow::Error and access to aggregate errors
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
- *(deps)* update all dependencies (#30)
|
||||||
|
|
||||||
|
## [0.7.2] - 2025-06-25
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- add wait
|
||||||
|
- add conditional, allows adding or waiting for close
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
- *(deps)* update rust crate async-trait to v0.1.86 (#28)
|
||||||
|
- *(deps)* update rust crate rand to 0.9.0 (#27)
|
||||||
|
- *(deps)* update rust crate thiserror to v2.0.11 (#26)
|
||||||
|
- *(deps)* update all dependencies (#25)
|
||||||
|
- *(deps)* update rust crate async-trait to v0.1.84 (#24)
|
||||||
|
- *(deps)* update rust crate thiserror to v2.0.9 (#22)
|
||||||
|
- *(deps)* update rust crate thiserror to v2.0.8 (#21)
|
||||||
|
- *(deps)* update rust crate thiserror to v2.0.7 (#20)
|
||||||
|
- *(deps)* update rust crate thiserror to v2.0.6 (#19)
|
||||||
|
- *(deps)* update rust crate thiserror to v2.0.5 (#18)
|
||||||
|
- *(deps)* update rust crate tokio-util to v0.7.13 (#17)
|
||||||
|
|
||||||
|
### Other
|
||||||
|
- chore
|
||||||
|
|
||||||
|
- *(deps)* update all dependencies (#29)
|
||||||
|
- *(deps)* update rust crate anyhow to v1.0.95 (#23)
|
||||||
|
- *(deps)* update all dependencies (#16)
|
||||||
|
- *(deps)* update rust crate tracing-subscriber to v0.3.19 (#15)
|
||||||
|
- *(deps)* update rust crate tracing to v0.1.41 (#13)
|
||||||
|
|
||||||
|
## [0.7.1] - 2024-11-24
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
- make sure to close on final
|
||||||
|
|
||||||
|
## [0.7.0] - 2024-11-24
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- actually bubble up errors
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
- *(deps)* update rust crate thiserror to v2 (#9)
|
||||||
|
|
||||||
|
## [0.6.0] - 2024-11-23
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- adding test to make sure we can gracefully shutdown
|
||||||
|
- make sure to close down properly
|
||||||
|
|
||||||
|
## [0.5.0] - 2024-11-19
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- update name
|
||||||
|
- respect sigterm
|
||||||
|
- include author
|
||||||
|
- update with rename
|
||||||
|
|
||||||
|
### Docs
|
||||||
|
- add examples
|
||||||
|
|
||||||
|
## [0.4.0] - 2024-08-07
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- add correction
|
||||||
|
- add small docs
|
||||||
|
|
||||||
|
## [0.3.0] - 2024-08-07
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- add add_fn to execute immediate lambdas
|
||||||
|
|
||||||
## [0.2.1] - 2024-08-07
|
## [0.2.1] - 2024-08-07
|
||||||
|
|
||||||
### Docs
|
### Docs
|
||||||
|
|||||||
753
Cargo.lock
generated
753
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -3,10 +3,9 @@ members = ["crates/*"]
|
|||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
version = "0.2.1"
|
version = "0.11.0"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
mad = { path = "crates/mad" }
|
|
||||||
|
|
||||||
anyhow = { version = "1.0.71" }
|
anyhow = { version = "1.0.71" }
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
|||||||
169
README.md
169
README.md
@@ -1,29 +1,32 @@
|
|||||||
# MAD
|
# MAD - Lifecycle Manager for Rust Applications
|
||||||
|
|
||||||
Mad is a life-cycle manager for long running rust operations.
|
[](https://crates.io/crates/notmad)
|
||||||
|
[](https://docs.rs/notmad)
|
||||||
|
[](https://opensource.org/licenses/MIT)
|
||||||
|
|
||||||
- Webservers
|
A simple lifecycle manager for long-running Rust applications. Run multiple services concurrently with graceful shutdown handling.
|
||||||
- Queue bindings
|
|
||||||
- gRPC servers etc
|
|
||||||
- Cron runners
|
|
||||||
|
|
||||||
It is supposed to be the main thing the application runs, and everything from it is spawned and managed by it.
|
## Installation
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[dependencies]
|
||||||
|
notmad = "0.10.0"
|
||||||
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
```
|
||||||
|
|
||||||
|
## Quick Start
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
struct WaitServer {}
|
use notmad::{Component, Mad};
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
#[async_trait]
|
struct MyService;
|
||||||
impl Component for WaitServer {
|
|
||||||
fn name(&self) -> Option<String> {
|
|
||||||
Some("NeverEndingRun".into())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
|
||||||
let millis_wait = rand::thread_rng().gen_range(50..1000);
|
|
||||||
|
|
||||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
|
||||||
|
|
||||||
|
impl Component for MyService {
|
||||||
|
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||||
|
println!("Service running...");
|
||||||
|
cancellation.cancelled().await;
|
||||||
|
println!("Service stopped");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -31,13 +34,133 @@ impl Component for WaitServer {
|
|||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
Mad::builder()
|
Mad::builder()
|
||||||
.add(WaitServer {})
|
.add(MyService)
|
||||||
.add(WaitServer {})
|
|
||||||
.add(WaitServer {})
|
|
||||||
.run()
|
.run()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Basic Usage
|
||||||
|
|
||||||
|
### Axum Web Server with Graceful Shutdown
|
||||||
|
|
||||||
|
Here's how to run an Axum server with MAD's graceful shutdown:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
use axum::{Router, routing::get};
|
||||||
|
use notmad::{Component, ComponentInfo};
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
|
struct WebServer {
|
||||||
|
port: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Component for WebServer {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
format!("WebServer:{}", self.port).into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
|
||||||
|
let app = Router::new().route("/", get(|| async { "Hello, World!" }));
|
||||||
|
|
||||||
|
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", self.port))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
println!("Listening on http://0.0.0.0:{}", self.port);
|
||||||
|
|
||||||
|
// Run server with graceful shutdown
|
||||||
|
axum::serve(listener, app)
|
||||||
|
.with_graceful_shutdown(async move {
|
||||||
|
cancel.cancelled().await;
|
||||||
|
println!("Shutting down server...");
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Run Multiple Services
|
||||||
|
|
||||||
|
```rust
|
||||||
|
Mad::builder()
|
||||||
|
.add(WebServer { port: 8080 })
|
||||||
|
.add(WebServer { port: 8081 })
|
||||||
|
.run()
|
||||||
|
.await?;
|
||||||
|
```
|
||||||
|
|
||||||
|
### Use Functions as Components
|
||||||
|
|
||||||
|
```rust
|
||||||
|
Mad::builder()
|
||||||
|
.add_fn(|cancel| async move {
|
||||||
|
println!("Running...");
|
||||||
|
cancel.cancelled().await;
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.run()
|
||||||
|
.await?;
|
||||||
|
```
|
||||||
|
|
||||||
|
## Lifecycle Hooks
|
||||||
|
|
||||||
|
Components support optional setup and cleanup phases:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
impl Component for DatabaseService {
|
||||||
|
async fn setup(&self) -> Result<(), notmad::MadError> {
|
||||||
|
println!("Connecting to database...");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, cancel: CancellationToken) -> Result<(), notmad::MadError> {
|
||||||
|
cancel.cancelled().await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn close(&self) -> Result<(), notmad::MadError> {
|
||||||
|
println!("Closing database connection...");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Migration from v0.10
|
||||||
|
|
||||||
|
### Breaking Changes
|
||||||
|
|
||||||
|
1. **`name()` → `info()`**: Returns `ComponentInfo` instead of `Option<String>`
|
||||||
|
```rust
|
||||||
|
// Before
|
||||||
|
fn name(&self) -> Option<String> { Some("my-service".into()) }
|
||||||
|
|
||||||
|
// After
|
||||||
|
fn info(&self) -> ComponentInfo { "my-service".into() }
|
||||||
|
```
|
||||||
|
|
||||||
|
2. **No more `async-trait`**: Remove the dependency and `#[async_trait]` attribute
|
||||||
|
```rust
|
||||||
|
// Before
|
||||||
|
#[async_trait]
|
||||||
|
impl Component for MyService { }
|
||||||
|
|
||||||
|
// After
|
||||||
|
impl Component for MyService { }
|
||||||
|
```
|
||||||
|
|
||||||
|
## Examples
|
||||||
|
|
||||||
|
See [examples directory](crates/mad/examples) for complete working examples.
|
||||||
|
|
||||||
|
## License
|
||||||
|
|
||||||
|
MIT - see [LICENSE](LICENSE)
|
||||||
|
|
||||||
|
## Links
|
||||||
|
|
||||||
|
- [Documentation](https://docs.rs/notmad)
|
||||||
|
- [Repository](https://github.com/kjuulh/mad)
|
||||||
|
- [Crates.io](https://crates.io/crates/notmad)
|
||||||
|
|||||||
@@ -1,18 +1,23 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "mad"
|
name = "notmad"
|
||||||
version.workspace = true
|
version.workspace = true
|
||||||
edition = "2021"
|
description = "notmad is a life-cycle manager for long running rust operations"
|
||||||
|
license = "MIT"
|
||||||
|
repository = "https://github.com/kjuulh/mad"
|
||||||
|
authors = ["kjuulh"]
|
||||||
|
edition = "2024"
|
||||||
|
readme = "../../README.md"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow.workspace = true
|
anyhow.workspace = true
|
||||||
async-trait = "0.1.81"
|
|
||||||
futures = "0.3.30"
|
futures = "0.3.30"
|
||||||
futures-util = "0.3.30"
|
futures-util = "0.3.30"
|
||||||
rand = "0.8.5"
|
rand = "0.10.0"
|
||||||
thiserror = "1.0.63"
|
thiserror = "2.0.0"
|
||||||
tokio.workspace = true
|
tokio.workspace = true
|
||||||
tokio-util = "0.7.11"
|
tokio-util = "0.7.11"
|
||||||
tracing.workspace = true
|
tracing.workspace = true
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
tracing-subscriber = "0.3.18"
|
||||||
tracing-test = { version = "0.2.5", features = ["no-env-filter"] }
|
tracing-test = { version = "0.2.5", features = ["no-env-filter"] }
|
||||||
|
|||||||
39
crates/mad/examples/basic/main.rs
Normal file
39
crates/mad/examples/basic/main.rs
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
use notmad::ComponentInfo;
|
||||||
|
use rand::Rng;
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
use tracing::Level;
|
||||||
|
|
||||||
|
struct WaitServer {}
|
||||||
|
impl notmad::Component for WaitServer {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
"WaitServer".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||||
|
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
||||||
|
|
||||||
|
tracing::debug!("waiting: {}ms", millis_wait);
|
||||||
|
|
||||||
|
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_max_level(Level::TRACE)
|
||||||
|
.init();
|
||||||
|
|
||||||
|
notmad::Mad::builder()
|
||||||
|
.add(WaitServer {})
|
||||||
|
.add(WaitServer {})
|
||||||
|
.add(WaitServer {})
|
||||||
|
.add(WaitServer {})
|
||||||
|
.run()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
18
crates/mad/examples/comprehensive/Cargo.toml
Normal file
18
crates/mad/examples/comprehensive/Cargo.toml
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
[package]
|
||||||
|
name = "mad-comprehensive-example"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[[bin]]
|
||||||
|
name = "comprehensive"
|
||||||
|
path = "main.rs"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
notmad = { path = "../.." }
|
||||||
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
tokio-util = "0.7"
|
||||||
|
async-trait = "0.1"
|
||||||
|
anyhow = "1"
|
||||||
|
tracing = "0.1"
|
||||||
|
tracing-subscriber = "0.3"
|
||||||
|
rand = "0.8"
|
||||||
326
crates/mad/examples/comprehensive/main.rs
Normal file
326
crates/mad/examples/comprehensive/main.rs
Normal file
@@ -0,0 +1,326 @@
|
|||||||
|
//! Comprehensive example demonstrating MAD's full capabilities.
|
||||||
|
//!
|
||||||
|
//! This example shows:
|
||||||
|
//! - Multiple component types (struct, closure, conditional)
|
||||||
|
//! - Component lifecycle (setup, run, close)
|
||||||
|
//! - Error handling and propagation
|
||||||
|
//! - Graceful shutdown with cancellation tokens
|
||||||
|
//! - Concurrent component execution
|
||||||
|
|
||||||
|
use notmad::{Component, ComponentInfo, Mad, MadError};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
use tokio::time::{Duration, interval};
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
use tracing::{error, info, warn};
|
||||||
|
|
||||||
|
/// A web server component that simulates handling HTTP requests.
|
||||||
|
struct WebServer {
|
||||||
|
port: u16,
|
||||||
|
request_count: Arc<AtomicUsize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Component for WebServer {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
format!("web-server-{}", self.port).into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn setup(&self) -> Result<(), MadError> {
|
||||||
|
info!("Setting up web server on port {}", self.port);
|
||||||
|
// In a real application, you might:
|
||||||
|
// - Bind to the port
|
||||||
|
// - Set up TLS certificates
|
||||||
|
// - Initialize middleware
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
info!("Web server on port {} is ready", self.port);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
|
||||||
|
info!("Web server on port {} started", self.port);
|
||||||
|
let mut interval = interval(Duration::from_secs(1));
|
||||||
|
|
||||||
|
while !cancellation.is_cancelled() {
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancellation.cancelled() => {
|
||||||
|
info!("Web server on port {} received shutdown signal", self.port);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ = interval.tick() => {
|
||||||
|
// Simulate handling requests
|
||||||
|
let count = self.request_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
info!("Server on port {} handled request #{}", self.port, count + 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn close(&self) -> Result<(), MadError> {
|
||||||
|
info!("Shutting down web server on port {}", self.port);
|
||||||
|
// In a real application, you might:
|
||||||
|
// - Drain existing connections
|
||||||
|
// - Save server state
|
||||||
|
// - Close database connections
|
||||||
|
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||||
|
let total = self.request_count.load(Ordering::Relaxed);
|
||||||
|
info!(
|
||||||
|
"Web server on port {} shut down. Total requests handled: {}",
|
||||||
|
self.port, total
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A background job processor that simulates processing tasks from a queue.
|
||||||
|
struct JobProcessor {
|
||||||
|
queue_name: String,
|
||||||
|
processing_interval: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Component for JobProcessor {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
format!("job-processor-{}", self.queue_name).into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn setup(&self) -> Result<(), MadError> {
|
||||||
|
info!("Connecting to queue: {}", self.queue_name);
|
||||||
|
// Simulate connecting to a message queue
|
||||||
|
tokio::time::sleep(Duration::from_millis(150)).await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
|
||||||
|
info!("Job processor for {} started", self.queue_name);
|
||||||
|
let mut interval = interval(self.processing_interval);
|
||||||
|
let mut job_count = 0;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancellation.cancelled() => {
|
||||||
|
info!("Job processor for {} stopping...", self.queue_name);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ = interval.tick() => {
|
||||||
|
job_count += 1;
|
||||||
|
info!("Processing job #{} from {}", job_count, self.queue_name);
|
||||||
|
|
||||||
|
// Simulate job processing
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
// Simulate occasional errors (but don't fail the component)
|
||||||
|
if job_count % 10 == 0 {
|
||||||
|
warn!("Job #{} from {} required retry", job_count, self.queue_name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"Job processor for {} processed {} jobs",
|
||||||
|
self.queue_name, job_count
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn close(&self) -> Result<(), MadError> {
|
||||||
|
info!("Disconnecting from queue: {}", self.queue_name);
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A health check component that monitors system health.
|
||||||
|
struct HealthChecker {
|
||||||
|
check_interval: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Component for HealthChecker {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
"health-checker".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
|
||||||
|
info!("Health checker started");
|
||||||
|
let mut interval = interval(self.check_interval);
|
||||||
|
|
||||||
|
while !cancellation.is_cancelled() {
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancellation.cancelled() => {
|
||||||
|
info!("Health checker stopping...");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ = interval.tick() => {
|
||||||
|
// Simulate health checks
|
||||||
|
let cpu_usage = rand::random::<f32>() * 100.0;
|
||||||
|
let memory_usage = rand::random::<f32>() * 100.0;
|
||||||
|
|
||||||
|
info!("System health: CPU={:.1}%, Memory={:.1}%", cpu_usage, memory_usage);
|
||||||
|
|
||||||
|
if cpu_usage > 90.0 {
|
||||||
|
warn!("High CPU usage detected: {:.1}%", cpu_usage);
|
||||||
|
}
|
||||||
|
if memory_usage > 90.0 {
|
||||||
|
warn!("High memory usage detected: {:.1}%", memory_usage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A component that will fail after some time to demonstrate error handling.
|
||||||
|
struct FailingComponent {
|
||||||
|
fail_after: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Component for FailingComponent {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
"failing-component".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
|
||||||
|
info!(
|
||||||
|
"Failing component started (will fail after {:?})",
|
||||||
|
self.fail_after
|
||||||
|
);
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancellation.cancelled() => {
|
||||||
|
info!("Failing component cancelled before failure");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
_ = tokio::time::sleep(self.fail_after) => {
|
||||||
|
error!("Failing component encountered an error!");
|
||||||
|
Err(anyhow::anyhow!("Simulated component failure").into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Debug component that logs system status periodically.
|
||||||
|
struct DebugComponent;
|
||||||
|
|
||||||
|
impl Component for DebugComponent {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
"debug-component".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> {
|
||||||
|
info!("Debug mode enabled - verbose logging active");
|
||||||
|
let mut interval = interval(Duration::from_secs(5));
|
||||||
|
|
||||||
|
while !cancel.is_cancelled() {
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancel.cancelled() => break,
|
||||||
|
_ = interval.tick() => {
|
||||||
|
info!("DEBUG: System running normally");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Debug component shutting down");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
// Initialize tracing for logging
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_target(false)
|
||||||
|
.without_time()
|
||||||
|
.init();
|
||||||
|
|
||||||
|
info!("Starting comprehensive MAD example application");
|
||||||
|
|
||||||
|
// Check if we should enable the failing component
|
||||||
|
let enable_failure_demo = std::env::var("ENABLE_FAILURE").is_ok();
|
||||||
|
|
||||||
|
// Check if we should enable debug mode
|
||||||
|
let debug_mode = std::env::var("DEBUG").is_ok();
|
||||||
|
|
||||||
|
// Shared state for demonstration
|
||||||
|
let request_count = Arc::new(AtomicUsize::new(0));
|
||||||
|
|
||||||
|
// Build and run the application
|
||||||
|
let result = Mad::builder()
|
||||||
|
// Add web servers
|
||||||
|
.add(WebServer {
|
||||||
|
port: 8080,
|
||||||
|
request_count: request_count.clone(),
|
||||||
|
})
|
||||||
|
.add(WebServer {
|
||||||
|
port: 8081,
|
||||||
|
request_count: request_count.clone(),
|
||||||
|
})
|
||||||
|
// Add job processors
|
||||||
|
.add(JobProcessor {
|
||||||
|
queue_name: "high-priority".to_string(),
|
||||||
|
processing_interval: Duration::from_secs(2),
|
||||||
|
})
|
||||||
|
.add(JobProcessor {
|
||||||
|
queue_name: "low-priority".to_string(),
|
||||||
|
processing_interval: Duration::from_secs(5),
|
||||||
|
})
|
||||||
|
// Add health checker
|
||||||
|
.add(HealthChecker {
|
||||||
|
check_interval: Duration::from_secs(3),
|
||||||
|
})
|
||||||
|
// Conditionally add a debug component using add_fn
|
||||||
|
.add_conditional(debug_mode, DebugComponent)
|
||||||
|
// Conditionally add failing component to demonstrate error handling
|
||||||
|
.add_conditional(
|
||||||
|
enable_failure_demo,
|
||||||
|
FailingComponent {
|
||||||
|
fail_after: Duration::from_secs(10),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
// Add a simple metrics reporter using add_fn
|
||||||
|
.add_fn(|cancel: CancellationToken| async move {
|
||||||
|
info!("Metrics reporter started");
|
||||||
|
let mut interval = interval(Duration::from_secs(10));
|
||||||
|
let start = std::time::Instant::now();
|
||||||
|
|
||||||
|
while !cancel.is_cancelled() {
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancel.cancelled() => break,
|
||||||
|
_ = interval.tick() => {
|
||||||
|
let uptime = start.elapsed();
|
||||||
|
info!("Application uptime: {:?}", uptime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Metrics reporter stopped");
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
// Configure graceful shutdown timeout
|
||||||
|
.cancellation(Some(Duration::from_secs(5)))
|
||||||
|
// Run the application
|
||||||
|
.run()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(()) => {
|
||||||
|
info!("Application shut down successfully");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Application failed: {}", e);
|
||||||
|
|
||||||
|
// Check if it's an aggregate error with multiple failures
|
||||||
|
if let MadError::AggregateError(ref agg) = e {
|
||||||
|
error!("Multiple component failures detected:");
|
||||||
|
for (i, err) in agg.get_errors().iter().enumerate() {
|
||||||
|
error!(" {}. {}", i + 1, err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(e.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
41
crates/mad/examples/error_log/main.rs
Normal file
41
crates/mad/examples/error_log/main.rs
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
use notmad::ComponentInfo;
|
||||||
|
use rand::Rng;
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
use tracing::Level;
|
||||||
|
|
||||||
|
struct ErrorServer {}
|
||||||
|
impl notmad::Component for ErrorServer {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
"ErrorServer".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||||
|
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
||||||
|
|
||||||
|
tracing::debug!("waiting: {}ms", millis_wait);
|
||||||
|
|
||||||
|
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||||
|
|
||||||
|
Err(notmad::MadError::Inner(anyhow::anyhow!("expected error")))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_max_level(Level::TRACE)
|
||||||
|
.init();
|
||||||
|
|
||||||
|
// Do note that only the first server which returns an error is guaranteed to be handled. This is because if servers don't respect cancellation, they will be dropped
|
||||||
|
|
||||||
|
notmad::Mad::builder()
|
||||||
|
.add(ErrorServer {})
|
||||||
|
.add(ErrorServer {})
|
||||||
|
.add(ErrorServer {})
|
||||||
|
.add(ErrorServer {})
|
||||||
|
.run()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
66
crates/mad/examples/fn/main.rs
Normal file
66
crates/mad/examples/fn/main.rs
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
use notmad::ComponentInfo;
|
||||||
|
use rand::Rng;
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
use tracing::Level;
|
||||||
|
|
||||||
|
struct WaitServer {}
|
||||||
|
impl notmad::Component for WaitServer {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
"WaitServer".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||||
|
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
||||||
|
|
||||||
|
tracing::debug!("waiting: {}ms", millis_wait);
|
||||||
|
|
||||||
|
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_max_level(Level::TRACE)
|
||||||
|
.init();
|
||||||
|
|
||||||
|
let item = "some item".to_string();
|
||||||
|
|
||||||
|
notmad::Mad::builder()
|
||||||
|
.add(WaitServer {})
|
||||||
|
.add_fn(|_cancel| async move {
|
||||||
|
let millis_wait = 50;
|
||||||
|
|
||||||
|
tracing::debug!("waiting: {}ms", millis_wait);
|
||||||
|
|
||||||
|
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.add_fn(move |_cancel| {
|
||||||
|
// I am an actual closure
|
||||||
|
|
||||||
|
let item = item.clone();
|
||||||
|
|
||||||
|
async move {
|
||||||
|
let _item = item;
|
||||||
|
|
||||||
|
let millis_wait = 50;
|
||||||
|
|
||||||
|
tracing::debug!("waiting: {}ms", millis_wait);
|
||||||
|
|
||||||
|
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.run()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
15
crates/mad/examples/multi_service/Cargo.toml
Normal file
15
crates/mad/examples/multi_service/Cargo.toml
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
[package]
|
||||||
|
name = "mad-multi-service-example"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[[bin]]
|
||||||
|
name = "multi_service"
|
||||||
|
path = "main.rs"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
notmad = { path = "../.." }
|
||||||
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
tokio-util = "0.7"
|
||||||
|
async-trait = "0.1"
|
||||||
|
anyhow = "1"
|
||||||
210
crates/mad/examples/multi_service/main.rs
Normal file
210
crates/mad/examples/multi_service/main.rs
Normal file
@@ -0,0 +1,210 @@
|
|||||||
|
//! Example demonstrating running multiple services with MAD.
|
||||||
|
//!
|
||||||
|
//! This example shows how to run a web server, queue processor, and
|
||||||
|
//! scheduled task together, with graceful shutdown on Ctrl+C.
|
||||||
|
|
||||||
|
use notmad::{Component, ComponentInfo, Mad, MadError};
|
||||||
|
use tokio::time::{Duration, interval};
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
|
/// A simple web server component.
|
||||||
|
///
|
||||||
|
/// In a real application, this would bind to a port and handle HTTP requests.
|
||||||
|
/// Here we simulate it by periodically logging that we're "handling" requests.
|
||||||
|
struct WebServer {
|
||||||
|
port: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Component for WebServer {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
format!("WebServer:{}", self.port).into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn setup(&self) -> Result<(), MadError> {
|
||||||
|
println!("[{}] Binding to port...", self.info());
|
||||||
|
// Simulate server setup time
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
println!("[{}] Ready to accept connections", self.info());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
|
||||||
|
println!("[{}] Server started", self.info());
|
||||||
|
|
||||||
|
// Simulate handling requests until shutdown
|
||||||
|
let mut request_id = 0;
|
||||||
|
let mut interval = interval(Duration::from_secs(2));
|
||||||
|
|
||||||
|
while !cancellation.is_cancelled() {
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancellation.cancelled() => {
|
||||||
|
println!("[{}] Shutdown signal received", self.info());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ = interval.tick() => {
|
||||||
|
request_id += 1;
|
||||||
|
println!("[{}] Handling request #{}", self.info(), request_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn close(&self) -> Result<(), MadError> {
|
||||||
|
println!("[{}] Closing connections...", self.info());
|
||||||
|
// Simulate graceful connection drain
|
||||||
|
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||||
|
println!("[{}] Server stopped", self.info());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A queue processor that consumes messages from a queue.
|
||||||
|
///
|
||||||
|
/// This simulates processing messages from a message queue like
|
||||||
|
/// RabbitMQ, Kafka, or AWS SQS.
|
||||||
|
struct QueueProcessor {
|
||||||
|
queue_name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Component for QueueProcessor {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
format!("QueueProcessor:{}", self.queue_name).into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
|
||||||
|
println!("[{}] Started processing", self.info());
|
||||||
|
|
||||||
|
let mut message_count = 0;
|
||||||
|
|
||||||
|
// Process messages until shutdown
|
||||||
|
while !cancellation.is_cancelled() {
|
||||||
|
// Simulate waiting for and processing a message
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancellation.cancelled() => {
|
||||||
|
println!("[{}] Stopping message processing", self.info());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ = tokio::time::sleep(Duration::from_secs(1)) => {
|
||||||
|
message_count += 1;
|
||||||
|
println!("[{}] Processed message #{}", self.info(), message_count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println!(
|
||||||
|
"[{}] Processed {} messages total",
|
||||||
|
self.info(),
|
||||||
|
message_count
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A scheduled task that runs periodically.
|
||||||
|
///
|
||||||
|
/// This could be used for tasks like:
|
||||||
|
/// - Cleaning up old data
|
||||||
|
/// - Generating reports
|
||||||
|
/// - Syncing with external systems
|
||||||
|
struct ScheduledTask {
|
||||||
|
task_name: String,
|
||||||
|
interval_secs: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Component for ScheduledTask {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
format!("ScheduledTask:{}", self.task_name).into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
|
||||||
|
println!(
|
||||||
|
"[{}] Scheduled to run every {} seconds",
|
||||||
|
self.info(),
|
||||||
|
self.interval_secs
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut interval = interval(Duration::from_secs(self.interval_secs));
|
||||||
|
let mut run_count = 0;
|
||||||
|
|
||||||
|
while !cancellation.is_cancelled() {
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancellation.cancelled() => {
|
||||||
|
println!("[{}] Scheduler stopping", self.info());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ = interval.tick() => {
|
||||||
|
run_count += 1;
|
||||||
|
println!("[{}] Executing run #{}", self.info(), run_count);
|
||||||
|
|
||||||
|
// Simulate task execution
|
||||||
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||||
|
|
||||||
|
println!("[{}] Run #{} completed", self.info(), run_count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
println!("Starting multi-service application");
|
||||||
|
println!("Press Ctrl+C to trigger graceful shutdown");
|
||||||
|
println!("----------------------------------------");
|
||||||
|
|
||||||
|
// Build the application with multiple services
|
||||||
|
Mad::builder()
|
||||||
|
// Add a web server on port 8080
|
||||||
|
.add(WebServer { port: 8080 })
|
||||||
|
// Add another web server on port 8081 (e.g., admin interface)
|
||||||
|
.add(WebServer { port: 8081 })
|
||||||
|
// Add queue processors for different queues
|
||||||
|
.add(QueueProcessor {
|
||||||
|
queue_name: "orders".to_string(),
|
||||||
|
})
|
||||||
|
.add(QueueProcessor {
|
||||||
|
queue_name: "notifications".to_string(),
|
||||||
|
})
|
||||||
|
// Add scheduled tasks
|
||||||
|
.add(ScheduledTask {
|
||||||
|
task_name: "cleanup".to_string(),
|
||||||
|
interval_secs: 5,
|
||||||
|
})
|
||||||
|
.add(ScheduledTask {
|
||||||
|
task_name: "report_generator".to_string(),
|
||||||
|
interval_secs: 10,
|
||||||
|
})
|
||||||
|
// Add a monitoring component using a closure
|
||||||
|
.add_fn(|cancel| async move {
|
||||||
|
println!("[Monitor] Starting system monitor");
|
||||||
|
let mut interval = interval(Duration::from_secs(15));
|
||||||
|
|
||||||
|
while !cancel.is_cancelled() {
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancel.cancelled() => {
|
||||||
|
println!("[Monitor] Stopping monitor");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ = interval.tick() => {
|
||||||
|
println!("[Monitor] All systems operational");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
// Set graceful shutdown timeout to 3 seconds
|
||||||
|
.cancellation(Some(Duration::from_secs(3)))
|
||||||
|
// Run all components
|
||||||
|
.run()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
println!("----------------------------------------");
|
||||||
|
println!("All services shut down successfully");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
84
crates/mad/examples/nested_errors/main.rs
Normal file
84
crates/mad/examples/nested_errors/main.rs
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
use notmad::ComponentInfo;
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
|
struct NestedErrorComponent {
|
||||||
|
name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl notmad::Component for NestedErrorComponent {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
self.name.clone().into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||||
|
// Simulate a deeply nested error
|
||||||
|
let io_error = std::io::Error::new(
|
||||||
|
std::io::ErrorKind::PermissionDenied,
|
||||||
|
"access denied to /etc/secret",
|
||||||
|
);
|
||||||
|
|
||||||
|
Err(anyhow::Error::from(io_error)
|
||||||
|
.context("failed to read configuration file")
|
||||||
|
.context("unable to initialize database connection pool")
|
||||||
|
.context(format!("component '{}' startup failed", self.name))
|
||||||
|
.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct AnotherFailingComponent;
|
||||||
|
|
||||||
|
impl notmad::Component for AnotherFailingComponent {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
"another-component".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
Err(anyhow::anyhow!("network timeout after 30s")
|
||||||
|
.context("failed to connect to external API")
|
||||||
|
.context("service health check failed")
|
||||||
|
.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_env_filter("mad=debug")
|
||||||
|
.init();
|
||||||
|
|
||||||
|
let result = notmad::Mad::builder()
|
||||||
|
.add(NestedErrorComponent {
|
||||||
|
name: "database-service".into(),
|
||||||
|
})
|
||||||
|
.add(AnotherFailingComponent)
|
||||||
|
.run()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(()) => println!("Success!"),
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("\n=== Error occurred ===");
|
||||||
|
eprintln!("{}", e);
|
||||||
|
|
||||||
|
// Also demonstrate how to walk the error chain manually
|
||||||
|
if let notmad::MadError::AggregateError(ref agg) = e {
|
||||||
|
eprintln!("\n=== Detailed error chains ===");
|
||||||
|
for (i, error) in agg.get_errors().iter().enumerate() {
|
||||||
|
eprintln!("\nComponent {} error chain:", i + 1);
|
||||||
|
if let notmad::MadError::Inner(inner) = error {
|
||||||
|
for (j, cause) in inner.chain().enumerate() {
|
||||||
|
eprintln!(" {}. {}", j + 1, cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if let notmad::MadError::Inner(ref inner) = e {
|
||||||
|
eprintln!("\n=== Error chain ===");
|
||||||
|
for (i, cause) in inner.chain().enumerate() {
|
||||||
|
eprintln!(" {}. {}", i + 1, cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
66
crates/mad/examples/signals/main.rs
Normal file
66
crates/mad/examples/signals/main.rs
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
use notmad::ComponentInfo;
|
||||||
|
use rand::Rng;
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
use tracing::Level;
|
||||||
|
|
||||||
|
struct WaitServer {}
|
||||||
|
impl notmad::Component for WaitServer {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
"WaitServer".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||||
|
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
||||||
|
|
||||||
|
tracing::debug!("waiting: {}ms", millis_wait);
|
||||||
|
|
||||||
|
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct RespectCancel {}
|
||||||
|
impl notmad::Component for RespectCancel {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
"RespectCancel".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||||
|
cancellation.cancelled().await;
|
||||||
|
tracing::debug!("stopping because job is cancelled");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct NeverStopServer {}
|
||||||
|
impl notmad::Component for NeverStopServer {
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
"NeverStopServer".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||||
|
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(999999999)).await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_max_level(Level::TRACE)
|
||||||
|
.init();
|
||||||
|
|
||||||
|
notmad::Mad::builder()
|
||||||
|
.add(WaitServer {})
|
||||||
|
.add(NeverStopServer {})
|
||||||
|
.add(RespectCancel {})
|
||||||
|
.run()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
File diff suppressed because it is too large
Load Diff
79
crates/mad/src/waiter.rs
Normal file
79
crates/mad/src/waiter.rs
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
//! Waiter components for MAD.
|
||||||
|
//!
|
||||||
|
//! This module provides waiter components that simply wait for cancellation
|
||||||
|
//! without performing any work. Useful for keeping the application alive
|
||||||
|
//! or as placeholders in conditional component loading.
|
||||||
|
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
|
use crate::{Component, ComponentInfo, IntoComponent, MadError, SharedComponent};
|
||||||
|
|
||||||
|
/// A default waiter component that panics if run.
|
||||||
|
///
|
||||||
|
/// This is used internally as a placeholder that should never
|
||||||
|
/// actually be executed.
|
||||||
|
pub struct DefaultWaiter;
|
||||||
|
impl Component for DefaultWaiter {
|
||||||
|
async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), MadError> {
|
||||||
|
panic!("should never be called");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A wrapper component that waits for cancellation.
|
||||||
|
///
|
||||||
|
/// Instead of running the wrapped component's logic, this simply
|
||||||
|
/// waits for the cancellation token. This is useful for conditionally
|
||||||
|
/// disabling components while keeping the same structure.
|
||||||
|
///
|
||||||
|
/// # Example
|
||||||
|
///
|
||||||
|
/// ```rust,ignore
|
||||||
|
/// use mad::Waiter;
|
||||||
|
///
|
||||||
|
/// // Instead of running the service, just wait
|
||||||
|
/// let waiter = Waiter::new(service.into_component());
|
||||||
|
/// ```
|
||||||
|
pub struct Waiter {
|
||||||
|
comp: SharedComponent,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Waiter {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
comp: DefaultWaiter {}.into_component(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Waiter {
|
||||||
|
/// Creates a new waiter that wraps the given component.
|
||||||
|
///
|
||||||
|
/// The wrapped component's name will be used (prefixed with "waiter/"),
|
||||||
|
/// but its run method will not be called.
|
||||||
|
pub fn new(c: SharedComponent) -> Self {
|
||||||
|
Self { comp: c }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Component for Waiter {
|
||||||
|
/// Returns the name of the waiter, prefixed with "waiter/".
|
||||||
|
///
|
||||||
|
/// If the wrapped component has a name, it will be "waiter/{name}".
|
||||||
|
/// Otherwise, returns "waiter".
|
||||||
|
fn info(&self) -> ComponentInfo {
|
||||||
|
match &self.comp.info().name {
|
||||||
|
Some(name) => format!("waiter/{name}").into(),
|
||||||
|
None => "waiter".into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Waits for cancellation without performing any work.
|
||||||
|
///
|
||||||
|
/// This method simply waits for the cancellation token to be triggered,
|
||||||
|
/// then returns successfully.
|
||||||
|
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
|
||||||
|
cancellation_token.cancelled().await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,18 +1,19 @@
|
|||||||
use async_trait::async_trait;
|
use std::sync::Arc;
|
||||||
use mad::{Component, Mad};
|
|
||||||
|
use notmad::{Component, ComponentInfo, Mad, MadError};
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing_test::traced_test;
|
use tracing_test::traced_test;
|
||||||
|
|
||||||
struct NeverEndingRun {}
|
struct NeverEndingRun {}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl Component for NeverEndingRun {
|
impl Component for NeverEndingRun {
|
||||||
fn name(&self) -> Option<String> {
|
fn info(&self) -> ComponentInfo {
|
||||||
Some("NeverEndingRun".into())
|
"NeverEndingRun".into()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
|
||||||
let millis_wait = rand::thread_rng().gen_range(50..1000);
|
let millis_wait = rand::thread_rng().gen_range(50..1000);
|
||||||
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||||
@@ -86,3 +87,92 @@ async fn test_can_run_components() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[traced_test]
|
||||||
|
async fn test_can_shutdown_gracefully() -> anyhow::Result<()> {
|
||||||
|
let check = Arc::new(Mutex::new(None));
|
||||||
|
|
||||||
|
Mad::builder()
|
||||||
|
.add_fn({
|
||||||
|
let check = check.clone();
|
||||||
|
|
||||||
|
move |cancel| {
|
||||||
|
let check = check.clone();
|
||||||
|
|
||||||
|
async move {
|
||||||
|
let start = std::time::SystemTime::now();
|
||||||
|
tracing::info!("waiting for cancel");
|
||||||
|
cancel.cancelled().await;
|
||||||
|
tracing::info!("submitting check");
|
||||||
|
let mut check = check.lock().await;
|
||||||
|
let elapsed = start.elapsed().expect("to be able to get elapsed");
|
||||||
|
*check = Some(elapsed);
|
||||||
|
tracing::info!("check submitted");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.add_fn(|_| async move {
|
||||||
|
tracing::info!("starting sleep");
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
|
tracing::info!("sleep ended");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.run()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let check = check
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.expect("to be able to get a duration from cancel");
|
||||||
|
|
||||||
|
// We default wait 100 ms for graceful shutdown, and we explicitly wait 100ms in the sleep routine
|
||||||
|
tracing::info!("check millis: {}", check.as_millis());
|
||||||
|
assert!(check.as_millis() < 250);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[traced_test]
|
||||||
|
async fn test_component_panics_shutdowns_cleanly() -> anyhow::Result<()> {
|
||||||
|
let res = Mad::builder()
|
||||||
|
.add_fn({
|
||||||
|
move |_cancel| async move {
|
||||||
|
panic!("my inner panic");
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.add_fn(|cancel| async move {
|
||||||
|
cancel.cancelled().await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.run()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let err_content = res.unwrap_err().to_string();
|
||||||
|
assert!(err_content.contains("component panicked"));
|
||||||
|
assert!(err_content.contains("my inner panic"));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_can_easily_transform_error() -> anyhow::Result<()> {
|
||||||
|
fn fallible() -> anyhow::Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inner() -> Result<(), MadError> {
|
||||||
|
fallible()?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
inner()?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,10 +1,12 @@
|
|||||||
# yaml-language-server: $schema=https://git.front.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
|
# yaml-language-server: $schema=https://git.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
|
||||||
|
|
||||||
base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
|
base: "git@git.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
|
||||||
|
|
||||||
vars:
|
vars:
|
||||||
service: "mad"
|
service: "mad"
|
||||||
registry: kasperhermansen
|
registry: kasperhermansen
|
||||||
|
rust:
|
||||||
|
publish: {}
|
||||||
|
|
||||||
please:
|
please:
|
||||||
project:
|
project:
|
||||||
@@ -12,6 +14,6 @@ please:
|
|||||||
repository: "mad"
|
repository: "mad"
|
||||||
branch: main
|
branch: main
|
||||||
settings:
|
settings:
|
||||||
api_url: "https://git.front.kjuulh.io"
|
api_url: "https://git.kjuulh.io"
|
||||||
actions:
|
actions:
|
||||||
rust:
|
rust:
|
||||||
|
|||||||
3
renovate.json
Normal file
3
renovate.json
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
{
|
||||||
|
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user