Compare commits
1 Commits
v0.3.3
...
69c521c1b4
Author | SHA1 | Date | |
---|---|---|---|
|
69c521c1b4 |
11
CHANGELOG.md
11
CHANGELOG.md
@@ -6,17 +6,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
## [0.3.3] - 2025-03-27
|
|
||||||
|
|
||||||
### Added
|
|
||||||
- don't stop on error
|
|
||||||
- redo main part
|
|
||||||
|
|
||||||
## [0.3.2] - 2025-03-27
|
|
||||||
|
|
||||||
### Fixed
|
|
||||||
- time formatting
|
|
||||||
|
|
||||||
## [0.3.1] - 2025-03-27
|
## [0.3.1] - 2025-03-27
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -249,7 +249,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nodrift"
|
name = "nodrift"
|
||||||
version = "0.3.2"
|
version = "0.3.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
@@ -3,7 +3,7 @@ members = ["crates/*"]
|
|||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
version = "0.3.3"
|
version = "0.3.1"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
drift = { path = "crates/drift" }
|
drift = { path = "crates/drift" }
|
||||||
|
@@ -1,8 +1,7 @@
|
|||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use anyhow::Context;
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use chrono::{DateTime, Local, TimeDelta};
|
use chrono::{DateTime, Local};
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
@@ -35,7 +34,22 @@ where
|
|||||||
let drifter = drifter.clone();
|
let drifter = drifter.clone();
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
let mut wait = Duration::default();
|
let start = std::time::Instant::now();
|
||||||
|
|
||||||
|
tracing::debug!("running job");
|
||||||
|
let child_token = cancellation_token.child_token();
|
||||||
|
if let Err(e) = drifter.execute(child_token).await {
|
||||||
|
tracing::error!("drift job failed with error: {}, stopping routine", e);
|
||||||
|
cancellation_token.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
let elapsed = start.elapsed();
|
||||||
|
let mut wait = interval.saturating_sub(elapsed);
|
||||||
|
tracing::debug!(
|
||||||
|
"job took: {}ms, waiting: {}ms for next run",
|
||||||
|
elapsed.as_millis(),
|
||||||
|
wait.as_millis()
|
||||||
|
);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let child_token = cancellation_token.child_token();
|
let child_token = cancellation_token.child_token();
|
||||||
@@ -53,7 +67,8 @@ where
|
|||||||
|
|
||||||
tracing::debug!("running job");
|
tracing::debug!("running job");
|
||||||
if let Err(e) = drifter.execute(child_token).await {
|
if let Err(e) = drifter.execute(child_token).await {
|
||||||
tracing::error!("drift job failed with error: {}", e);
|
tracing::error!("drift job failed with error: {}, stopping routine", e);
|
||||||
|
cancellation_token.cancel();
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -61,9 +76,9 @@ where
|
|||||||
wait = interval.saturating_sub(elapsed);
|
wait = interval.saturating_sub(elapsed);
|
||||||
|
|
||||||
let now: DateTime<Local> = Local::now();
|
let now: DateTime<Local> = Local::now();
|
||||||
let next: Option<DateTime<Local>> = now.checked_add_signed(TimeDelta::from_std(wait).expect("to be able to convert duration into time delta"));
|
let next: Option<DateTime<Local>> = std::time::SystemTime::now().checked_add(wait).map(|next| next.into());
|
||||||
|
|
||||||
tracing::debug!(now=now.to_string(), next=next.map(|n| n.to_string()), "job took: {}ms, waiting: {}ms for next run", elapsed.as_millis(), wait.as_millis() );
|
tracing::debug!(?now, ?next, "job took: {}ms, waiting: {}ms for next run", elapsed.as_millis(), wait.as_millis() );
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -212,23 +227,6 @@ mod tests {
|
|||||||
assert!(logs_contain("running job"));
|
assert!(logs_contain("running job"));
|
||||||
assert!(logs_contain("job took:"));
|
assert!(logs_contain("job took:"));
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
#[tokio::test]
|
|
||||||
#[traced_test]
|
|
||||||
async fn test_calls_trace_on_start_and_end_long() -> anyhow::Result<()> {
|
|
||||||
let token = schedule(Duration::from_millis(100), || async {
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
|
||||||
|
|
||||||
assert!(!token.is_cancelled());
|
|
||||||
|
|
||||||
assert!(logs_contain("running job"));
|
|
||||||
assert!(logs_contain("job took:"));
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user