feat: add weight

This commit is contained in:
2026-03-06 12:54:41 +01:00
parent 0fa906a8cf
commit 5e1cd2b1e7
11 changed files with 680 additions and 39 deletions

View File

@@ -323,6 +323,7 @@ async fn run_app<
) -> anyhow::Result<()>
where
TOperator::Specifications: Send + Sync,
<B as Backend>::Error: Send + Sync + 'static,
{
// Spawn refresh task
let app_clone = app.clone();

View File

@@ -6,6 +6,7 @@ use crate::{
Specification,
control_plane::backing_store::in_process::BackingStoreInProcess,
manifests::{Manifest, ManifestState, WorkerId},
operator_state::ClusterStats,
};
pub mod in_process;
@@ -96,4 +97,14 @@ pub trait BackingStoreEdge<T: Specification>: Send + Sync + Clone {
&self,
manifest: &ManifestState<T>,
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
/// Returns cluster-wide weight distribution statistics.
/// Returns None when cluster stats are not available (disables fair-share).
///
/// Default implementation reports no stats, so backing stores that do not
/// track weights automatically opt out of fair-share capacity management.
/// `worker_id` identifies the requesting worker so implementations can
/// compute `my_weight`; the default ignores it.
fn get_cluster_stats(
&self,
worker_id: &Uuid,
) -> impl std::future::Future<Output = anyhow::Result<Option<ClusterStats>>> + Send {
// `let _` consumes the reference before the async block, so the
// returned future does not capture (borrow) `worker_id`.
let _ = worker_id;
async { Ok(None) }
}
}

View File

@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc};
use jiff::ToSpan;
use sha2::{Digest, Sha256};
@@ -11,6 +11,7 @@ use crate::{
Manifest, ManifestChangeEvent, ManifestChangeEventType, ManifestLease, ManifestState,
ManifestStatus, ManifestStatusState, WorkerId,
},
operator_state::ClusterStats,
};
#[derive(Clone, Default)]
@@ -64,12 +65,13 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStoreInProcess<T> {
.find(|m| m.manifest.name == manifest_state.manifest.name)
{
Some(manifest) => {
let mut manifest_state = manifest_state.clone();
if let Some(lease) = manifest_state.lease.as_mut() {
lease.last_seen = jiff::Timestamp::now();
if manifest.generation != manifest_state.generation {
anyhow::bail!("failed to update lease, generation mismatch");
}
manifest.lease = manifest_state.lease
if let Some(lease) = manifest.lease.as_mut() {
lease.last_seen = jiff::Timestamp::now();
}
}
None => anyhow::bail!("manifest is not found"),
}
@@ -90,12 +92,20 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStoreInProcess<T> {
.find(|m| m.manifest.name == manifest_state.manifest.name)
{
Some(manifest) => {
let mut manifest_state = manifest_state.clone();
manifest_state.lease = Some(ManifestLease {
// CAS: only acquire if generation matches (prevents race conditions)
if manifest.generation != manifest_state.generation {
anyhow::bail!(
"failed to acquire lease: generation mismatch (expected {}, got {})",
manifest_state.generation,
manifest.generation
);
}
manifest.lease = Some(ManifestLease {
owner: *worker_id,
last_seen: jiff::Timestamp::now(),
});
manifest.lease = manifest_state.lease
manifest.generation += 1;
}
None => anyhow::bail!("manifest is not found"),
}
@@ -183,6 +193,37 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStoreInProcess<T> {
Ok(())
}
/// Aggregate weight statistics over all manifests holding a live lease.
///
/// A lease counts as live when its `last_seen` falls within the last
/// 10 seconds. Always returns `Ok(Some(..))` for the in-process store.
/// NOTE(review): the 10s liveness window here differs from the Postgres
/// backend's 30s window — confirm the mismatch is intentional.
async fn get_cluster_stats(
    &self,
    worker_id: &WorkerId,
) -> anyhow::Result<Option<ClusterStats>> {
    // Leases last seen at or before this instant are treated as expired.
    let cutoff = jiff::Timestamp::now().checked_sub(10.second())?;
    let guard = self.manifests.read().await;

    let mut total_weight: u64 = 0;
    let mut my_weight: u64 = 0;
    let mut owners = HashSet::new();
    for state in guard.iter() {
        let Some(lease) = state.lease.as_ref() else {
            continue;
        };
        if lease.last_seen <= cutoff {
            continue;
        }
        let weight = state.manifest.spec.weight();
        total_weight += weight;
        owners.insert(lease.owner);
        if &lease.owner == worker_id {
            my_weight += weight;
        }
    }

    Ok(Some(ClusterStats {
        total_weight,
        active_workers: owners.len(),
        my_weight,
    }))
}
}
impl<T: Specification> BackingStoreInProcess<T> {

View File

@@ -11,6 +11,7 @@ use crate::{
manifests::{
Manifest, ManifestLease, ManifestState, ManifestStatus, ManifestStatusState, WorkerId,
},
operator_state::ClusterStats,
stores::BackingStoreEdge,
};
@@ -70,9 +71,12 @@ impl<T: Specification> BackingStorePostgres<T> {
created TIMESTAMPTZ NOT NULL,
updated TIMESTAMPTZ NOT NULL,
lease_owner_id UUID,
lease_last_updated TIMESTAMPTZ
lease_last_updated TIMESTAMPTZ,
weight BIGINT NOT NULL DEFAULT 1
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_manifest_name ON manifests(name, kind);
-- Migration for existing tables
ALTER TABLE manifests ADD COLUMN IF NOT EXISTS weight BIGINT NOT NULL DEFAULT 1;
"#,
)
.await
@@ -296,6 +300,7 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
let id = uuid::Uuid::now_v7();
let name = &manifest.name;
let kind = manifest.spec.kind();
let weight = manifest.spec.weight() as i64;
let content = serde_json::to_value(&manifest)?;
let hash = &sha2::Sha256::digest(serde_json::to_vec(&content)?)[..];
let status = serde_json::to_value(ManifestStatus {
@@ -311,16 +316,17 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
id, generation, name, kind, status,
manifest_content, manifest_hash,
lease_owner_id, lease_last_updated,
created, updated
created, updated, weight
) VALUES (
$1, 0, $2, $3, $4, $5, $6, NULL, NULL, now(), now()
$1, 0, $2, $3, $4, $5, $6, NULL, NULL, now(), now(), $7
)
ON CONFLICT (name, kind) DO UPDATE
SET
manifest_content = $5,
updated = now()
updated = now(),
weight = $7
"#,
&[&id, &name, &kind, &status, &content, &hash],
&[&id, &name, &kind, &status, &content, &hash, &weight],
)
.await
.context("failed to upsert manifest")?;
@@ -397,4 +403,36 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
Ok(())
}
/// Returns cluster-wide weight distribution statistics from Postgres.
///
/// A single aggregate query computes:
/// - `total_weight`: sum of `weight` over all manifests with a live lease,
/// - `active_workers`: count of distinct lease owners,
/// - `my_weight`: sum of `weight` for manifests leased by `worker_id`.
///
/// A lease counts as live when `lease_last_updated` is within the last
/// 30 seconds. NOTE(review): the in-process backend uses a 10-second
/// window — confirm the mismatch is intentional.
async fn get_cluster_stats(
&self,
worker_id: &uuid::Uuid,
) -> anyhow::Result<Option<ClusterStats>> {
let row = self
.client
.query_one(
r#"
SELECT
COALESCE(SUM(weight), 0)::BIGINT AS total_weight,
COUNT(DISTINCT lease_owner_id)::BIGINT AS active_workers,
COALESCE(SUM(CASE WHEN lease_owner_id = $1 THEN weight ELSE 0 END), 0)::BIGINT AS my_weight
FROM manifests
WHERE lease_owner_id IS NOT NULL
AND lease_last_updated > now() - INTERVAL '30 seconds'
"#,
&[worker_id],
)
.await
.context("failed to get cluster stats")?;
// Postgres aggregates come back as BIGINT (i64); convert to the unsigned
// fields of ClusterStats. Values are non-negative by construction
// (COALESCE to 0, COUNT, and sums of the weight column which defaults
// to 1 in the schema).
let total_weight: i64 = row.get("total_weight");
let active_workers: i64 = row.get("active_workers");
let my_weight: i64 = row.get("my_weight");
Ok(Some(ClusterStats {
total_weight: total_weight as u64,
active_workers: active_workers as usize,
my_weight: my_weight as u64,
}))
}
}

View File

@@ -2,10 +2,13 @@ use anyhow::Context;
use jiff::Timestamp;
use tokio_util::sync::CancellationToken;
use rand::seq::SliceRandom;
use crate::{
Operator, OperatorState,
Operator, OperatorState, Specification,
control_plane::backing_store::{BackingStore, BackingStoreEdge},
manifests::{Action, ManifestName, ManifestState, ManifestStatusState, WorkerId},
operator_state::{ClusterStats, RebalancePolicy},
reconcile_queue::ReconcileQueue,
};
@@ -102,6 +105,10 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
}
/// Single sync iteration - check for manifests, acquire leases, enqueue work.
/// Implements 3-layer capacity management:
/// 1. Hard capacity limit (max_capacity)
/// 2. Cluster-aware fair share (get_cluster_stats)
/// 3. Voluntary shedding (RebalancePolicy::FairShare)
async fn sync_once(&self) -> anyhow::Result<()> {
let manifests = self
.store
@@ -110,37 +117,93 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
tracing::trace!(manifests = manifests.len(), "sync once manifests");
for manifest_state in manifests {
// Partition into owned and unowned
let (mut owned, mut unowned): (Vec<_>, Vec<_>) =
manifests.into_iter().partition(|m| {
matches!(&m.lease, Some(lease) if lease.owner == self.worker_id)
});
let mut owned_weight: u64 = owned.iter().map(|m| m.manifest.spec.weight()).sum();
// Layer 2: Fetch cluster stats for fair share calculation
let cluster_stats = self
.store
.get_cluster_stats(&self.worker_id)
.await
.unwrap_or_else(|e| {
tracing::warn!(error = %e, "failed to get cluster stats, proceeding without fair share");
None
});
// Layer 3: Voluntary shedding (before heartbeat, so shed manifests
// don't get their leases refreshed)
if let Err(e) = self
.maybe_shed_manifests(&mut owned, &mut owned_weight, &cluster_stats)
.await
{
tracing::warn!(error = %e, "failed during manifest shedding");
}
// Heartbeat and check changes for remaining owned manifests
for manifest_state in &owned {
let manifest_name = manifest_state.manifest.name.clone();
match &manifest_state.lease {
Some(lease) if lease.owner == self.worker_id => {
tracing::trace!("updating lease");
tracing::trace!(%manifest_name, "updating lease");
if let Err(e) = self.store.update_lease(manifest_state).await {
// Generation mismatch means someone else took the lease — expected contention
tracing::trace!(error = %e, %manifest_name, "failed to update lease, likely lost");
continue;
}
// We own the lease, update it
self.store
.update_lease(&manifest_state)
.await
.context("update lease")?;
if self.needs_reconciliation(manifest_state) {
self.reconcile_queue.enqueue(manifest_name).await;
}
}
// Check if there are unhandled changes
if self.needs_reconciliation(&manifest_state) {
self.reconcile_queue.enqueue(manifest_name).await;
}
}
_ => {
tracing::trace!("acquiring lease");
// Shuffle unowned to prevent thundering herd
unowned.shuffle(&mut rand::rng());
// No lease, try to acquire
self.store
.acquire_lease(&manifest_state, &self.worker_id)
.await
.context("acquire lease")?;
// Layers 1+2: Compute effective capacity limit
let effective_limit = self.effective_capacity_limit(owned_weight, &cluster_stats);
// Enqueue for reconciliation
self.reconcile_queue.enqueue(manifest_name).await;
tracing::trace!(
owned_weight,
?effective_limit,
unowned_count = unowned.len(),
"capacity check before acquisition"
);
// Acquire unowned manifests, respecting capacity
for manifest_state in unowned {
let manifest_name = manifest_state.manifest.name.clone();
let manifest_weight = manifest_state.manifest.spec.weight();
if let Some(limit) = effective_limit {
if owned_weight + manifest_weight > limit {
tracing::trace!(
owned_weight,
manifest_weight,
limit,
%manifest_name,
"skipping acquisition: would exceed capacity"
);
continue;
}
}
tracing::trace!(%manifest_name, "acquiring lease");
if let Err(e) = self
.store
.acquire_lease(&manifest_state, &self.worker_id)
.await
{
// CAS failure means another worker grabbed it first — expected contention
tracing::trace!(error = %e, %manifest_name, "failed to acquire lease, likely contention");
continue;
}
owned_weight += manifest_weight;
self.reconcile_queue.enqueue(manifest_name).await;
}
Ok(())
@@ -204,6 +267,152 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
false
}
/// Compute the effective capacity limit for this worker.
///
/// Combines two independent limits:
/// - the hard cap from `OperatorConfig::max_capacity`, and
/// - the cluster fair share (`total_weight / workers + headroom`), which is
///   only computable when `cluster_stats` is present with at least one
///   active worker.
///
/// Returns `None` when neither limit applies (unlimited acquisition).
fn effective_capacity_limit(
    &self,
    _owned_weight: u64,
    cluster_stats: &Option<ClusterStats>,
) -> Option<u64> {
    let max_capacity = self.operator.config().max_capacity;
    // Headroom only comes from the fair-share policy; Disabled contributes 0.
    let headroom = match &self.operator.config().rebalance_policy {
        RebalancePolicy::FairShare { headroom } => *headroom,
        RebalancePolicy::Disabled => 0,
    };
    // Fair-share limit, computed once instead of duplicating the math in
    // every match arm.
    let fair_limit = cluster_stats
        .as_ref()
        .filter(|stats| stats.active_workers > 0)
        .map(|stats| {
            // Account for cold start: if we have no weight, we're not yet
            // counted in active_workers, so add ourselves to the divisor
            // to avoid overestimating our share.
            let effective_workers = if stats.my_weight == 0 {
                stats.active_workers + 1
            } else {
                stats.active_workers
            };
            let fair_share = stats.total_weight / effective_workers as u64;
            fair_share.saturating_add(headroom)
        });
    // When both limits apply, the tighter one wins.
    match (max_capacity, fair_limit) {
        (Some(max_cap), Some(fair)) => Some(max_cap.min(fair)),
        (Some(max_cap), None) => Some(max_cap),
        (None, fair) => fair,
    }
}
/// Voluntarily shed manifests when owned weight exceeds fair share + headroom.
///
/// Picks a subset of `owned` manifests (largest first) whose release brings
/// `owned_weight` as close to the fair-share target as possible, deletes
/// their leases, and notifies the operator. Only manifests whose lease was
/// actually deleted are removed from `owned` and subtracted from
/// `owned_weight`; a failed deletion keeps the manifest so it continues to
/// be heartbeated (we still hold its lease).
async fn maybe_shed_manifests(
    &self,
    owned: &mut Vec<ManifestState<T::Specifications>>,
    owned_weight: &mut u64,
    cluster_stats: &Option<ClusterStats>,
) -> anyhow::Result<()> {
    // Shedding is only active under the fair-share policy.
    let headroom = match &self.operator.config().rebalance_policy {
        RebalancePolicy::FairShare { headroom } => *headroom,
        RebalancePolicy::Disabled => return Ok(()),
    };
    let Some(stats) = cluster_stats else {
        return Ok(());
    };
    if stats.active_workers == 0 {
        return Ok(());
    }
    let fair_share = stats.total_weight / stats.active_workers as u64;
    let target = fair_share.saturating_add(headroom);
    if *owned_weight <= target {
        return Ok(());
    }
    tracing::info!(
        owned_weight = *owned_weight,
        target,
        fair_share,
        headroom,
        active_workers = stats.active_workers,
        "owned weight exceeds target, shedding manifests"
    );
    // Sort by weight descending — shed fewer, larger manifests first
    owned.sort_by(|a, b| b.manifest.spec.weight().cmp(&a.manifest.spec.weight()));
    // Pick manifests to shed that bring us closest to target
    let mut to_shed: Vec<usize> = Vec::new();
    let mut projected_weight = *owned_weight;
    for (i, manifest) in owned.iter().enumerate() {
        if projected_weight <= target {
            break;
        }
        let w = manifest.manifest.spec.weight();
        let after_shed = projected_weight.saturating_sub(w);
        let overshoot = projected_weight.saturating_sub(target);
        let undershoot = target.saturating_sub(after_shed);
        // Shed if we'd still be at or above target, or if overshoot is worse than undershoot
        if after_shed >= target || overshoot > undershoot {
            to_shed.push(i);
            projected_weight = after_shed;
        }
    }
    // Execute sheds in descending index order so removing an entry never
    // invalidates an index we still need.
    let mut shed_count = 0usize;
    for &idx in to_shed.iter().rev() {
        let manifest = &owned[idx];
        let w = manifest.manifest.spec.weight();
        tracing::info!(
            manifest = %manifest.manifest.name,
            weight = w,
            "shedding manifest for rebalancing"
        );
        if let Err(e) = self.store.delete_lease(manifest, &self.worker_id).await {
            // BUGFIX: we still own this lease, so the manifest must stay in
            // `owned` (to keep being heartbeated) and its weight must stay
            // counted. Previously every `to_shed` index was removed from
            // `owned` regardless of this failure.
            tracing::warn!(
                error = %e,
                manifest = %manifest.manifest.name,
                "failed to delete lease during shedding"
            );
            continue;
        }
        if let Err(_e) = self.operator.on_lease_lost(manifest).await {
            tracing::warn!(
                manifest = %manifest.manifest.name,
                "on_lease_lost failed during shedding"
            );
        }
        *owned_weight -= w;
        owned.remove(idx);
        shed_count += 1;
    }
    tracing::info!(
        owned_weight = *owned_weight,
        target,
        // BUGFIX: report the number actually shed, not the number selected.
        shed_count,
        "shedding complete"
    );
    Ok(())
}
/// Process the reconciliation queue.
/// Takes items from the queue and reconciles them, re-enqueuing with delay if needed.
async fn process_queue(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> {
@@ -243,7 +452,7 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
match &manifest.lease {
Some(lease) if lease.owner == self.worker_id => {}
_ => {
tracing::debug!(%manifest_name, "we don't own the lease, shutting down owned resources");
tracing::trace!(%manifest_name, "we don't own the lease, shutting down owned resources");
self.operator
.on_lease_lost(&manifest)

View File

@@ -4,6 +4,13 @@ use crate::manifests::{Action, ManifestState};
/// Contract for operator-managed resource specifications.
/// Supertrait bounds require implementors to be serde-(de)serializable,
/// clonable, and safely shareable across threads.
pub trait Specification: Clone + Serialize + DeserializeOwned + Send + Sync + 'static {
/// Returns a static identifier for this specification's kind
/// (used e.g. as the `kind` discriminator when storing manifests).
fn kind(&self) -> &'static str;
/// Returns the weight of this specification for capacity management.
/// Higher weight means this manifest consumes more of a worker's capacity budget.
/// Default is 1.
fn weight(&self) -> u64 {
1
}
}
#[allow(dead_code, unused_variables)]

View File

@@ -67,6 +67,10 @@ pub struct OperatorConfig {
/// Interval at which all manifests are re-enqueued for reconciliation.
/// Default is 5 minutes.
pub resync_interval: Duration,
/// Maximum total weight this worker will manage. None means unlimited.
pub max_capacity: Option<u64>,
/// Policy for active rebalancing of manifests across workers.
pub rebalance_policy: RebalancePolicy,
}
impl Default for OperatorConfig {
@@ -77,6 +81,8 @@ impl Default for OperatorConfig {
},
reconcile_on: Default::default(),
resync_interval: Duration::from_secs(5 * 60),
max_capacity: None,
rebalance_policy: RebalancePolicy::default(),
}
}
}
@@ -103,3 +109,34 @@ impl Default for BackoffPolicy {
}
}
}
/// Policy for active rebalancing of manifests across workers.
//
// Derives PartialEq/Eq so configurations can be compared and asserted on
// in tests; purely additive and backward compatible.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RebalancePolicy {
    /// No active rebalancing. Workers only limit acquisition via max_capacity.
    Disabled,
    /// Fair-share rebalancing. When a worker's owned weight exceeds
    /// fair_share + headroom, it voluntarily releases excess manifests.
    FairShare {
        /// Extra weight budget above fair share before shedding begins.
        /// Prevents thrashing when weights don't divide evenly.
        headroom: u64,
    },
}
impl Default for RebalancePolicy {
fn default() -> Self {
RebalancePolicy::Disabled
}
}
/// Statistics about the cluster's current weight distribution.
///
/// Produced by `BackingStoreEdge::get_cluster_stats` implementations and
/// consumed by the reconciler's fair-share capacity logic.
//
// All fields are plain integers, so Copy/PartialEq/Eq/Default are free and
// backward compatible; Default gives the natural "empty cluster" value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ClusterStats {
    /// Total weight of all manifests with active leases across the cluster.
    pub total_weight: u64,
    /// Number of workers with active leases.
    pub active_workers: usize,
    /// Total weight of manifests owned by the requesting worker.
    pub my_weight: u64,
}