feat: add owned manifest for consideration
This commit is contained in:
@@ -24,7 +24,10 @@ impl<T: Specification> BackingStore<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_owned_and_potential_leases(&self) -> anyhow::Result<Vec<ManifestState<T>>> {
|
pub async fn get_owned_and_potential_leases(
|
||||||
|
&self,
|
||||||
|
worker_id: &WorkerId,
|
||||||
|
) -> anyhow::Result<Vec<ManifestState<T>>> {
|
||||||
let now = jiff::Timestamp::now().checked_sub(1.second())?;
|
let now = jiff::Timestamp::now().checked_sub(1.second())?;
|
||||||
let manifests = self
|
let manifests = self
|
||||||
.manifests
|
.manifests
|
||||||
@@ -33,6 +36,7 @@ impl<T: Specification> BackingStore<T> {
|
|||||||
.iter()
|
.iter()
|
||||||
.filter(|m| match &m.lease {
|
.filter(|m| match &m.lease {
|
||||||
Some(lease) if lease.last_seen < now => true,
|
Some(lease) if lease.last_seen < now => true,
|
||||||
|
Some(lease) if &lease.owner == worker_id => true,
|
||||||
Some(_lease) => false,
|
Some(_lease) => false,
|
||||||
None => true,
|
None => true,
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -81,7 +81,11 @@ impl<T: Operator> Reconciler<T> {
|
|||||||
|
|
||||||
/// Single sync iteration - check for manifests, acquire leases, enqueue work.
|
/// Single sync iteration - check for manifests, acquire leases, enqueue work.
|
||||||
async fn sync_once(&self) -> anyhow::Result<()> {
|
async fn sync_once(&self) -> anyhow::Result<()> {
|
||||||
for manifest_state in self.store.get_owned_and_potential_leases().await? {
|
for manifest_state in self
|
||||||
|
.store
|
||||||
|
.get_owned_and_potential_leases(&self.worker_id)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
let manifest_name = manifest_state.manifest.name.clone();
|
let manifest_name = manifest_state.manifest.name.clone();
|
||||||
|
|
||||||
match &manifest_state.lease {
|
match &manifest_state.lease {
|
||||||
@@ -145,7 +149,11 @@ impl<T: Operator> Reconciler<T> {
|
|||||||
|
|
||||||
/// Enqueue all manifests that we own for reconciliation.
|
/// Enqueue all manifests that we own for reconciliation.
|
||||||
async fn enqueue_all_manifests(&self) -> anyhow::Result<()> {
|
async fn enqueue_all_manifests(&self) -> anyhow::Result<()> {
|
||||||
for manifest_state in self.store.get_owned_and_potential_leases().await? {
|
for manifest_state in self
|
||||||
|
.store
|
||||||
|
.get_owned_and_potential_leases(&self.worker_id)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
// Only enqueue manifests we own
|
// Only enqueue manifests we own
|
||||||
if let Some(lease) = &manifest_state.lease
|
if let Some(lease) = &manifest_state.lease
|
||||||
&& lease.owner == self.worker_id
|
&& lease.owner == self.worker_id
|
||||||
|
|||||||
Reference in New Issue
Block a user