9 Commits

Author SHA1 Message Date
72755f9cf1 chore(release): v0.7.1 (#12)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.7.1

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: https://git.front.kjuulh.io/kjuulh/mad/pulls/12
2024-11-24 11:28:02 +01:00
ae0b8b703e fix: make sure to close on final
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 11:24:13 +01:00
3c3f638004 chore(release): v0.7.0 (#11)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.7.0

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: https://git.front.kjuulh.io/kjuulh/mad/pulls/11
2024-11-24 10:49:56 +01:00
ea5287152c feat: actually bubble up errors
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 10:47:56 +01:00
14371cdfd7 fix(deps): update rust crate thiserror to v2 (#9)
All checks were successful
continuous-integration/drone/push Build is passing
2024-11-24 02:52:50 +01:00
3a1b1673ef chore(release): v0.6.0 (#8)
All checks were successful
continuous-integration/drone/tag Build is passing
continuous-integration/drone/push Build is passing
chore(release): 0.6.0

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: https://git.front.kjuulh.io/kjuulh/mad/pulls/8
2024-11-24 00:26:50 +01:00
89cbae24d0 chore: Configure Renovate (#7)
All checks were successful
continuous-integration/drone/push Build is passing
Add renovate.json

Reviewed-on: https://git.front.kjuulh.io/kjuulh/mad/pulls/7
2024-11-24 00:26:37 +01:00
7c1b317d08 feat: adding test to make sure we can gracefully shutdown
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 00:25:02 +01:00
1fec4e3708 feat: make sure to close down properly
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 00:08:03 +01:00
7 changed files with 122 additions and 19 deletions

View File

@@ -6,6 +6,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.7.1] - 2024-11-24
### Fixed
- make sure to close on final
## [0.7.0] - 2024-11-24
### Added
- actually bubble up errors
### Fixed
- *(deps)* update rust crate thiserror to v2 (#9)
## [0.6.0] - 2024-11-23
### Added
- adding test to make sure we can gracefully shutdown
- make sure to close down properly
## [0.5.0] - 2024-11-19 ## [0.5.0] - 2024-11-19
### Added ### Added

10
Cargo.lock generated
View File

@@ -266,7 +266,7 @@ dependencies = [
[[package]] [[package]]
name = "notmad" name = "notmad"
version = "0.4.0" version = "0.7.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
@@ -525,18 +525,18 @@ dependencies = [
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.69" version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa"
dependencies = [ dependencies = [
"thiserror-impl", "thiserror-impl",
] ]
[[package]] [[package]]
name = "thiserror-impl" name = "thiserror-impl"
version = "1.0.69" version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

View File

@@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2" resolver = "2"
[workspace.package] [workspace.package]
version = "0.5.0" version = "0.7.1"
[workspace.dependencies] [workspace.dependencies]
mad = { path = "crates/mad" } mad = { path = "crates/mad" }

View File

@@ -13,7 +13,7 @@ async-trait = "0.1.81"
futures = "0.3.30" futures = "0.3.30"
futures-util = "0.3.30" futures-util = "0.3.30"
rand = "0.8.5" rand = "0.8.5"
thiserror = "1.0.63" thiserror = "2.0.0"
tokio.workspace = true tokio.workspace = true
tokio-util = "0.7.11" tokio-util = "0.7.11"
tracing.workspace = true tracing.workspace = true

View File

@@ -133,6 +133,7 @@ impl Mad {
let mut channels = Vec::new(); let mut channels = Vec::new();
let cancellation_token = CancellationToken::new(); let cancellation_token = CancellationToken::new();
let job_cancellation = CancellationToken::new(); let job_cancellation = CancellationToken::new();
let job_done = CancellationToken::new();
for comp in &self.components { for comp in &self.components {
let comp = comp.clone(); let comp = comp.clone();
@@ -154,21 +155,51 @@ impl Mad {
res = comp.run(job_cancellation) => { res = comp.run(job_cancellation) => {
error_tx.send(CompletionResult { res , name }).await error_tx.send(CompletionResult { res , name }).await
} }
_ = tokio::signal::ctrl_c() => {
error_tx.send(CompletionResult { res: Ok(()) , name }).await
}
_ = signal_unix_terminate() => {
error_tx.send(CompletionResult { res: Ok(()) , name }).await
}
} }
}); });
} }
tokio::spawn({
let cancellation_token = cancellation_token;
let job_done = job_done.child_token();
let wait_cancel = self.should_cancel;
async move {
let should_cancel =
|cancel: CancellationToken,
global_cancel: CancellationToken,
wait: Option<std::time::Duration>| async move {
if let Some(cancel_wait) = wait {
cancel.cancel();
tokio::time::sleep(cancel_wait).await;
global_cancel.cancel();
}
};
tokio::select! {
_ = cancellation_token.cancelled() => {
job_cancellation.cancel();
}
_ = job_done.cancelled() => {
should_cancel(job_cancellation, cancellation_token, wait_cancel).await;
}
_ = tokio::signal::ctrl_c() => {
should_cancel(job_cancellation, cancellation_token,wait_cancel).await;
}
_ = signal_unix_terminate() => {
should_cancel(job_cancellation, cancellation_token, wait_cancel).await;
}
}
}
});
let mut futures = FuturesUnordered::new(); let mut futures = FuturesUnordered::new();
for channel in channels.iter_mut() { for channel in channels.iter_mut() {
futures.push(channel.recv()); futures.push(channel.recv());
} }
let mut errors = Vec::new();
while let Some(Some(msg)) = futures.next().await { while let Some(Some(msg)) = futures.next().await {
match msg.res { match msg.res {
Err(e) => { Err(e) => {
@@ -177,21 +208,20 @@ impl Mad {
component = msg.name, component = msg.name,
"component ran to completion with error" "component ran to completion with error"
); );
errors.push(e);
} }
Ok(_) => { Ok(_) => {
tracing::debug!(component = msg.name, "component ran to completion"); tracing::debug!(component = msg.name, "component ran to completion");
} }
} }
job_cancellation.cancel(); job_done.cancel();
if let Some(cancel_wait) = self.should_cancel {
tokio::time::sleep(cancel_wait).await;
cancellation_token.cancel();
}
} }
tracing::debug!("ran components"); tracing::debug!("ran components");
if !errors.is_empty() {
return Err(MadError::AggregateError(AggregateError { errors }));
}
Ok(()) Ok(())
} }

View File

@@ -1,6 +1,9 @@
use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use notmad::{Component, Mad}; use notmad::{Component, Mad};
use rand::Rng; use rand::Rng;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing_test::traced_test; use tracing_test::traced_test;
@@ -86,3 +89,51 @@ async fn test_can_run_components() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
#[tokio::test]
#[traced_test]
async fn test_can_shutdown_gracefully() -> anyhow::Result<()> {
let check = Arc::new(Mutex::new(None));
Mad::builder()
.add_fn({
let check = check.clone();
move |cancel| {
let check = check.clone();
async move {
let start = std::time::SystemTime::now();
tracing::info!("waiting for cancel");
cancel.cancelled().await;
tracing::info!("submitting check");
let mut check = check.lock().await;
let elapsed = start.elapsed().expect("to be able to get elapsed");
*check = Some(elapsed);
tracing::info!("check submitted");
Ok(())
}
}
})
.add_fn(|_| async move {
tracing::info!("starting sleep");
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
tracing::info!("sleep ended");
Ok(())
})
.run()
.await?;
let check = check
.lock()
.await
.expect("to be able to get a duration from cancel");
// We default wait 100 ms for graceful shutdown, and we explicitly wait 100ms in the sleep routine
tracing::info!("check millis: {}", check.as_millis());
assert!(check.as_millis() < 250);
Ok(())
}

3
renovate.json Normal file
View File

@@ -0,0 +1,3 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
}