@@ -147,12 +147,12 @@ impl<T: Operator> Reconciler<T> {
|
|||||||
async fn enqueue_all_manifests(&self) -> anyhow::Result<()> {
|
async fn enqueue_all_manifests(&self) -> anyhow::Result<()> {
|
||||||
for manifest_state in self.store.get_owned_and_potential_leases().await? {
|
for manifest_state in self.store.get_owned_and_potential_leases().await? {
|
||||||
// Only enqueue manifests we own
|
// Only enqueue manifests we own
|
||||||
if let Some(lease) = &manifest_state.lease {
|
if let Some(lease) = &manifest_state.lease
|
||||||
if lease.owner == self.worker_id {
|
&& lease.owner == self.worker_id
|
||||||
self.reconcile_queue
|
{
|
||||||
.enqueue(manifest_state.manifest.name.clone())
|
self.reconcile_queue
|
||||||
.await;
|
.enqueue(manifest_state.manifest.name.clone())
|
||||||
}
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -167,11 +167,13 @@ impl<T: Specification> ReconcileQueue<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
/// Returns the number of pending jobs in the queue.
|
/// Returns the number of pending jobs in the queue.
|
||||||
pub async fn len(&self) -> usize {
|
pub async fn len(&self) -> usize {
|
||||||
self.inner.lock().await.queue.len()
|
self.inner.lock().await.queue.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
/// Returns true if the queue is empty.
|
/// Returns true if the queue is empty.
|
||||||
pub async fn is_empty(&self) -> bool {
|
pub async fn is_empty(&self) -> bool {
|
||||||
self.inner.lock().await.queue.is_empty()
|
self.inner.lock().await.queue.is_empty()
|
||||||
|
|||||||
Reference in New Issue
Block a user