@@ -31,9 +31,10 @@ impl<T: Operator> Reconciler<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the reconciler. This starts two concurrent tasks:
|
||||
/// Run the reconciler. This starts three concurrent tasks:
|
||||
/// 1. A sync task that periodically checks for new/changed manifests and enqueues them
|
||||
/// 2. A worker task that processes the queue and runs reconciliations
|
||||
/// 3. A periodic resync task that enqueues all manifests at a configurable interval
|
||||
pub async fn run(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> {
|
||||
let now = Timestamp::now();
|
||||
tracing::debug!(%self.worker_id, %now, "starting reconciler");
|
||||
@@ -45,6 +46,9 @@ impl<T: Operator> Reconciler<T> {
|
||||
result = self.process_queue(cancellation_token) => {
|
||||
result.context("process queue task failed")?;
|
||||
}
|
||||
result = self.periodic_resync(cancellation_token) => {
|
||||
result.context("periodic resync task failed")?;
|
||||
}
|
||||
_ = cancellation_token.cancelled() => {
|
||||
tracing::debug!("reconciler received cancellation");
|
||||
}
|
||||
@@ -113,6 +117,48 @@ impl<T: Operator> Reconciler<T> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Periodically enqueue all manifests for reconciliation.
|
||||
/// This ensures manifests are reconciled at least every resync_interval.
|
||||
async fn periodic_resync(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> {
|
||||
let resync_interval = self.operator.config().resync_interval;
|
||||
tracing::debug!(%self.worker_id, ?resync_interval, "starting periodic resync");
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(resync_interval) => {}
|
||||
_ = cancellation_token.cancelled() => break,
|
||||
}
|
||||
|
||||
if cancellation_token.is_cancelled() {
|
||||
break;
|
||||
}
|
||||
|
||||
tracing::debug!(%self.worker_id, "periodic resync: enqueuing all manifests");
|
||||
|
||||
if let Err(e) = self.enqueue_all_manifests().await {
|
||||
tracing::warn!(error = %e, "failed to enqueue all manifests for resync");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Enqueue all manifests that we own for reconciliation.
|
||||
async fn enqueue_all_manifests(&self) -> anyhow::Result<()> {
|
||||
for manifest_state in self.store.get_owned_and_potential_leases().await? {
|
||||
// Only enqueue manifests we own
|
||||
if let Some(lease) = &manifest_state.lease {
|
||||
if lease.owner == self.worker_id {
|
||||
self.reconcile_queue
|
||||
.enqueue(manifest_state.manifest.name.clone())
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if a manifest needs reconciliation.
|
||||
fn needs_reconciliation(&self, manifest: &ManifestState<T::Specifications>) -> bool {
|
||||
// Has unhandled changes
|
||||
|
||||
@@ -57,6 +57,9 @@ impl<T: Operator> Operator for OperatorState<T> {
|
||||
pub struct OperatorConfig {
|
||||
pub backoff_policy: BackoffPolicy,
|
||||
pub reconcile_on: Option<tokio::sync::mpsc::Receiver<()>>,
|
||||
/// Interval at which all manifests are re-enqueued for reconciliation.
|
||||
/// Default is 5 minutes.
|
||||
pub resync_interval: Duration,
|
||||
}
|
||||
|
||||
impl Default for OperatorConfig {
|
||||
@@ -66,6 +69,7 @@ impl Default for OperatorConfig {
|
||||
delay: Duration::from_secs(30),
|
||||
},
|
||||
reconcile_on: Default::default(),
|
||||
resync_interval: Duration::from_secs(5 * 60),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user