1 Commits

Author SHA1 Message Date
cuddle-please
cd23d83a22 chore(release): 0.2.0
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2024-08-07 09:41:10 +00:00
18 changed files with 154 additions and 1988 deletions

View File

@@ -6,108 +6,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.8.0] - 2025-08-08
### Added
- add docs
- update readme
## [0.7.5] - 2025-07-24
### Added
- print big inner
### Other
- more error correction
- correct error test to not be as verbose
## [0.7.4] - 2025-07-24
### Added
- cleanup aggregate error for single error
## [0.7.3] - 2025-07-24
### Added
- automatic conversion from anyhow::Error and access to aggregate errors
### Fixed
- *(deps)* update all dependencies (#30)
## [0.7.2] - 2025-06-25
### Added
- add wait
- add conditional, allows adding or waiting for close
### Fixed
- *(deps)* update rust crate async-trait to v0.1.86 (#28)
- *(deps)* update rust crate rand to 0.9.0 (#27)
- *(deps)* update rust crate thiserror to v2.0.11 (#26)
- *(deps)* update all dependencies (#25)
- *(deps)* update rust crate async-trait to v0.1.84 (#24)
- *(deps)* update rust crate thiserror to v2.0.9 (#22)
- *(deps)* update rust crate thiserror to v2.0.8 (#21)
- *(deps)* update rust crate thiserror to v2.0.7 (#20)
- *(deps)* update rust crate thiserror to v2.0.6 (#19)
- *(deps)* update rust crate thiserror to v2.0.5 (#18)
- *(deps)* update rust crate tokio-util to v0.7.13 (#17)
### Other
- chore
- *(deps)* update all dependencies (#29)
- *(deps)* update rust crate anyhow to v1.0.95 (#23)
- *(deps)* update all dependencies (#16)
- *(deps)* update rust crate tracing-subscriber to v0.3.19 (#15)
- *(deps)* update rust crate tracing to v0.1.41 (#13)
## [0.7.1] - 2024-11-24
### Fixed
- make sure to close on final
## [0.7.0] - 2024-11-24
### Added
- actually bubble up errors
### Fixed
- *(deps)* update rust crate thiserror to v2 (#9)
## [0.6.0] - 2024-11-23
### Added
- adding test to make sure we can gracefully shutdown
- make sure to close down properly
## [0.5.0] - 2024-11-19
### Added
- update name
- respect sigterm
- include author
- update with rename
### Docs
- add examples
## [0.4.0] - 2024-08-07
### Added
- add correction
- add small docs
## [0.3.0] - 2024-08-07
### Added
- add add_fn to execute immediate lambdas
## [0.2.1] - 2024-08-07
### Docs
- add a small readme
## [0.2.0] - 2024-08-07
### Added

291
Cargo.lock generated
View File

@@ -1,21 +1,21 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4
version = 3
[[package]]
name = "addr2line"
version = "0.24.2"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1"
checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678"
dependencies = [
"gimli",
]
[[package]]
name = "adler2"
version = "2.0.0"
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aho-corasick"
@@ -28,15 +28,15 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.98"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
[[package]]
name = "async-trait"
version = "0.1.88"
version = "0.1.81"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5"
checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107"
dependencies = [
"proc-macro2",
"quote",
@@ -45,23 +45,23 @@ dependencies = [
[[package]]
name = "autocfg"
version = "1.4.0"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
[[package]]
name = "backtrace"
version = "0.3.74"
version = "0.3.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a"
checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
"windows-targets",
]
[[package]]
@@ -78,9 +78,15 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.8.0"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
[[package]]
name = "cc"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc"
[[package]]
name = "cfg-if"
@@ -90,9 +96,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "futures"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
@@ -105,9 +111,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
@@ -115,15 +121,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
@@ -132,15 +138,15 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
@@ -149,21 +155,21 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
[[package]]
name = "futures-task"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-util"
version = "0.3.31"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
@@ -179,21 +185,20 @@ dependencies = [
[[package]]
name = "getrandom"
version = "0.3.1"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8"
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [
"cfg-if",
"libc",
"wasi 0.13.3+wasi-0.2.2",
"windows-targets",
"wasi",
]
[[package]]
name = "gimli"
version = "0.31.1"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd"
[[package]]
name = "hermit-abi"
@@ -201,17 +206,6 @@ version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]]
name = "io-uring"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013"
dependencies = [
"bitflags",
"cfg-if",
"libc",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
@@ -220,9 +214,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "libc"
version = "0.2.169"
version = "0.2.155"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a"
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
[[package]]
name = "lock_api"
@@ -240,6 +234,22 @@ version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "mad"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"futures",
"futures-util",
"rand",
"thiserror",
"tokio",
"tokio-util",
"tracing",
"tracing-test",
]
[[package]]
name = "matchers"
version = "0.1.0"
@@ -257,42 +267,25 @@ checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "miniz_oxide"
version = "0.8.0"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1"
checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08"
dependencies = [
"adler2",
"adler",
]
[[package]]
name = "mio"
version = "1.0.2"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec"
checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4"
dependencies = [
"hermit-abi",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
"wasi",
"windows-sys",
]
[[package]]
name = "notmad"
version = "0.7.5"
dependencies = [
"anyhow",
"async-trait",
"futures",
"futures-util",
"rand",
"thiserror",
"tokio",
"tokio-util",
"tracing",
"tracing-subscriber",
"tracing-test",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@@ -305,18 +298,18 @@ dependencies = [
[[package]]
name = "object"
version = "0.36.5"
version = "0.36.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e"
checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9"
dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.20.2"
version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "overload"
@@ -349,9 +342,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
version = "0.2.15"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff"
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
[[package]]
name = "pin-utils"
@@ -365,42 +358,43 @@ version = "0.2.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04"
dependencies = [
"zerocopy 0.7.35",
"zerocopy",
]
[[package]]
name = "proc-macro2"
version = "1.0.89"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e"
checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.37"
version = "1.0.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af"
checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7"
dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.9.2"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.9.0"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
@@ -408,33 +402,32 @@ dependencies = [
[[package]]
name = "rand_core"
version = "0.9.0"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b08f3c9802962f7e1b25113931d94f43ed9725bebc59db9d0c3e9a23b67e15ff"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
"zerocopy 0.8.14",
]
[[package]]
name = "redox_syscall"
version = "0.5.7"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f"
checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4"
dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.11.1"
version = "1.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata 0.4.9",
"regex-syntax 0.8.5",
"regex-automata 0.4.7",
"regex-syntax 0.8.4",
]
[[package]]
@@ -448,13 +441,13 @@ dependencies = [
[[package]]
name = "regex-automata"
version = "0.4.9"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.8.5",
"regex-syntax 0.8.4",
]
[[package]]
@@ -465,9 +458,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.5"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
[[package]]
name = "rustc-demangle"
@@ -526,9 +519,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.87"
version = "2.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d"
checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af"
dependencies = [
"proc-macro2",
"quote",
@@ -537,18 +530,18 @@ dependencies = [
[[package]]
name = "thiserror"
version = "2.0.12"
version = "1.0.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708"
checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "2.0.12"
version = "1.0.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d"
checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261"
dependencies = [
"proc-macro2",
"quote",
@@ -567,19 +560,17 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.46.1"
version = "1.39.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17"
checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1"
dependencies = [
"backtrace",
"bytes",
"io-uring",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"slab",
"socket2",
"tokio-macros",
"windows-sys",
@@ -587,9 +578,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "2.5.0"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8"
checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2",
"quote",
@@ -598,9 +589,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.15"
version = "0.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df"
checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1"
dependencies = [
"bytes",
"futures-core",
@@ -611,9 +602,9 @@ dependencies = [
[[package]]
name = "tracing"
version = "0.1.41"
version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
"log",
"pin-project-lite",
@@ -623,9 +614,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.28"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
@@ -634,9 +625,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.33"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
@@ -655,9 +646,9 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.19"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
dependencies = [
"matchers",
"nu-ansi-term",
@@ -694,9 +685,9 @@ dependencies = [
[[package]]
name = "unicode-ident"
version = "1.0.13"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
[[package]]
name = "valuable"
@@ -710,15 +701,6 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasi"
version = "0.13.3+wasi-0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2"
dependencies = [
"wit-bindgen-rt",
]
[[package]]
name = "winapi"
version = "0.3.9"
@@ -814,15 +796,6 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "wit-bindgen-rt"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c"
dependencies = [
"bitflags",
]
[[package]]
name = "zerocopy"
version = "0.7.35"
@@ -830,16 +803,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"byteorder",
"zerocopy-derive 0.7.35",
]
[[package]]
name = "zerocopy"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a367f292d93d4eab890745e75a778da40909cab4d6ff8173693812f79c4a2468"
dependencies = [
"zerocopy-derive 0.8.14",
"zerocopy-derive",
]
[[package]]
@@ -852,14 +816,3 @@ dependencies = [
"quote",
"syn",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3931cb58c62c13adec22e38686b559c86a30565e16ad6e8510a337cedc611e1"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

View File

@@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2"
[workspace.package]
version = "0.8.0"
version = "0.2.0"
[workspace.dependencies]
mad = { path = "crates/mad" }

168
README.md
View File

@@ -1,168 +0,0 @@
# MAD - Lifecycle Manager for Rust Applications
[![Crates.io](https://img.shields.io/crates/v/notmad.svg)](https://crates.io/crates/notmad)
[![Documentation](https://docs.rs/notmad/badge.svg)](https://docs.rs/notmad)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
## Overview
MAD is a robust lifecycle manager designed for long-running Rust operations. It provides a simple, composable way to manage multiple concurrent services within your application, handling graceful startup and shutdown automatically.
### Perfect for:
- 🌐 Web servers
- 📨 Queue consumers and message processors
- 🔌 gRPC servers
- ⏰ Cron job runners
- 🔄 Background workers
- 📡 Any long-running async operations
## Features
- **Component-based architecture** - Build your application from composable, reusable components
- **Graceful shutdown** - Automatic handling of shutdown signals with proper cleanup
- **Concurrent execution** - Run multiple components in parallel with tokio
- **Error handling** - Built-in error propagation and logging
- **Cancellation tokens** - Coordinate shutdown across all components
- **Minimal boilerplate** - Focus on your business logic, not lifecycle management
## Installation
Add MAD to your `Cargo.toml`:
```toml
[dependencies]
notmad = "0.7.5"
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
```
## Quick Start
Here's a simple example of a component that simulates a long-running server:
```rust
use mad::{Component, Mad};
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
// Define your component
struct WebServer {
port: u16,
}
#[async_trait]
impl Component for WebServer {
fn name(&self) -> Option<String> {
Some(format!("WebServer on port {}", self.port))
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
println!("Starting web server on port {}", self.port);
// Your server logic here
// The cancellation token will be triggered on shutdown
tokio::select! {
_ = cancellation.cancelled() => {
println!("Shutting down web server");
}
_ = self.serve() => {
println!("Server stopped");
}
}
Ok(())
}
}
impl WebServer {
async fn serve(&self) {
// Simulate a running server
tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Build and run your application
Mad::builder()
.add(WebServer { port: 8080 })
.add(WebServer { port: 8081 }) // You can add multiple instances
.run()
.await?;
Ok(())
}
```
## Advanced Usage
### Custom Components
Components can be anything that implements the `Component` trait:
```rust
use mad::{Component, Mad};
use async_trait::async_trait;
struct QueueProcessor {
queue_name: String,
}
#[async_trait]
impl Component for QueueProcessor {
fn name(&self) -> Option<String> {
Some(format!("QueueProcessor-{}", self.queue_name))
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
while !cancellation.is_cancelled() {
// Process messages from queue
self.process_next_message().await?;
}
Ok(())
}
}
```
### Error Handling
MAD provides comprehensive error handling through the `MadError` type with automatic conversion from `anyhow::Error`:
```rust
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
// Errors automatically convert from anyhow::Error to MadError
database_operation().await?;
// Or return explicit errors
if some_condition {
return Err(anyhow::anyhow!("Something went wrong").into());
}
Ok(())
}
```
## Examples
Check out the [examples directory](crates/mad/examples) for more detailed examples:
- **basic** - Simple component lifecycle
- **fn** - Using functions as components
- **signals** - Handling system signals
- **error_log** - Error handling and logging
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## Author
Created and maintained by [kjuulh](https://github.com/kjuulh)
## Repository
Find the source code at [https://github.com/kjuulh/mad](https://github.com/kjuulh/mad)

View File

@@ -1,23 +1,18 @@
[package]
name = "notmad"
name = "mad"
version.workspace = true
description = "notmad is a life-cycle manager for long running rust operations"
license = "MIT"
repository = "https://github.com/kjuulh/mad"
authors = ["kjuulh"]
edition = "2024"
edition = "2021"
[dependencies]
anyhow.workspace = true
async-trait = "0.1.81"
futures = "0.3.30"
futures-util = "0.3.30"
rand = "0.9.0"
thiserror = "2.0.0"
rand = "0.8.5"
thiserror = "1.0.63"
tokio.workspace = true
tokio-util = "0.7.11"
tracing.workspace = true
[dev-dependencies]
tracing-subscriber = "0.3.18"
tracing-test = { version = "0.2.5", features = ["no-env-filter"] }

View File

@@ -1,40 +0,0 @@
use async_trait::async_trait;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct WaitServer {}
#[async_trait]
impl notmad::Component for WaitServer {
fn name(&self) -> Option<String> {
Some("WaitServer".into())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.init();
notmad::Mad::builder()
.add(WaitServer {})
.add(WaitServer {})
.add(WaitServer {})
.add(WaitServer {})
.run()
.await?;
Ok(())
}

View File

@@ -1,18 +0,0 @@
[package]
name = "mad-comprehensive-example"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "comprehensive"
path = "main.rs"
[dependencies]
notmad = { path = "../.." }
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
async-trait = "0.1"
anyhow = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
rand = "0.8"

View File

@@ -1,332 +0,0 @@
//! Comprehensive example demonstrating MAD's full capabilities.
//!
//! This example shows:
//! - Multiple component types (struct, closure, conditional)
//! - Component lifecycle (setup, run, close)
//! - Error handling and propagation
//! - Graceful shutdown with cancellation tokens
//! - Concurrent component execution
use async_trait::async_trait;
use notmad::{Component, Mad, MadError};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::time::{Duration, interval};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
/// A web server component that simulates handling HTTP requests.
struct WebServer {
port: u16,
request_count: Arc<AtomicUsize>,
}
#[async_trait]
impl Component for WebServer {
fn name(&self) -> Option<String> {
Some(format!("web-server-{}", self.port))
}
async fn setup(&self) -> Result<(), MadError> {
info!("Setting up web server on port {}", self.port);
// In a real application, you might:
// - Bind to the port
// - Set up TLS certificates
// - Initialize middleware
tokio::time::sleep(Duration::from_millis(100)).await;
info!("Web server on port {} is ready", self.port);
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!("Web server on port {} started", self.port);
let mut interval = interval(Duration::from_secs(1));
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
info!("Web server on port {} received shutdown signal", self.port);
break;
}
_ = interval.tick() => {
// Simulate handling requests
let count = self.request_count.fetch_add(1, Ordering::Relaxed);
info!("Server on port {} handled request #{}", self.port, count + 1);
}
}
}
Ok(())
}
async fn close(&self) -> Result<(), MadError> {
info!("Shutting down web server on port {}", self.port);
// In a real application, you might:
// - Drain existing connections
// - Save server state
// - Close database connections
tokio::time::sleep(Duration::from_millis(200)).await;
let total = self.request_count.load(Ordering::Relaxed);
info!(
"Web server on port {} shut down. Total requests handled: {}",
self.port, total
);
Ok(())
}
}
/// A background job processor that simulates processing tasks from a queue.
struct JobProcessor {
queue_name: String,
processing_interval: Duration,
}
#[async_trait]
impl Component for JobProcessor {
fn name(&self) -> Option<String> {
Some(format!("job-processor-{}", self.queue_name))
}
async fn setup(&self) -> Result<(), MadError> {
info!("Connecting to queue: {}", self.queue_name);
// Simulate connecting to a message queue
tokio::time::sleep(Duration::from_millis(150)).await;
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!("Job processor for {} started", self.queue_name);
let mut interval = interval(self.processing_interval);
let mut job_count = 0;
loop {
tokio::select! {
_ = cancellation.cancelled() => {
info!("Job processor for {} stopping...", self.queue_name);
break;
}
_ = interval.tick() => {
job_count += 1;
info!("Processing job #{} from {}", job_count, self.queue_name);
// Simulate job processing
tokio::time::sleep(Duration::from_millis(100)).await;
// Simulate occasional errors (but don't fail the component)
if job_count % 10 == 0 {
warn!("Job #{} from {} required retry", job_count, self.queue_name);
}
}
}
}
info!(
"Job processor for {} processed {} jobs",
self.queue_name, job_count
);
Ok(())
}
async fn close(&self) -> Result<(), MadError> {
info!("Disconnecting from queue: {}", self.queue_name);
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
}
/// A health check component that monitors system health.
struct HealthChecker {
check_interval: Duration,
}
#[async_trait]
impl Component for HealthChecker {
fn name(&self) -> Option<String> {
Some("health-checker".to_string())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!("Health checker started");
let mut interval = interval(self.check_interval);
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
info!("Health checker stopping...");
break;
}
_ = interval.tick() => {
// Simulate health checks
let cpu_usage = rand::random::<f32>() * 100.0;
let memory_usage = rand::random::<f32>() * 100.0;
info!("System health: CPU={:.1}%, Memory={:.1}%", cpu_usage, memory_usage);
if cpu_usage > 90.0 {
warn!("High CPU usage detected: {:.1}%", cpu_usage);
}
if memory_usage > 90.0 {
warn!("High memory usage detected: {:.1}%", memory_usage);
}
}
}
}
Ok(())
}
}
/// A component that will fail after some time to demonstrate error handling.
struct FailingComponent {
fail_after: Duration,
}
#[async_trait]
impl Component for FailingComponent {
fn name(&self) -> Option<String> {
Some("failing-component".to_string())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
info!(
"Failing component started (will fail after {:?})",
self.fail_after
);
tokio::select! {
_ = cancellation.cancelled() => {
info!("Failing component cancelled before failure");
Ok(())
}
_ = tokio::time::sleep(self.fail_after) => {
error!("Failing component encountered an error!");
Err(anyhow::anyhow!("Simulated component failure").into())
}
}
}
}
/// Debug component that logs system status periodically.
struct DebugComponent;
#[async_trait]
impl Component for DebugComponent {
fn name(&self) -> Option<String> {
Some("debug-component".to_string())
}
async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> {
info!("Debug mode enabled - verbose logging active");
let mut interval = interval(Duration::from_secs(5));
while !cancel.is_cancelled() {
tokio::select! {
_ = cancel.cancelled() => break,
_ = interval.tick() => {
info!("DEBUG: System running normally");
}
}
}
info!("Debug component shutting down");
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize tracing for logging
tracing_subscriber::fmt()
.with_target(false)
.without_time()
.init();
info!("Starting comprehensive MAD example application");
// Check if we should enable the failing component
let enable_failure_demo = std::env::var("ENABLE_FAILURE").is_ok();
// Check if we should enable debug mode
let debug_mode = std::env::var("DEBUG").is_ok();
// Shared state for demonstration
let request_count = Arc::new(AtomicUsize::new(0));
// Build and run the application
let result = Mad::builder()
// Add web servers
.add(WebServer {
port: 8080,
request_count: request_count.clone(),
})
.add(WebServer {
port: 8081,
request_count: request_count.clone(),
})
// Add job processors
.add(JobProcessor {
queue_name: "high-priority".to_string(),
processing_interval: Duration::from_secs(2),
})
.add(JobProcessor {
queue_name: "low-priority".to_string(),
processing_interval: Duration::from_secs(5),
})
// Add health checker
.add(HealthChecker {
check_interval: Duration::from_secs(3),
})
// Conditionally add a debug component using add_fn
.add_conditional(debug_mode, DebugComponent)
// Conditionally add failing component to demonstrate error handling
.add_conditional(
enable_failure_demo,
FailingComponent {
fail_after: Duration::from_secs(10),
},
)
// Add a simple metrics reporter using add_fn
.add_fn(|cancel: CancellationToken| async move {
info!("Metrics reporter started");
let mut interval = interval(Duration::from_secs(10));
let start = std::time::Instant::now();
while !cancel.is_cancelled() {
tokio::select! {
_ = cancel.cancelled() => break,
_ = interval.tick() => {
let uptime = start.elapsed();
info!("Application uptime: {:?}", uptime);
}
}
}
info!("Metrics reporter stopped");
Ok(())
})
// Configure graceful shutdown timeout
.cancellation(Some(Duration::from_secs(5)))
// Run the application
.run()
.await;
match result {
Ok(()) => {
info!("Application shut down successfully");
Ok(())
}
Err(e) => {
error!("Application failed: {}", e);
// Check if it's an aggregate error with multiple failures
if let MadError::AggregateError(ref agg) = e {
error!("Multiple component failures detected:");
for (i, err) in agg.get_errors().iter().enumerate() {
error!(" {}. {}", i + 1, err);
}
}
Err(e.into())
}
}
}

View File

@@ -1,42 +0,0 @@
use async_trait::async_trait;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct ErrorServer {}
#[async_trait]
impl notmad::Component for ErrorServer {
fn name(&self) -> Option<String> {
Some("ErrorServer".into())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Err(notmad::MadError::Inner(anyhow::anyhow!("expected error")))
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.init();
// Do note that only the first server which returns an error is guaranteed to be handled. This is because if servers don't respect cancellation, they will be dropped
notmad::Mad::builder()
.add(ErrorServer {})
.add(ErrorServer {})
.add(ErrorServer {})
.add(ErrorServer {})
.run()
.await?;
Ok(())
}

View File

@@ -1,67 +0,0 @@
use async_trait::async_trait;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct WaitServer {}
#[async_trait]
impl notmad::Component for WaitServer {
fn name(&self) -> Option<String> {
Some("WaitServer".into())
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.init();
let item = "some item".to_string();
notmad::Mad::builder()
.add(WaitServer {})
.add_fn(|_cancel| async move {
let millis_wait = 50;
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Ok(())
})
.add_fn(move |_cancel| {
// I am an actual closure
let item = item.clone();
async move {
let _item = item;
let millis_wait = 50;
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Ok(())
}
})
.run()
.await?;
Ok(())
}

View File

@@ -1,15 +0,0 @@
[package]
name = "mad-multi-service-example"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "multi_service"
path = "main.rs"
[dependencies]
notmad = { path = "../.." }
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
async-trait = "0.1"
anyhow = "1"

View File

@@ -1,214 +0,0 @@
//! Example demonstrating running multiple services with MAD.
//!
//! This example shows how to run a web server, queue processor, and
//! scheduled task together, with graceful shutdown on Ctrl+C.
use async_trait::async_trait;
use notmad::{Component, Mad, MadError};
use tokio::time::{Duration, interval};
use tokio_util::sync::CancellationToken;
/// A simple web server component.
///
/// In a real application, this would bind to a port and handle HTTP requests.
/// Here we simulate it by periodically logging that we're "handling" requests.
struct WebServer {
port: u16,
}
#[async_trait]
impl Component for WebServer {
fn name(&self) -> Option<String> {
Some(format!("WebServer:{}", self.port))
}
async fn setup(&self) -> Result<(), MadError> {
println!("[{}] Binding to port...", self.name().unwrap());
// Simulate server setup time
tokio::time::sleep(Duration::from_millis(100)).await;
println!("[{}] Ready to accept connections", self.name().unwrap());
Ok(())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!("[{}] Server started", self.name().unwrap());
// Simulate handling requests until shutdown
let mut request_id = 0;
let mut interval = interval(Duration::from_secs(2));
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Shutdown signal received", self.name().unwrap());
break;
}
_ = interval.tick() => {
request_id += 1;
println!("[{}] Handling request #{}", self.name().unwrap(), request_id);
}
}
}
Ok(())
}
async fn close(&self) -> Result<(), MadError> {
println!("[{}] Closing connections...", self.name().unwrap());
// Simulate graceful connection drain
tokio::time::sleep(Duration::from_millis(200)).await;
println!("[{}] Server stopped", self.name().unwrap());
Ok(())
}
}
/// A queue processor that consumes messages from a queue.
///
/// This simulates processing messages from a message queue like
/// RabbitMQ, Kafka, or AWS SQS.
struct QueueProcessor {
queue_name: String,
}
#[async_trait]
impl Component for QueueProcessor {
fn name(&self) -> Option<String> {
Some(format!("QueueProcessor:{}", self.queue_name))
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!("[{}] Started processing", self.name().unwrap());
let mut message_count = 0;
// Process messages until shutdown
while !cancellation.is_cancelled() {
// Simulate waiting for and processing a message
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Stopping message processing", self.name().unwrap());
break;
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
message_count += 1;
println!("[{}] Processed message #{}", self.name().unwrap(), message_count);
}
}
}
println!(
"[{}] Processed {} messages total",
self.name().unwrap(),
message_count
);
Ok(())
}
}
/// A scheduled task that runs periodically.
///
/// This could be used for tasks like:
/// - Cleaning up old data
/// - Generating reports
/// - Syncing with external systems
struct ScheduledTask {
task_name: String,
interval_secs: u64,
}
#[async_trait]
impl Component for ScheduledTask {
fn name(&self) -> Option<String> {
Some(format!("ScheduledTask:{}", self.task_name))
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), MadError> {
println!(
"[{}] Scheduled to run every {} seconds",
self.name().unwrap(),
self.interval_secs
);
let mut interval = interval(Duration::from_secs(self.interval_secs));
let mut run_count = 0;
while !cancellation.is_cancelled() {
tokio::select! {
_ = cancellation.cancelled() => {
println!("[{}] Scheduler stopping", self.name().unwrap());
break;
}
_ = interval.tick() => {
run_count += 1;
println!("[{}] Executing run #{}", self.name().unwrap(), run_count);
// Simulate task execution
tokio::time::sleep(Duration::from_millis(500)).await;
println!("[{}] Run #{} completed", self.name().unwrap(), run_count);
}
}
}
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Starting multi-service application");
println!("Press Ctrl+C to trigger graceful shutdown");
println!("----------------------------------------");
// Build the application with multiple services
Mad::builder()
// Add a web server on port 8080
.add(WebServer { port: 8080 })
// Add another web server on port 8081 (e.g., admin interface)
.add(WebServer { port: 8081 })
// Add queue processors for different queues
.add(QueueProcessor {
queue_name: "orders".to_string(),
})
.add(QueueProcessor {
queue_name: "notifications".to_string(),
})
// Add scheduled tasks
.add(ScheduledTask {
task_name: "cleanup".to_string(),
interval_secs: 5,
})
.add(ScheduledTask {
task_name: "report_generator".to_string(),
interval_secs: 10,
})
// Add a monitoring component using a closure
.add_fn(|cancel| async move {
println!("[Monitor] Starting system monitor");
let mut interval = interval(Duration::from_secs(15));
while !cancel.is_cancelled() {
tokio::select! {
_ = cancel.cancelled() => {
println!("[Monitor] Stopping monitor");
break;
}
_ = interval.tick() => {
println!("[Monitor] All systems operational");
}
}
}
Ok(())
})
// Set graceful shutdown timeout to 3 seconds
.cancellation(Some(Duration::from_secs(3)))
// Run all components
.run()
.await?;
println!("----------------------------------------");
println!("All services shut down successfully");
Ok(())
}

View File

@@ -1,69 +0,0 @@
use async_trait::async_trait;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct WaitServer {}
#[async_trait]
impl notmad::Component for WaitServer {
fn name(&self) -> Option<String> {
Some("WaitServer".into())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Ok(())
}
}
struct RespectCancel {}
#[async_trait]
impl notmad::Component for RespectCancel {
fn name(&self) -> Option<String> {
Some("RespectCancel".into())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
cancellation.cancelled().await;
tracing::debug!("stopping because job is cancelled");
Ok(())
}
}
struct NeverStopServer {}
#[async_trait]
impl notmad::Component for NeverStopServer {
fn name(&self) -> Option<String> {
Some("NeverStopServer".into())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(999999999)).await;
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.init();
notmad::Mad::builder()
.add(WaitServer {})
.add(NeverStopServer {})
.add(RespectCancel {})
.run()
.await?;
Ok(())
}

View File

@@ -1,177 +1,37 @@
//! # MAD - Lifecycle Manager for Rust Applications
//!
//! MAD is a robust lifecycle manager designed for long-running Rust operations. It provides
//! a simple, composable way to manage multiple concurrent services within your application,
//! handling graceful startup and shutdown automatically.
//!
//! ## Overview
//!
//! MAD helps you build applications composed of multiple long-running components that need
//! to be orchestrated together. It handles:
//!
//! - **Concurrent execution** of multiple components
//! - **Graceful shutdown** with cancellation tokens
//! - **Error aggregation** from multiple components
//! - **Lifecycle management** with setup, run, and close phases
//!
//! ## Quick Start
//!
//! ```rust,no_run
//! use notmad::{Component, Mad};
//! use async_trait::async_trait;
//! use tokio_util::sync::CancellationToken;
//!
//! struct MyService {
//! name: String,
//! }
//!
//! #[async_trait]
//! impl Component for MyService {
//! fn name(&self) -> Option<String> {
//! Some(self.name.clone())
//! }
//!
//! async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
//! // Your service logic here
//! while !cancellation.is_cancelled() {
//! // Do work...
//! tokio::time::sleep(std::time::Duration::from_secs(1)).await;
//! }
//! Ok(())
//! }
//! }
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//! Mad::builder()
//! .add(MyService { name: "service-1".into() })
//! .add(MyService { name: "service-2".into() })
//! .run()
//! .await?;
//! Ok(())
//! }
//! ```
//!
//! ## Component Lifecycle
//!
//! Components go through three phases:
//!
//! 1. **Setup**: Optional initialization phase before components start running
//! 2. **Run**: Main execution phase where components perform their work
//! 3. **Close**: Optional cleanup phase after components stop
//!
//! ## Error Handling
//!
//! MAD provides comprehensive error handling through [`MadError`], which can:
//! - Wrap errors from individual components
//! - Aggregate multiple errors when several components fail
//! - Automatically convert from `anyhow::Error`
//!
//! ## Shutdown Behavior
//!
//! MAD handles shutdown gracefully:
//! - Responds to SIGTERM and Ctrl+C signals
//! - Propagates cancellation tokens to all components
//! - Waits for components to finish cleanup
//! - Configurable cancellation timeout
use futures::stream::FuturesUnordered;
use futures_util::StreamExt;
use std::{fmt::Display, sync::Arc};
use tokio::signal::unix::{SignalKind, signal};
use tokio_util::sync::CancellationToken;
use crate::waiter::Waiter;
mod waiter;
/// Error type for MAD operations.
///
/// This enum represents all possible errors that can occur during
/// the lifecycle of MAD components.
#[derive(thiserror::Error, Debug)]
pub enum MadError {
/// Generic error wrapper for anyhow errors.
///
/// This variant is used when components return errors via the `?` operator
/// or when converting from `anyhow::Error`.
#[error("component: {0:#?}")]
#[error("component failed: {0}")]
Inner(#[source] anyhow::Error),
/// Error that occurred during the run phase of a component.
#[error("component: {run:#?}")]
#[error("component(s) failed: {run}")]
RunError { run: anyhow::Error },
/// Error that occurred during the close phase of a component.
#[error("component(s) failed: {close}")]
CloseError { close: anyhow::Error },
/// Multiple errors from different components.
///
/// This is used when multiple components fail simultaneously,
/// allowing all errors to be reported rather than just the first one.
#[error("component(s): {0}")]
#[error("component(s) failed: {0}")]
AggregateError(AggregateError),
/// Returned when a component doesn't implement the optional setup method.
///
/// This is not typically an error condition as setup is optional.
#[error("setup not defined")]
SetupNotDefined,
/// Returned when a component doesn't implement the optional close method.
///
/// This is not typically an error condition as close is optional.
#[error("close not defined")]
CloseNotDefined,
}
impl From<anyhow::Error> for MadError {
fn from(value: anyhow::Error) -> Self {
Self::Inner(value)
}
}
/// Container for multiple errors from different components.
///
/// When multiple components fail, their errors are collected
/// into this struct to provide complete error reporting.
#[derive(Debug)]
pub struct AggregateError {
errors: Vec<MadError>,
}
impl AggregateError {
/// Returns a slice of all contained errors.
///
/// # Example
///
/// ```rust,ignore
/// match result {
/// Err(notmad::MadError::AggregateError(agg)) => {
/// for error in agg.get_errors() {
/// eprintln!("Component error: {}", error);
/// }
/// }
/// _ => {}
/// }
/// ```
pub fn get_errors(&self) -> &[MadError] {
&self.errors
}
}
impl Display for AggregateError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.errors.is_empty() {
return Ok(());
}
if self.errors.len() == 1 {
return f.write_str(&self.errors.first().unwrap().to_string());
}
f.write_str("MadError::AggregateError: (")?;
for error in &self.errors {
@@ -183,59 +43,13 @@ impl Display for AggregateError {
}
}
/// The main lifecycle manager for running multiple components.
///
/// `Mad` orchestrates the lifecycle of multiple components, ensuring they
/// start up in order, run concurrently, and shut down gracefully.
///
/// # Example
///
/// ```rust
/// use notmad::{Component, Mad};
/// use async_trait::async_trait;
/// use tokio_util::sync::CancellationToken;
///
/// struct MyComponent;
///
/// #[async_trait]
/// impl Component for MyComponent {
/// async fn run(&self, _cancel: CancellationToken) -> Result<(), notmad::MadError> {
/// Ok(())
/// }
/// }
///
/// # async fn example() -> Result<(), notmad::MadError> {
/// Mad::builder()
/// .add(MyComponent)
/// .run()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub struct Mad {
components: Vec<Arc<dyn Component + Send + Sync + 'static>>,
should_cancel: Option<std::time::Duration>,
}
struct CompletionResult {
res: Result<(), MadError>,
name: Option<String>,
}
impl Mad {
/// Creates a new `Mad` builder.
///
/// This is the entry point for constructing a MAD application.
/// Components are added using the builder pattern before calling `run()`.
///
/// # Example
///
/// ```rust
/// use notmad::Mad;
///
/// let mut app = Mad::builder();
/// ```
pub fn builder() -> Self {
Self {
components: Vec::default(),
@@ -244,190 +58,18 @@ impl Mad {
}
}
/// Adds a component to the MAD application.
///
/// Components will be set up in the order they are added,
/// run concurrently, and closed in the order they were added.
///
/// # Arguments
///
/// * `component` - Any type that implements `Component` or `IntoComponent`
///
/// # Example
///
/// ```rust
/// use notmad::{Component, Mad};
/// # use async_trait::async_trait;
/// # use tokio_util::sync::CancellationToken;
/// # struct MyService;
/// # #[async_trait]
/// # impl Component for MyService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # }
///
/// Mad::builder()
/// .add(MyService)
/// .add(MyService);
/// ```
pub fn add(&mut self, component: impl IntoComponent) -> &mut Self {
self.components.push(component.into_component());
self
}
/// Conditionally adds a component based on a boolean condition.
///
/// If the condition is false, a waiter component is added instead,
/// which simply waits for cancellation without doing any work.
///
/// # Arguments
///
/// * `condition` - If true, adds the component; if false, adds a waiter
/// * `component` - The component to add if condition is true
///
/// # Example
///
/// ```rust
/// use notmad::Mad;
/// # use notmad::Component;
/// # use async_trait::async_trait;
/// # use tokio_util::sync::CancellationToken;
/// # struct DebugService;
/// # #[async_trait]
/// # impl Component for DebugService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # }
///
/// let enable_debug = std::env::var("DEBUG").is_ok();
///
/// Mad::builder()
/// .add_conditional(enable_debug, DebugService);
/// ```
pub fn add_conditional(&mut self, condition: bool, component: impl IntoComponent) -> &mut Self {
if condition {
self.components.push(component.into_component());
} else {
self.components
.push(Waiter::new(component.into_component()).into_component())
}
self
}
/// Adds a waiter component that does nothing but wait for cancellation.
///
/// This is useful when you need a placeholder component or want
/// the application to keep running without any specific work.
///
/// # Example
///
/// ```rust,no_run
/// # async fn example() {
/// use notmad::Mad;
///
/// Mad::builder()
/// .add_wait() // Keeps the app running until shutdown signal
/// .run()
/// .await;
/// # }
/// ```
pub fn add_wait(&mut self) -> &mut Self {
self.components.push(Waiter::default().into_component());
self
}
/// Adds a closure or function as a component.
///
/// This is a convenient way to add simple components without
/// creating a full struct that implements `Component`.
///
/// # Arguments
///
/// * `f` - A closure that takes a `CancellationToken` and returns a future
///
/// # Example
///
/// ```rust
/// use notmad::Mad;
/// use tokio_util::sync::CancellationToken;
///
/// Mad::builder()
/// .add_fn(|cancel: CancellationToken| async move {
/// while !cancel.is_cancelled() {
/// println!("Working...");
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
/// }
/// Ok(())
/// });
/// ```
pub fn add_fn<F, Fut>(&mut self, f: F) -> &mut Self
where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
{
let comp = ClosureComponent { inner: Box::new(f) };
self.add(comp)
}
/// Configures the cancellation timeout behavior.
///
/// When a shutdown signal is received, MAD will:
/// 1. Send cancellation tokens to all components
/// 2. Wait for the specified duration
/// 3. Force shutdown if components haven't stopped
///
/// # Arguments
///
/// * `should_cancel` - Duration to wait after cancellation before forcing shutdown.
/// Pass `None` to wait indefinitely.
///
/// # Example
///
/// ```rust,no_run
/// # async fn example() {
/// use notmad::Mad;
/// use std::time::Duration;
///
/// Mad::builder()
/// .cancellation(Some(Duration::from_secs(30))) // 30 second grace period
/// .run()
/// .await;
/// # }
/// ```
pub fn cancellation(&mut self, should_cancel: Option<std::time::Duration>) -> &mut Self {
self.should_cancel = should_cancel;
self
}
/// Runs all components until completion or shutdown.
///
/// This method:
/// 1. Calls `setup()` on all components (in order)
/// 2. Starts all components concurrently
/// 3. Waits for shutdown signal (SIGTERM, Ctrl+C) or component failure
/// 4. Sends cancellation to all components
/// 5. Calls `close()` on all components (in order)
///
/// # Returns
///
/// * `Ok(())` if all components shut down cleanly
/// * `Err(MadError)` if any component fails
///
/// # Example
///
/// ```rust,no_run
/// # use notmad::Mad;
/// # async fn example() -> Result<(), notmad::MadError> {
/// Mad::builder()
/// .add_wait()
/// .run()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub async fn run(&mut self) -> Result<(), MadError> {
tracing::info!("running mad setup");
@@ -437,12 +79,11 @@ impl Mad {
let close_result = self.close_components().await;
tracing::info!("mad is closed down");
match (run_result, close_result) {
(Err(run), Err(close)) => {
return Err(MadError::AggregateError(AggregateError {
errors: vec![run, close],
}));
}))
}
(Ok(_), Ok(_)) => {}
(Ok(_), Err(close)) => return Err(close),
@@ -475,95 +116,54 @@ impl Mad {
let mut channels = Vec::new();
let cancellation_token = CancellationToken::new();
let job_cancellation = CancellationToken::new();
let job_done = CancellationToken::new();
for comp in &self.components {
let comp = comp.clone();
let cancellation_token = cancellation_token.child_token();
let job_cancellation = job_cancellation.child_token();
let (error_tx, error_rx) = tokio::sync::mpsc::channel::<CompletionResult>(1);
let (error_tx, error_rx) = tokio::sync::mpsc::channel::<Result<(), MadError>>(1);
channels.push(error_rx);
tokio::spawn(async move {
let name = comp.name().clone();
tracing::debug!(component = name, "mad running");
tracing::debug!(component = &comp.name(), "mad running");
tokio::select! {
_ = cancellation_token.cancelled() => {
error_tx.send(CompletionResult { res: Ok(()) , name }).await
error_tx.send(Ok(())).await
}
res = comp.run(job_cancellation) => {
error_tx.send(CompletionResult { res , name }).await
error_tx.send(res).await
}
_ = tokio::signal::ctrl_c() => {
error_tx.send(Ok(())).await
}
}
});
}
tokio::spawn({
let cancellation_token = cancellation_token;
let job_done = job_done.child_token();
let wait_cancel = self.should_cancel;
async move {
let should_cancel =
|cancel: CancellationToken,
global_cancel: CancellationToken,
wait: Option<std::time::Duration>| async move {
if let Some(cancel_wait) = wait {
cancel.cancel();
tokio::time::sleep(cancel_wait).await;
global_cancel.cancel();
}
};
tokio::select! {
_ = cancellation_token.cancelled() => {
job_cancellation.cancel();
}
_ = job_done.cancelled() => {
should_cancel(job_cancellation, cancellation_token, wait_cancel).await;
}
_ = tokio::signal::ctrl_c() => {
should_cancel(job_cancellation, cancellation_token,wait_cancel).await;
}
_ = signal_unix_terminate() => {
should_cancel(job_cancellation, cancellation_token, wait_cancel).await;
}
}
}
});
let mut futures = FuturesUnordered::new();
for channel in channels.iter_mut() {
futures.push(channel.recv());
}
let mut errors = Vec::new();
while let Some(Some(msg)) = futures.next().await {
match msg.res {
Err(e) => {
tracing::debug!(
error = e.to_string(),
component = msg.name,
"component ran to completion with error"
);
errors.push(e);
}
Ok(_) => {
tracing::debug!(component = msg.name, "component ran to completion");
tracing::trace!("received end signal from a component");
if let Err(e) = msg {
tracing::debug!(error = e.to_string(), "stopping running components");
job_cancellation.cancel();
if let Some(cancel_wait) = self.should_cancel {
tokio::time::sleep(cancel_wait).await;
cancellation_token.cancel();
}
}
job_done.cancel();
}
tracing::debug!("ran components");
if !errors.is_empty() {
return Err(MadError::AggregateError(AggregateError { errors }));
}
Ok(())
}
@@ -585,153 +185,24 @@ impl Mad {
}
}
async fn signal_unix_terminate() {
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to bind SIGTERM handler");
sigterm.recv().await;
}
/// Trait for implementing MAD components.
///
/// Components represent individual services or tasks that run as part
/// of your application. Each component has its own lifecycle with
/// optional setup and cleanup phases.
///
/// # Example
///
/// ```rust
/// use notmad::{Component, MadError};
/// use async_trait::async_trait;
/// use tokio_util::sync::CancellationToken;
///
/// struct DatabaseConnection {
/// url: String,
/// }
///
/// #[async_trait]
/// impl Component for DatabaseConnection {
/// fn name(&self) -> Option<String> {
/// Some("database".to_string())
/// }
///
/// async fn setup(&self) -> Result<(), MadError> {
/// println!("Connecting to database...");
/// // Initialize connection pool
/// Ok(())
/// }
///
/// async fn run(&self, cancel: CancellationToken) -> Result<(), MadError> {
/// // Keep connection alive, handle queries
/// cancel.cancelled().await;
/// Ok(())
/// }
///
/// async fn close(&self) -> Result<(), MadError> {
/// println!("Closing database connection...");
/// // Clean up resources
/// Ok(())
/// }
/// }
/// ```
#[async_trait::async_trait]
pub trait Component {
/// Returns an optional name for the component.
///
/// This name is used in logging and error messages to identify
/// which component is being processed.
///
/// # Default
///
/// Returns `None` if not overridden.
fn name(&self) -> Option<String> {
None
}
/// Optional setup phase called before the component starts running.
///
/// Use this for initialization tasks like:
/// - Establishing database connections
/// - Loading configuration
/// - Preparing resources
///
/// # Default
///
/// Returns `MadError::SetupNotDefined` which is handled gracefully.
///
/// # Errors
///
/// If setup fails with an error other than `SetupNotDefined`,
/// the entire application will stop before any components start running.
async fn setup(&self) -> Result<(), MadError> {
Err(MadError::SetupNotDefined)
}
/// Main execution phase of the component.
///
/// This method should contain the primary logic of your component.
/// It should respect the cancellation token and shut down gracefully
/// when cancellation is requested.
///
/// # Arguments
///
/// * `cancellation_token` - Signal for graceful shutdown
///
/// # Implementation Guidelines
///
/// - Check `cancellation_token.is_cancelled()` periodically
/// - Use `tokio::select!` with `cancellation_token.cancelled()` for async operations
/// - Clean up resources before returning
///
/// # Errors
///
/// Any error returned will trigger shutdown of all other components.
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError>;
/// Optional cleanup phase called after the component stops.
///
/// Use this for cleanup tasks like:
/// - Flushing buffers
/// - Closing connections
/// - Saving state
///
/// # Default
///
/// Returns `MadError::CloseNotDefined` which is handled gracefully.
///
/// # Errors
///
/// Errors during close are logged but don't prevent other components
/// from closing.
async fn close(&self) -> Result<(), MadError> {
Err(MadError::CloseNotDefined)
}
}
/// Trait for converting types into components.
///
/// This trait is automatically implemented for all types that implement
/// `Component + Send + Sync + 'static`, allowing them to be added to MAD
/// directly without manual conversion.
///
/// # Example
///
/// ```rust
/// use notmad::{Component, IntoComponent, Mad};
/// # use async_trait::async_trait;
/// # use tokio_util::sync::CancellationToken;
///
/// struct MyService;
///
/// # #[async_trait]
/// # impl Component for MyService {
/// # async fn run(&self, _: CancellationToken) -> Result<(), notmad::MadError> { Ok(()) }
/// # }
///
/// // MyService automatically implements IntoComponent
/// Mad::builder()
/// .add(MyService); // Works directly
/// ```
pub trait IntoComponent {
/// Converts self into an Arc-wrapped component.
fn into_component(self) -> Arc<dyn Component + Send + Sync + 'static>;
}
@@ -740,34 +211,3 @@ impl<T: Component + Send + Sync + 'static> IntoComponent for T {
Arc::new(self)
}
}
struct ClosureComponent<F, Fut>
where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
{
inner: Box<F>,
}
impl<F, Fut> ClosureComponent<F, Fut>
where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
{
pub async fn execute(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
(*self.inner)(cancellation_token).await?;
Ok(())
}
}
#[async_trait::async_trait]
impl<F, Fut> Component for ClosureComponent<F, Fut>
where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
{
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
self.execute(cancellation_token).await
}
}

View File

@@ -1,84 +0,0 @@
//! Waiter components for MAD.
//!
//! This module provides waiter components that simply wait for cancellation
//! without performing any work. Useful for keeping the application alive
//! or as placeholders in conditional component loading.
use std::sync::Arc;
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use crate::{Component, MadError};
/// A default waiter component that panics if run.
///
/// This is used internally as a placeholder that should never
/// actually be executed.
pub struct DefaultWaiter {}
#[async_trait]
impl Component for DefaultWaiter {
async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), MadError> {
panic!("should never be called");
}
}
/// A wrapper component that waits for cancellation.
///
/// Instead of running the wrapped component's logic, this simply
/// waits for the cancellation token. This is useful for conditionally
/// disabling components while keeping the same structure.
///
/// # Example
///
/// ```rust,ignore
/// use mad::Waiter;
///
/// // Instead of running the service, just wait
/// let waiter = Waiter::new(service.into_component());
/// ```
pub struct Waiter {
comp: Arc<dyn Component + Send + Sync + 'static>,
}
impl Default for Waiter {
fn default() -> Self {
Self {
comp: Arc::new(DefaultWaiter {}),
}
}
}
impl Waiter {
/// Creates a new waiter that wraps the given component.
///
/// The wrapped component's name will be used (prefixed with "waiter/"),
/// but its run method will not be called.
pub fn new(c: Arc<dyn Component + Send + Sync + 'static>) -> Self {
Self { comp: c }
}
}
#[async_trait]
impl Component for Waiter {
/// Returns the name of the waiter, prefixed with "waiter/".
///
/// If the wrapped component has a name, it will be "waiter/{name}".
/// Otherwise, returns "waiter".
fn name(&self) -> Option<String> {
match self.comp.name() {
Some(name) => Some(format!("waiter/{name}")),
None => Some("waiter".into()),
}
}
/// Waits for cancellation without performing any work.
///
/// This method simply waits for the cancellation token to be triggered,
/// then returns successfully.
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
cancellation_token.cancelled().await;
Ok(())
}
}

View File

@@ -1,9 +1,6 @@
use std::sync::Arc;
use async_trait::async_trait;
use notmad::{Component, Mad, MadError};
use mad::{Component, Mad};
use rand::Rng;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use tracing_test::traced_test;
@@ -15,7 +12,7 @@ impl Component for NeverEndingRun {
Some("NeverEndingRun".into())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), notmad::MadError> {
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
let millis_wait = rand::thread_rng().gen_range(50..1000);
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
@@ -89,68 +86,3 @@ async fn test_can_run_components() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test]
#[traced_test]
async fn test_can_shutdown_gracefully() -> anyhow::Result<()> {
let check = Arc::new(Mutex::new(None));
Mad::builder()
.add_fn({
let check = check.clone();
move |cancel| {
let check = check.clone();
async move {
let start = std::time::SystemTime::now();
tracing::info!("waiting for cancel");
cancel.cancelled().await;
tracing::info!("submitting check");
let mut check = check.lock().await;
let elapsed = start.elapsed().expect("to be able to get elapsed");
*check = Some(elapsed);
tracing::info!("check submitted");
Ok(())
}
}
})
.add_fn(|_| async move {
tracing::info!("starting sleep");
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
tracing::info!("sleep ended");
Ok(())
})
.run()
.await?;
let check = check
.lock()
.await
.expect("to be able to get a duration from cancel");
// We default wait 100 ms for graceful shutdown, and we explicitly wait 100ms in the sleep routine
tracing::info!("check millis: {}", check.as_millis());
assert!(check.as_millis() < 250);
Ok(())
}
#[test]
fn test_can_easily_transform_error() -> anyhow::Result<()> {
fn fallible() -> anyhow::Result<()> {
Ok(())
}
fn inner() -> Result<(), MadError> {
fallible()?;
Ok(())
}
inner()?;
Ok(())
}

View File

@@ -1,6 +1,6 @@
# yaml-language-server: $schema=https://git.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
# yaml-language-server: $schema=https://git.front.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
base: "git@git.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
vars:
service: "mad"
@@ -12,6 +12,6 @@ please:
repository: "mad"
branch: main
settings:
api_url: "https://git.kjuulh.io"
api_url: "https://git.front.kjuulh.io"
actions:
rust:

View File

@@ -1,3 +0,0 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
}