Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
3a1b1673ef | |||
89cbae24d0 | |||
7c1b317d08
|
|||
1fec4e3708
|
@@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.6.0] - 2024-11-23
|
||||
|
||||
### Added
|
||||
- adding test to make sure we can gracefully shutdown
|
||||
- make sure to close down properly
|
||||
|
||||
## [0.5.0] - 2024-11-19
|
||||
|
||||
### Added
|
||||
|
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -266,7 +266,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "notmad"
|
||||
version = "0.4.0"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
|
@@ -3,7 +3,7 @@ members = ["crates/*"]
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
version = "0.5.0"
|
||||
version = "0.6.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
mad = { path = "crates/mad" }
|
||||
|
@@ -133,6 +133,7 @@ impl Mad {
|
||||
let mut channels = Vec::new();
|
||||
let cancellation_token = CancellationToken::new();
|
||||
let job_cancellation = CancellationToken::new();
|
||||
let job_done = CancellationToken::new();
|
||||
|
||||
for comp in &self.components {
|
||||
let comp = comp.clone();
|
||||
@@ -154,16 +155,43 @@ impl Mad {
|
||||
res = comp.run(job_cancellation) => {
|
||||
error_tx.send(CompletionResult { res , name }).await
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
error_tx.send(CompletionResult { res: Ok(()) , name }).await
|
||||
}
|
||||
_ = signal_unix_terminate() => {
|
||||
error_tx.send(CompletionResult { res: Ok(()) , name }).await
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
tokio::spawn({
|
||||
let cancellation_token = cancellation_token.child_token();
|
||||
let job_done = job_done.child_token();
|
||||
|
||||
let wait_cancel = self.should_cancel;
|
||||
|
||||
async move {
|
||||
let should_cancel =
|
||||
|cancel: CancellationToken, wait: Option<std::time::Duration>| async move {
|
||||
if let Some(cancel_wait) = wait {
|
||||
tokio::time::sleep(cancel_wait).await;
|
||||
|
||||
cancel.cancel();
|
||||
}
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
_ = cancellation_token.cancelled() => {
|
||||
job_cancellation.cancel();
|
||||
}
|
||||
_ = job_done.cancelled() => {
|
||||
should_cancel(job_cancellation, wait_cancel).await;
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
should_cancel(job_cancellation, wait_cancel).await;
|
||||
}
|
||||
_ = signal_unix_terminate() => {
|
||||
should_cancel(job_cancellation, wait_cancel).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut futures = FuturesUnordered::new();
|
||||
for channel in channels.iter_mut() {
|
||||
futures.push(channel.recv());
|
||||
@@ -183,12 +211,7 @@ impl Mad {
|
||||
}
|
||||
}
|
||||
|
||||
job_cancellation.cancel();
|
||||
if let Some(cancel_wait) = self.should_cancel {
|
||||
tokio::time::sleep(cancel_wait).await;
|
||||
|
||||
cancellation_token.cancel();
|
||||
}
|
||||
job_done.cancel();
|
||||
}
|
||||
|
||||
tracing::debug!("ran components");
|
||||
|
@@ -1,6 +1,9 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use notmad::{Component, Mad};
|
||||
use rand::Rng;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing_test::traced_test;
|
||||
|
||||
@@ -86,3 +89,51 @@ async fn test_can_run_components() -> anyhow::Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn test_can_shutdown_gracefully() -> anyhow::Result<()> {
|
||||
let check = Arc::new(Mutex::new(None));
|
||||
|
||||
Mad::builder()
|
||||
.add_fn({
|
||||
let check = check.clone();
|
||||
|
||||
move |cancel| {
|
||||
let check = check.clone();
|
||||
|
||||
async move {
|
||||
let start = std::time::SystemTime::now();
|
||||
tracing::info!("waiting for cancel");
|
||||
cancel.cancelled().await;
|
||||
tracing::info!("submitting check");
|
||||
let mut check = check.lock().await;
|
||||
let elapsed = start.elapsed().expect("to be able to get elapsed");
|
||||
*check = Some(elapsed);
|
||||
tracing::info!("check submitted");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
})
|
||||
.add_fn(|_| async move {
|
||||
tracing::info!("starting sleep");
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
tracing::info!("sleep ended");
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
let check = check
|
||||
.lock()
|
||||
.await
|
||||
.expect("to be able to get a duration from cancel");
|
||||
|
||||
// We default wait 100 ms for graceful shutdown, and we explicitly wait 100ms in the sleep routine
|
||||
tracing::info!("check millis: {}", check.as_millis());
|
||||
assert!(check.as_millis() < 250);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
3
renovate.json
Normal file
3
renovate.json
Normal file
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
|
||||
}
|
Reference in New Issue
Block a user