nodrift

A lightweight Rust library for scheduling recurring jobs with automatic drift correction. nodrift shortens the wait before the next run by the time the previous run took, so executions stay on the configured interval instead of slipping later with every run.

Features

  • Drift correction: Shortens the wait before the next run by the time the job took, and starts the next run immediately if the job overran the interval
  • Async/await support: Built on Tokio for efficient async job execution
  • Cancellation support: Gracefully stop scheduled jobs using cancellation tokens
  • Custom job types: Implement the Drifter trait for complex scheduling needs
  • Error handling: A job that returns an error stops its own schedule (see Error Handling) instead of crashing the application
  • Detailed logging: Built-in tracing support for debugging

Installation

Add this to your Cargo.toml:

[dependencies]
nodrift = "0.3.1"
tokio = { version = "1", features = ["full"] }

Quick Start

Simple Function Scheduling

The easiest way to schedule a recurring job is to use the schedule function, which returns a cancellation token for stopping the job later:

use nodrift::{schedule, DriftError};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Schedule a job to run every 5 seconds
    let cancellation_token = schedule(Duration::from_secs(5), || async {
        println!("Job executing!");
        
        // Your job logic here
        do_work().await?;
        
        Ok(())
    });

    // Let it run for a while
    tokio::time::sleep(Duration::from_secs(30)).await;
    
    // Cancel the job when done
    cancellation_token.cancel();
}

async fn do_work() -> Result<(), DriftError> {
    // Simulate some work
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(())
}

Custom Drifter Implementation

For more complex scheduling needs, implement the Drifter trait. This example also needs the async-trait, tokio-util, and anyhow crates in your Cargo.toml:

use nodrift::{Drifter, schedule_drifter};
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use std::time::Duration;

#[derive(Clone)]
struct MyJob {
    name: String,
}

#[async_trait]
impl Drifter for MyJob {
    async fn execute(&self, token: CancellationToken) -> anyhow::Result<()> {
        println!("Executing job: {}", self.name);
        
        // Check for cancellation during long-running operations
        if token.is_cancelled() {
            return Ok(());
        }
        
        // Your job logic here
        tokio::time::sleep(Duration::from_millis(100)).await;
        
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let job = MyJob {
        name: "data-sync".to_string(),
    };
    
    // Schedule the custom job
    let token = schedule_drifter(Duration::from_secs(10), job);
    
    // Run for a minute
    tokio::time::sleep(Duration::from_secs(60)).await;
    
    // Stop the job
    token.cancel();
}
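
The cancellation check in execute runs only once, before the work starts. For a job that processes many items, it can help to check between items instead. The following is a minimal, std-only sketch of that pattern. It assumes an AtomicBool as a stand-in for the CancellationToken, and process_until_cancelled is a hypothetical helper, not part of nodrift:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical helper: handles `items` one by one and checks the stop flag
/// before each item. Returns how many items were handled before stopping.
fn process_until_cancelled<F: FnMut(u32)>(
    items: &[u32],
    cancelled: &AtomicBool, // stand-in for a cancellation token
    mut work: F,
) -> usize {
    let mut handled = 0;
    for &item in items {
        // Stop between items, never in the middle of one.
        if cancelled.load(Ordering::Relaxed) {
            break;
        }
        work(item);
        handled += 1;
    }
    handled
}
```

Inside a Drifter implementation, the same check would presumably be token.is_cancelled() at the top of the loop, as in the example above.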

How Drift Correction Works

nodrift shortens the wait before the next run by the time the previous run took:

  1. If a job is scheduled to run every 5 seconds and takes 3 seconds to complete, the next run starts 2 seconds after completion, so runs start 5 seconds apart
  2. If a job takes 7 seconds to complete (longer than the 5-second interval), the next run starts immediately after completion

In both cases a run starts only after the previous one has finished, so executions never overlap.
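
The arithmetic behind these cases can be sketched with the standard library alone. This is an illustration of the rule as described here, not nodrift's actual implementation; next_wait and start_offsets are hypothetical names:

```rust
use std::time::Duration;

/// Hypothetical helper: the wait after a run that took `elapsed`,
/// so that runs start `interval` apart whenever possible.
fn next_wait(interval: Duration, elapsed: Duration) -> Duration {
    // saturating_sub clamps at zero, so a run that overran the interval
    // is followed immediately by the next one.
    interval.saturating_sub(elapsed)
}

/// Hypothetical helper: start time of each run, measured from the first
/// run, given how long each run took.
fn start_offsets(interval: Duration, run_times: &[Duration]) -> Vec<Duration> {
    let mut offsets = Vec::with_capacity(run_times.len());
    let mut now = Duration::ZERO;
    for &elapsed in run_times {
        offsets.push(now);
        // The next run starts after this run plus the corrected wait.
        now += elapsed + next_wait(interval, elapsed);
    }
    offsets
}
```

With a 5-second interval and runs that take 3, 7, and 1 seconds, the runs start at 0, 5, and 12 seconds: the second run starts on schedule, and the third starts as soon as the overrunning second run finishes.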

Error Handling

A job that returns an error stops its schedule, and no further runs are started:

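use nodrift::{schedule, DriftError};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let _token = schedule(Duration::from_secs(1), || async {
        // Returning an error stops the schedule after the first execution
        Err(DriftError::JobError(anyhow::anyhow!("Something went wrong")))
    });

    // Give the job time to run once
    tokio::time::sleep(Duration::from_secs(2)).await;
}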
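
The stop-on-first-error behaviour described above can be modelled without the library. The sketch below is a hypothetical, synchronous stand-in for the scheduling loop (run_until_error is not a nodrift function) that shows how many runs happen when one of them fails:

```rust
/// Hypothetical stand-in for a scheduling loop: calls `job` up to
/// `max_runs` times and stops at the first error.
/// Returns the number of runs executed and the error, if any.
fn run_until_error<E>(
    max_runs: usize,
    mut job: impl FnMut(usize) -> Result<(), E>,
) -> (usize, Option<E>) {
    for run in 0..max_runs {
        if let Err(err) = job(run) {
            // The failing run counts as executed; nothing runs after it.
            return (run + 1, Some(err));
        }
    }
    (max_runs, None)
}
```

If a job should keep running after a recoverable failure, one option is to handle that failure inside the job and return Ok(()), reserving Err for cases where the schedule really should stop.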

Logging

nodrift uses the tracing crate for logging. Install a subscriber, for example from the tracing-subscriber crate, to see job execution details:

fn main() {
    // Initialize tracing
    tracing_subscriber::fmt::init();
    
    // Your application code
}

Log output includes:

  • Job start times
  • Execution duration
  • Wait time until next run
  • Next scheduled run time
  • Any errors that occur

Examples

Database Cleanup Job

use nodrift::{schedule, DriftError};
use std::time::Duration;

async fn cleanup_old_records() -> Result<(), DriftError> {
    // Connect to database and clean up old records
    println!("Cleaning up old records...");
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok(())
}

#[tokio::main]
async fn main() {
    // Run cleanup every hour
    let token = schedule(Duration::from_secs(3600), || async {
        cleanup_old_records().await
    });
    
    // Keep the application running
    tokio::signal::ctrl_c().await.unwrap();
    
    // Gracefully shutdown
    token.cancel();
}

Metrics Collection

use nodrift::{Drifter, schedule_drifter};
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

#[derive(Clone)]
struct MetricsCollector {
    counter: Arc<AtomicU64>,
}

#[async_trait]
impl Drifter for MetricsCollector {
    async fn execute(&self, _token: CancellationToken) -> anyhow::Result<()> {
        // Collect metrics
        let value = self.counter.fetch_add(1, Ordering::Relaxed);
        println!("Collected metric: {}", value);
        
        // Send to monitoring service
        // send_to_monitoring(value).await?;
        
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let collector = MetricsCollector {
        counter: Arc::new(AtomicU64::new(0)),
    };
    
    // Collect metrics every 30 seconds
    let token = schedule_drifter(
        Duration::from_secs(30),
        collector
    );
    
    // Run indefinitely
    tokio::signal::ctrl_c().await.unwrap();
    token.cancel();
}

API Reference

Functions

schedule<F, Fut>(interval: Duration, func: F) -> CancellationToken

Schedules a function to run at the specified interval.

  • interval: Time between job executions
  • func: Closure that returns a future resolving to Result<(), DriftError>
  • Returns: CancellationToken to stop the scheduled job

schedule_drifter<FDrifter>(interval: Duration, drifter: FDrifter) -> CancellationToken

Schedules a custom Drifter implementation.

  • interval: Time between job executions
  • drifter: Implementation of the Drifter trait
  • Returns: CancellationToken to stop the scheduled job

Traits

Drifter

#[async_trait]
pub trait Drifter {
    async fn execute(&self, token: CancellationToken) -> anyhow::Result<()>;
}

Implement this trait for custom job types that need access to state or complex initialization.

Types

DriftError

Error type for job execution failures:

pub enum DriftError {
    JobError(#[source] anyhow::Error),
}

Requirements

  • Rust 1.75 or later
  • Tokio runtime

License

This project is licensed under the MIT License.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Support

For issues and questions, please file an issue on the GitHub repository.
