From dfe81d979e20eba3624fe84aead53b282defec69 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Tue, 6 Jan 2026 22:13:49 +0100 Subject: [PATCH] feat: add global retry Signed-off-by: kjuulh --- .../nocontrol/src/control_plane/reconciler.rs | 48 ++++++++++++++++++- crates/nocontrol/src/operator_state.rs | 4 ++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/crates/nocontrol/src/control_plane/reconciler.rs b/crates/nocontrol/src/control_plane/reconciler.rs index 15df271..640ea69 100644 --- a/crates/nocontrol/src/control_plane/reconciler.rs +++ b/crates/nocontrol/src/control_plane/reconciler.rs @@ -31,9 +31,10 @@ impl Reconciler { } } - /// Run the reconciler. This starts two concurrent tasks: + /// Run the reconciler. This starts three concurrent tasks: /// 1. A sync task that periodically checks for new/changed manifests and enqueues them /// 2. A worker task that processes the queue and runs reconciliations + /// 3. A periodic resync task that enqueues all manifests at a configurable interval pub async fn run(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> { let now = Timestamp::now(); tracing::debug!(%self.worker_id, %now, "starting reconciler"); @@ -45,6 +46,9 @@ impl Reconciler { result = self.process_queue(cancellation_token) => { result.context("process queue task failed")?; } + result = self.periodic_resync(cancellation_token) => { + result.context("periodic resync task failed")?; + } _ = cancellation_token.cancelled() => { tracing::debug!("reconciler received cancellation"); } @@ -113,6 +117,48 @@ impl Reconciler { Ok(()) } + /// Periodically enqueue all manifests for reconciliation. + /// This ensures manifests are reconciled at least every resync_interval. + async fn periodic_resync(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> { + let resync_interval = self.operator.config().resync_interval; + tracing::debug!(%self.worker_id, ?resync_interval, "starting periodic resync"); + + loop { + tokio::select! { + _ = tokio::time::sleep(resync_interval) => {} + _ = cancellation_token.cancelled() => break, + } + + if cancellation_token.is_cancelled() { + break; + } + + tracing::debug!(%self.worker_id, "periodic resync: enqueuing all manifests"); + + if let Err(e) = self.enqueue_all_manifests().await { + tracing::warn!(error = %e, "failed to enqueue all manifests for resync"); + } + } + + Ok(()) + } + + /// Enqueue all manifests that we own for reconciliation. + async fn enqueue_all_manifests(&self) -> anyhow::Result<()> { + for manifest_state in self.store.get_owned_and_potential_leases().await? { + // Only enqueue manifests we own + if let Some(lease) = &manifest_state.lease { + if lease.owner == self.worker_id { + self.reconcile_queue + .enqueue(manifest_state.manifest.name.clone()) + .await; + } + } + } + + Ok(()) + } + /// Check if a manifest needs reconciliation. fn needs_reconciliation(&self, manifest: &ManifestState) -> bool { // Has unhandled changes diff --git a/crates/nocontrol/src/operator_state.rs b/crates/nocontrol/src/operator_state.rs index ed467cf..c970543 100644 --- a/crates/nocontrol/src/operator_state.rs +++ b/crates/nocontrol/src/operator_state.rs @@ -57,6 +57,9 @@ impl Operator for OperatorState { pub struct OperatorConfig { pub backoff_policy: BackoffPolicy, pub reconcile_on: Option>, + /// Interval at which all manifests are re-enqueued for reconciliation. + /// Default is 5 minutes. + pub resync_interval: Duration, } impl Default for OperatorConfig { @@ -66,6 +69,7 @@ impl Default for OperatorConfig { delay: Duration::from_secs(30), }, reconcile_on: Default::default(), + resync_interval: Duration::from_secs(5 * 60), } } }