Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
72755f9cf1 | |||
ae0b8b703e
|
|||
3c3f638004 | |||
ea5287152c
|
|||
14371cdfd7 | |||
3a1b1673ef | |||
89cbae24d0 | |||
7c1b317d08
|
|||
1fec4e3708
|
19
CHANGELOG.md
19
CHANGELOG.md
@@ -6,6 +6,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [0.7.1] - 2024-11-24
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
- make sure to close on final
|
||||||
|
|
||||||
|
## [0.7.0] - 2024-11-24
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- actually bubble up errors
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
- *(deps)* update rust crate thiserror to v2 (#9)
|
||||||
|
|
||||||
|
## [0.6.0] - 2024-11-23
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- adding test to make sure we can gracefully shutdown
|
||||||
|
- make sure to close down properly
|
||||||
|
|
||||||
## [0.5.0] - 2024-11-19
|
## [0.5.0] - 2024-11-19
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -266,7 +266,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "notmad"
|
name = "notmad"
|
||||||
version = "0.4.0"
|
version = "0.7.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
@@ -525,18 +525,18 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "thiserror"
|
name = "thiserror"
|
||||||
version = "1.0.69"
|
version = "2.0.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
|
checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"thiserror-impl",
|
"thiserror-impl",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "thiserror-impl"
|
name = "thiserror-impl"
|
||||||
version = "1.0.69"
|
version = "2.0.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
|
checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
@@ -3,7 +3,7 @@ members = ["crates/*"]
|
|||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
version = "0.5.0"
|
version = "0.7.1"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
mad = { path = "crates/mad" }
|
mad = { path = "crates/mad" }
|
||||||
|
@@ -13,7 +13,7 @@ async-trait = "0.1.81"
|
|||||||
futures = "0.3.30"
|
futures = "0.3.30"
|
||||||
futures-util = "0.3.30"
|
futures-util = "0.3.30"
|
||||||
rand = "0.8.5"
|
rand = "0.8.5"
|
||||||
thiserror = "1.0.63"
|
thiserror = "2.0.0"
|
||||||
tokio.workspace = true
|
tokio.workspace = true
|
||||||
tokio-util = "0.7.11"
|
tokio-util = "0.7.11"
|
||||||
tracing.workspace = true
|
tracing.workspace = true
|
||||||
|
@@ -133,6 +133,7 @@ impl Mad {
|
|||||||
let mut channels = Vec::new();
|
let mut channels = Vec::new();
|
||||||
let cancellation_token = CancellationToken::new();
|
let cancellation_token = CancellationToken::new();
|
||||||
let job_cancellation = CancellationToken::new();
|
let job_cancellation = CancellationToken::new();
|
||||||
|
let job_done = CancellationToken::new();
|
||||||
|
|
||||||
for comp in &self.components {
|
for comp in &self.components {
|
||||||
let comp = comp.clone();
|
let comp = comp.clone();
|
||||||
@@ -154,21 +155,51 @@ impl Mad {
|
|||||||
res = comp.run(job_cancellation) => {
|
res = comp.run(job_cancellation) => {
|
||||||
error_tx.send(CompletionResult { res , name }).await
|
error_tx.send(CompletionResult { res , name }).await
|
||||||
}
|
}
|
||||||
_ = tokio::signal::ctrl_c() => {
|
|
||||||
error_tx.send(CompletionResult { res: Ok(()) , name }).await
|
|
||||||
}
|
|
||||||
_ = signal_unix_terminate() => {
|
|
||||||
error_tx.send(CompletionResult { res: Ok(()) , name }).await
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tokio::spawn({
|
||||||
|
let cancellation_token = cancellation_token;
|
||||||
|
let job_done = job_done.child_token();
|
||||||
|
|
||||||
|
let wait_cancel = self.should_cancel;
|
||||||
|
|
||||||
|
async move {
|
||||||
|
let should_cancel =
|
||||||
|
|cancel: CancellationToken,
|
||||||
|
global_cancel: CancellationToken,
|
||||||
|
wait: Option<std::time::Duration>| async move {
|
||||||
|
if let Some(cancel_wait) = wait {
|
||||||
|
cancel.cancel();
|
||||||
|
tokio::time::sleep(cancel_wait).await;
|
||||||
|
global_cancel.cancel();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancellation_token.cancelled() => {
|
||||||
|
job_cancellation.cancel();
|
||||||
|
}
|
||||||
|
_ = job_done.cancelled() => {
|
||||||
|
should_cancel(job_cancellation, cancellation_token, wait_cancel).await;
|
||||||
|
}
|
||||||
|
_ = tokio::signal::ctrl_c() => {
|
||||||
|
should_cancel(job_cancellation, cancellation_token,wait_cancel).await;
|
||||||
|
}
|
||||||
|
_ = signal_unix_terminate() => {
|
||||||
|
should_cancel(job_cancellation, cancellation_token, wait_cancel).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
let mut futures = FuturesUnordered::new();
|
let mut futures = FuturesUnordered::new();
|
||||||
for channel in channels.iter_mut() {
|
for channel in channels.iter_mut() {
|
||||||
futures.push(channel.recv());
|
futures.push(channel.recv());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut errors = Vec::new();
|
||||||
while let Some(Some(msg)) = futures.next().await {
|
while let Some(Some(msg)) = futures.next().await {
|
||||||
match msg.res {
|
match msg.res {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -177,21 +208,20 @@ impl Mad {
|
|||||||
component = msg.name,
|
component = msg.name,
|
||||||
"component ran to completion with error"
|
"component ran to completion with error"
|
||||||
);
|
);
|
||||||
|
errors.push(e);
|
||||||
}
|
}
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
tracing::debug!(component = msg.name, "component ran to completion");
|
tracing::debug!(component = msg.name, "component ran to completion");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
job_cancellation.cancel();
|
job_done.cancel();
|
||||||
if let Some(cancel_wait) = self.should_cancel {
|
|
||||||
tokio::time::sleep(cancel_wait).await;
|
|
||||||
|
|
||||||
cancellation_token.cancel();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::debug!("ran components");
|
tracing::debug!("ran components");
|
||||||
|
if !errors.is_empty() {
|
||||||
|
return Err(MadError::AggregateError(AggregateError { errors }));
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@@ -1,6 +1,9 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use notmad::{Component, Mad};
|
use notmad::{Component, Mad};
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing_test::traced_test;
|
use tracing_test::traced_test;
|
||||||
|
|
||||||
@@ -86,3 +89,51 @@ async fn test_can_run_components() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[traced_test]
|
||||||
|
async fn test_can_shutdown_gracefully() -> anyhow::Result<()> {
|
||||||
|
let check = Arc::new(Mutex::new(None));
|
||||||
|
|
||||||
|
Mad::builder()
|
||||||
|
.add_fn({
|
||||||
|
let check = check.clone();
|
||||||
|
|
||||||
|
move |cancel| {
|
||||||
|
let check = check.clone();
|
||||||
|
|
||||||
|
async move {
|
||||||
|
let start = std::time::SystemTime::now();
|
||||||
|
tracing::info!("waiting for cancel");
|
||||||
|
cancel.cancelled().await;
|
||||||
|
tracing::info!("submitting check");
|
||||||
|
let mut check = check.lock().await;
|
||||||
|
let elapsed = start.elapsed().expect("to be able to get elapsed");
|
||||||
|
*check = Some(elapsed);
|
||||||
|
tracing::info!("check submitted");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.add_fn(|_| async move {
|
||||||
|
tracing::info!("starting sleep");
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
|
tracing::info!("sleep ended");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.run()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let check = check
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.expect("to be able to get a duration from cancel");
|
||||||
|
|
||||||
|
// We default wait 100 ms for graceful shutdown, and we explicitly wait 100ms in the sleep routine
|
||||||
|
tracing::info!("check millis: {}", check.as_millis());
|
||||||
|
assert!(check.as_millis() < 250);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
3
renovate.json
Normal file
3
renovate.json
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
{
|
||||||
|
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
|
||||||
|
}
|
Reference in New Issue
Block a user