From 681f1f3932d84269de71af12076a970840201591 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Mon, 12 Jan 2026 12:46:39 +0100 Subject: [PATCH] feat: add owned manifest for consideration --- crates/nocontrol/src/control_plane/backing_store.rs | 6 +++++- crates/nocontrol/src/control_plane/reconciler.rs | 12 ++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/crates/nocontrol/src/control_plane/backing_store.rs b/crates/nocontrol/src/control_plane/backing_store.rs index a1f034f..7a905cf 100644 --- a/crates/nocontrol/src/control_plane/backing_store.rs +++ b/crates/nocontrol/src/control_plane/backing_store.rs @@ -24,7 +24,10 @@ impl BackingStore { } } - pub async fn get_owned_and_potential_leases(&self) -> anyhow::Result>> { + pub async fn get_owned_and_potential_leases( + &self, + worker_id: &WorkerId, + ) -> anyhow::Result>> { let now = jiff::Timestamp::now().checked_sub(1.second())?; let manifests = self .manifests @@ -33,6 +36,7 @@ impl BackingStore { .iter() .filter(|m| match &m.lease { Some(lease) if lease.last_seen < now => true, + Some(lease) if &lease.owner == worker_id => true, Some(_lease) => false, None => true, }) diff --git a/crates/nocontrol/src/control_plane/reconciler.rs b/crates/nocontrol/src/control_plane/reconciler.rs index d071c7f..f77ce0c 100644 --- a/crates/nocontrol/src/control_plane/reconciler.rs +++ b/crates/nocontrol/src/control_plane/reconciler.rs @@ -81,7 +81,11 @@ impl Reconciler { /// Single sync iteration - check for manifests, acquire leases, enqueue work. async fn sync_once(&self) -> anyhow::Result<()> { - for manifest_state in self.store.get_owned_and_potential_leases().await? { + for manifest_state in self + .store + .get_owned_and_potential_leases(&self.worker_id) + .await? + { let manifest_name = manifest_state.manifest.name.clone(); match &manifest_state.lease { @@ -145,7 +149,11 @@ impl Reconciler { /// Enqueue all manifests that we own for reconciliation. async fn enqueue_all_manifests(&self) -> anyhow::Result<()> { - for manifest_state in self.store.get_owned_and_potential_leases().await? { + for manifest_state in self + .store + .get_owned_and_potential_leases(&self.worker_id) + .await? + { // Only enqueue manifests we own if let Some(lease) = &manifest_state.lease && lease.owner == self.worker_id