nodrift
A lightweight Rust library for scheduling recurring jobs with automatic drift correction. When a job takes longer than expected, nodrift ensures the next execution adjusts accordingly to maintain consistent intervals.
Features
- Drift correction: Automatically adjusts scheduling when jobs run longer than the interval
- Async/await support: Built on Tokio for efficient async job execution
- Cancellation support: Gracefully stop scheduled jobs using cancellation tokens
- Custom job types: Implement the
Drifter
trait for complex scheduling needs - Error handling: Jobs can fail without crashing the scheduler
- Detailed logging: Built-in tracing support for debugging
Installation
Add this to your Cargo.toml
:
[dependencies]
nodrift = "0.3.1"
tokio = { version = "1", features = ["full"] }
Quick Start
Simple Function Scheduling
The easiest way to schedule a recurring job is using the schedule
function:
use nodrift::{schedule, DriftError};
use std::time::Duration;
#[tokio::main]
async fn main() {
// Schedule a job to run every 5 seconds
let cancellation_token = schedule(Duration::from_secs(5), || async {
println!("Job executing!");
// Your job logic here
do_work().await?;
Ok(())
});
// Let it run for a while
tokio::time::sleep(Duration::from_secs(30)).await;
// Cancel the job when done
cancellation_token.cancel();
}
async fn do_work() -> Result<(), DriftError> {
// Simulate some work
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
Custom Drifter Implementation
For more complex scheduling needs, implement the Drifter
trait:
use nodrift::{Drifter, schedule_drifter, DriftError};
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use std::time::Duration;
#[derive(Clone)]
struct MyJob {
name: String,
}
#[async_trait]
impl Drifter for MyJob {
async fn execute(&self, token: CancellationToken) -> anyhow::Result<()> {
println!("Executing job: {}", self.name);
// Check for cancellation during long-running operations
if token.is_cancelled() {
return Ok(());
}
// Your job logic here
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
}
#[tokio::main]
async fn main() {
let job = MyJob {
name: "data-sync".to_string(),
};
// Schedule the custom job
let token = schedule_drifter(Duration::from_secs(10), job);
// Run for a minute
tokio::time::sleep(Duration::from_secs(60)).await;
// Stop the job
token.cancel();
}
How Drift Correction Works
nodrift automatically handles timing drift that occurs when jobs take longer than expected:
- If a job is scheduled to run every 5 seconds but takes 3 seconds to complete, the next run will start 2 seconds after completion
- If a job takes 7 seconds to complete (longer than the 5-second interval), the next run starts immediately after completion
- This ensures your jobs maintain their intended frequency without overlapping executions
Error Handling
Jobs that return errors will stop the scheduling routine:
use nodrift::{schedule, DriftError};
use std::time::Duration;
let token = schedule(Duration::from_secs(1), || async {
// This will stop the scheduler after the first execution
Err(DriftError::JobError(anyhow::anyhow!("Something went wrong")))
});
Logging
nodrift uses the tracing
crate for logging. Enable logging to see job execution details:
use tracing_subscriber;
fn main() {
// Initialize tracing
tracing_subscriber::fmt::init();
// Your application code
}
Log output includes:
- Job start times
- Execution duration
- Wait time until next run
- Next scheduled run time
- Any errors that occur
Examples
Database Cleanup Job
use nodrift::{schedule, DriftError};
use std::time::Duration;
async fn cleanup_old_records() -> Result<(), DriftError> {
// Connect to database and clean up old records
println!("Cleaning up old records...");
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(())
}
#[tokio::main]
async fn main() {
// Run cleanup every hour
let token = schedule(Duration::from_secs(3600), || async {
cleanup_old_records().await
});
// Keep the application running
tokio::signal::ctrl_c().await.unwrap();
// Gracefully shutdown
token.cancel();
}
Metrics Collection
use nodrift::{Drifter, schedule_drifter};
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
#[derive(Clone)]
struct MetricsCollector {
counter: Arc<AtomicU64>,
}
#[async_trait]
impl Drifter for MetricsCollector {
async fn execute(&self, _token: CancellationToken) -> anyhow::Result<()> {
// Collect metrics
let value = self.counter.fetch_add(1, Ordering::Relaxed);
println!("Collected metric: {}", value);
// Send to monitoring service
// send_to_monitoring(value).await?;
Ok(())
}
}
#[tokio::main]
async fn main() {
let collector = MetricsCollector {
counter: Arc::new(AtomicU64::new(0)),
};
// Collect metrics every 30 seconds
let token = schedule_drifter(
Duration::from_secs(30),
collector
);
// Run indefinitely
tokio::signal::ctrl_c().await.unwrap();
token.cancel();
}
API Reference
Functions
schedule<F, Fut>(interval: Duration, func: F) -> CancellationToken
Schedules a function to run at the specified interval.
interval
: Time between job executionsfunc
: Async function that returnsResult<(), DriftError>
- Returns:
CancellationToken
to stop the scheduled job
schedule_drifter<FDrifter>(interval: Duration, drifter: FDrifter) -> CancellationToken
Schedules a custom Drifter
implementation.
interval
: Time between job executionsdrifter
: Implementation of theDrifter
trait- Returns:
CancellationToken
to stop the scheduled job
Traits
Drifter
#[async_trait]
pub trait Drifter {
async fn execute(&self, token: CancellationToken) -> anyhow::Result<()>;
}
Implement this trait for custom job types that need access to state or complex initialization.
Types
DriftError
Error type for job execution failures:
pub enum DriftError {
JobError(#[source] anyhow::Error),
}
Requirements
- Rust 1.75 or later
- Tokio runtime
License
This project is licensed under the MIT License.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Support
For issues and questions, please file an issue on the GitHub repository.