feat: add cron
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2025-03-27 14:33:56 +01:00
parent 9c39c0daa2
commit af10f48d8d
3 changed files with 155 additions and 4 deletions

View File

@@ -9,6 +9,7 @@ edition = "2021"
anyhow.workspace = true
async-trait = "0.1.81"
chrono = "0.4.40"
cron = "0.15.0"
thiserror = "2.0.0"
tokio.workspace = true
tokio-util = "0.7.11"

View File

@@ -1,8 +1,7 @@
use std::{sync::Arc, time::Duration};
use std::{str::FromStr, sync::Arc, time::Duration};
use anyhow::Context;
use async_trait::async_trait;
use chrono::{DateTime, Local, TimeDelta};
use chrono::{DateTime, Local, TimeDelta, Utc};
use std::future::Future;
use tokio::time;
use tokio_util::sync::CancellationToken;
@@ -23,6 +22,86 @@ where
schedule_drifter(interval, drifter)
}
pub fn schedule_cron<F, Fut>(cron: &str, func: F) -> anyhow::Result<CancellationToken>
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), DriftError>> + Send + 'static,
{
let drifter = FuncDrifter::new(func);
schedule_drifter_cron(cron, drifter)
}
pub fn schedule_drifter_cron<FDrifter>(
cron: &str,
drifter: FDrifter,
) -> anyhow::Result<CancellationToken>
where
FDrifter: Drifter + Send + 'static,
FDrifter: Clone,
{
let schedule = ::cron::Schedule::from_str(cron)?;
let cancellation_token = CancellationToken::new();
tokio::spawn({
let cancellation_token = cancellation_token.clone();
let drifter = drifter.clone();
async move {
let upcoming = schedule.upcoming(Utc {});
let child_token = cancellation_token.child_token();
for datetime in upcoming {
let now = Utc::now();
let diff = datetime - now;
if diff <= TimeDelta::zero() {
tracing::info!(
"job schedule for {} was in the past: {}, skipping iteration",
datetime.to_string(),
now.to_string()
);
continue;
}
let diff = diff.to_std().expect("to be able to get diff time");
let sleep = time::sleep(diff);
tokio::pin!(sleep);
tracing::debug!(
"schedule job: {}, waiting: {}s for execution",
datetime.to_string(),
diff.as_secs()
);
tokio::select! {
_ = cancellation_token.cancelled() => {
tracing::trace!("stopping drift job");
break
}
_ = &mut sleep => {
let start = std::time::Instant::now();
tracing::debug!("running job");
if let Err(e) = drifter.execute(child_token.child_token()).await {
tracing::error!("drift job failed with error: {}", e);
continue
}
let elapsed = start.elapsed();
tracing::debug!("job took: {}ms ", elapsed.as_millis());
}
}
}
}
});
Ok(cancellation_token)
}
pub fn schedule_drifter<FDrifter>(interval: Duration, drifter: FDrifter) -> CancellationToken
where
FDrifter: Drifter + Send + 'static,
@@ -231,4 +310,54 @@ mod tests {
Ok(())
}
#[tokio::test]
#[traced_test]
async fn test_cron() -> anyhow::Result<()> {
let token = schedule_cron("* * * * * *", || async {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
Ok(())
})?;
tokio::time::sleep(Duration::from_secs(5)).await;
assert!(!token.is_cancelled());
assert!(logs_contain("running job"));
assert!(logs_contain("job took:"));
Ok(())
}
#[tokio::test]
#[traced_test]
async fn test_cron_no_wait() -> anyhow::Result<()> {
let token = schedule_cron("* * * * * *", || async { Ok(()) })?;
tokio::time::sleep(Duration::from_secs(5)).await;
assert!(!token.is_cancelled());
assert!(logs_contain("running job"));
assert!(logs_contain("job took:"));
Ok(())
}
#[tokio::test]
#[traced_test]
async fn test_cron_job_taking_longer_than_cycle() -> anyhow::Result<()> {
let token = schedule_cron("* * * * * *", || async {
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
Ok(())
})?;
tokio::time::sleep(Duration::from_secs(5)).await;
assert!(!token.is_cancelled());
Ok(())
}
}