diff --git a/Cargo.lock b/Cargo.lock index 34ee2c7..8d52c08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1647,6 +1647,20 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "rebalancing-stress" +version = "0.1.0" +dependencies = [ + "anyhow", + "nocontrol", + "serde", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", + "uuid", +] + [[package]] name = "redox_syscall" version = "0.5.18" diff --git a/crates/nocontrol-tui/src/lib.rs b/crates/nocontrol-tui/src/lib.rs index 5160098..9f358d5 100644 --- a/crates/nocontrol-tui/src/lib.rs +++ b/crates/nocontrol-tui/src/lib.rs @@ -323,6 +323,7 @@ async fn run_app< ) -> anyhow::Result<()> where TOperator::Specifications: Send + Sync, + ::Error: Send + Sync + 'static, { // Spawn refresh task let app_clone = app.clone(); diff --git a/crates/nocontrol/src/control_plane/backing_store.rs b/crates/nocontrol/src/control_plane/backing_store.rs index 2caa008..db6b19a 100644 --- a/crates/nocontrol/src/control_plane/backing_store.rs +++ b/crates/nocontrol/src/control_plane/backing_store.rs @@ -6,6 +6,7 @@ use crate::{ Specification, control_plane::backing_store::in_process::BackingStoreInProcess, manifests::{Manifest, ManifestState, WorkerId}, + operator_state::ClusterStats, }; pub mod in_process; @@ -96,4 +97,14 @@ pub trait BackingStoreEdge: Send + Sync + Clone { &self, manifest: &ManifestState, ) -> impl std::future::Future> + Send; + + /// Returns cluster-wide weight distribution statistics. + /// Returns None when cluster stats are not available (disables fair-share). 
+ fn get_cluster_stats( + &self, + worker_id: &Uuid, + ) -> impl std::future::Future<Output = anyhow::Result<Option<ClusterStats>>> + Send { + let _ = worker_id; + async { Ok(None) } + } } diff --git a/crates/nocontrol/src/control_plane/backing_store/in_process.rs b/crates/nocontrol/src/control_plane/backing_store/in_process.rs index 49d426d..b880c09 100644 --- a/crates/nocontrol/src/control_plane/backing_store/in_process.rs +++ b/crates/nocontrol/src/control_plane/backing_store/in_process.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use jiff::ToSpan; use sha2::{Digest, Sha256}; @@ -11,6 +11,7 @@ use crate::{ Manifest, ManifestChangeEvent, ManifestChangeEventType, ManifestLease, ManifestState, ManifestStatus, ManifestStatusState, WorkerId, }, + operator_state::ClusterStats, }; #[derive(Clone, Default)] @@ -64,12 +65,13 @@ impl BackingStoreEdge for BackingStoreInProcess { .find(|m| m.manifest.name == manifest_state.manifest.name) { Some(manifest) => { - let mut manifest_state = manifest_state.clone(); - if let Some(lease) = manifest_state.lease.as_mut() { - lease.last_seen = jiff::Timestamp::now(); + if manifest.generation != manifest_state.generation { + anyhow::bail!("failed to update lease, generation mismatch"); } - manifest.lease = manifest_state.lease + if let Some(lease) = manifest.lease.as_mut() { + lease.last_seen = jiff::Timestamp::now(); + } } None => anyhow::bail!("manifest is not found"), } @@ -90,12 +92,20 @@ impl BackingStoreEdge for BackingStoreInProcess { .find(|m| m.manifest.name == manifest_state.manifest.name) { Some(manifest) => { - let mut manifest_state = manifest_state.clone(); - manifest_state.lease = Some(ManifestLease { + // CAS: only acquire if generation matches (prevents race conditions) + if manifest.generation != manifest_state.generation { + anyhow::bail!( + "failed to acquire lease: generation mismatch (expected {}, got {})", + manifest_state.generation, + manifest.generation + ); + } + + manifest.lease = Some(ManifestLease { + owner:
*worker_id, last_seen: jiff::Timestamp::now(), }); - manifest.lease = manifest_state.lease + manifest.generation += 1; } None => anyhow::bail!("manifest is not found"), } @@ -183,6 +193,37 @@ impl BackingStoreEdge for BackingStoreInProcess { Ok(()) } + + async fn get_cluster_stats( + &self, + worker_id: &WorkerId, + ) -> anyhow::Result> { + let now = jiff::Timestamp::now().checked_sub(10.second())?; + let manifests = self.manifests.read().await; + + let mut total_weight = 0u64; + let mut my_weight = 0u64; + let mut active_workers = HashSet::new(); + + for m in manifests.iter() { + if let Some(lease) = &m.lease { + if lease.last_seen > now { + let w = m.manifest.spec.weight(); + total_weight += w; + active_workers.insert(lease.owner); + if &lease.owner == worker_id { + my_weight += w; + } + } + } + } + + Ok(Some(ClusterStats { + total_weight, + active_workers: active_workers.len(), + my_weight, + })) + } } impl BackingStoreInProcess { diff --git a/crates/nocontrol/src/control_plane/backing_store/postgres.rs b/crates/nocontrol/src/control_plane/backing_store/postgres.rs index e2a727a..187acc6 100644 --- a/crates/nocontrol/src/control_plane/backing_store/postgres.rs +++ b/crates/nocontrol/src/control_plane/backing_store/postgres.rs @@ -11,6 +11,7 @@ use crate::{ manifests::{ Manifest, ManifestLease, ManifestState, ManifestStatus, ManifestStatusState, WorkerId, }, + operator_state::ClusterStats, stores::BackingStoreEdge, }; @@ -70,9 +71,12 @@ impl BackingStorePostgres { created TIMESTAMPTZ NOT NULL, updated TIMESTAMPTZ NOT NULL, lease_owner_id UUID, - lease_last_updated TIMESTAMPTZ + lease_last_updated TIMESTAMPTZ, + weight BIGINT NOT NULL DEFAULT 1 ); CREATE UNIQUE INDEX IF NOT EXISTS idx_manifest_name ON manifests(name, kind); + -- Migration for existing tables + ALTER TABLE manifests ADD COLUMN IF NOT EXISTS weight BIGINT NOT NULL DEFAULT 1; "#, ) .await @@ -296,6 +300,7 @@ impl BackingStoreEdge for BackingStorePostgres { let id = uuid::Uuid::now_v7(); let name = 
&manifest.name; let kind = manifest.spec.kind(); + let weight = manifest.spec.weight() as i64; let content = serde_json::to_value(&manifest)?; let hash = &sha2::Sha256::digest(serde_json::to_vec(&content)?)[..]; let status = serde_json::to_value(ManifestStatus { @@ -311,16 +316,17 @@ impl BackingStoreEdge for BackingStorePostgres { id, generation, name, kind, status, manifest_content, manifest_hash, lease_owner_id, lease_last_updated, - created, updated + created, updated, weight ) VALUES ( - $1, 0, $2, $3, $4, $5, $6, NULL, NULL, now(), now() + $1, 0, $2, $3, $4, $5, $6, NULL, NULL, now(), now(), $7 ) ON CONFLICT (name, kind) DO UPDATE SET manifest_content = $5, - updated = now() + updated = now(), + weight = $7 "#, - &[&id, &name, &kind, &status, &content, &hash], + &[&id, &name, &kind, &status, &content, &hash, &weight], ) .await .context("failed to upsert manifest")?; @@ -397,4 +403,36 @@ impl BackingStoreEdge for BackingStorePostgres { Ok(()) } + + async fn get_cluster_stats( + &self, + worker_id: &uuid::Uuid, + ) -> anyhow::Result> { + let row = self + .client + .query_one( + r#" + SELECT + COALESCE(SUM(weight), 0)::BIGINT AS total_weight, + COUNT(DISTINCT lease_owner_id)::BIGINT AS active_workers, + COALESCE(SUM(CASE WHEN lease_owner_id = $1 THEN weight ELSE 0 END), 0)::BIGINT AS my_weight + FROM manifests + WHERE lease_owner_id IS NOT NULL + AND lease_last_updated > now() - INTERVAL '30 seconds' + "#, + &[worker_id], + ) + .await + .context("failed to get cluster stats")?; + + let total_weight: i64 = row.get("total_weight"); + let active_workers: i64 = row.get("active_workers"); + let my_weight: i64 = row.get("my_weight"); + + Ok(Some(ClusterStats { + total_weight: total_weight as u64, + active_workers: active_workers as usize, + my_weight: my_weight as u64, + })) + } } diff --git a/crates/nocontrol/src/control_plane/reconciler.rs b/crates/nocontrol/src/control_plane/reconciler.rs index 19baebd..3842c0b 100644 --- 
a/crates/nocontrol/src/control_plane/reconciler.rs +++ b/crates/nocontrol/src/control_plane/reconciler.rs @@ -2,10 +2,13 @@ use anyhow::Context; use jiff::Timestamp; use tokio_util::sync::CancellationToken; +use rand::seq::SliceRandom; + use crate::{ - Operator, OperatorState, + Operator, OperatorState, Specification, control_plane::backing_store::{BackingStore, BackingStoreEdge}, manifests::{Action, ManifestName, ManifestState, ManifestStatusState, WorkerId}, + operator_state::{ClusterStats, RebalancePolicy}, reconcile_queue::ReconcileQueue, }; @@ -102,6 +105,10 @@ impl> Reconciler anyhow::Result<()> { let manifests = self .store @@ -110,37 +117,93 @@ impl> Reconciler, Vec<_>) = + manifests.into_iter().partition(|m| { + matches!(&m.lease, Some(lease) if lease.owner == self.worker_id) + }); + + let mut owned_weight: u64 = owned.iter().map(|m| m.manifest.spec.weight()).sum(); + + // Layer 2: Fetch cluster stats for fair share calculation + let cluster_stats = self + .store + .get_cluster_stats(&self.worker_id) + .await + .unwrap_or_else(|e| { + tracing::warn!(error = %e, "failed to get cluster stats, proceeding without fair share"); + None + }); + + // Layer 3: Voluntary shedding (before heartbeat, so shed manifests + // don't get their leases refreshed) + if let Err(e) = self + .maybe_shed_manifests(&mut owned, &mut owned_weight, &cluster_stats) + .await + { + tracing::warn!(error = %e, "failed during manifest shedding"); + } + + // Heartbeat and check changes for remaining owned manifests + for manifest_state in &owned { let manifest_name = manifest_state.manifest.name.clone(); - match &manifest_state.lease { - Some(lease) if lease.owner == self.worker_id => { - tracing::trace!("updating lease"); + tracing::trace!(%manifest_name, "updating lease"); + if let Err(e) = self.store.update_lease(manifest_state).await { + // Generation mismatch means someone else took the lease — expected contention + tracing::trace!(error = %e, %manifest_name, "failed to update lease, 
likely lost"); + continue; + } - // We own the lease, update it - self.store - .update_lease(&manifest_state) - .await - .context("update lease")?; + if self.needs_reconciliation(manifest_state) { + self.reconcile_queue.enqueue(manifest_name).await; + } + } - // Check if there are unhandled changes - if self.needs_reconciliation(&manifest_state) { - self.reconcile_queue.enqueue(manifest_name).await; - } - } - _ => { - tracing::trace!("acquiring lease"); + // Shuffle unowned to prevent thundering herd + unowned.shuffle(&mut rand::rng()); - // No lease, try to acquire - self.store - .acquire_lease(&manifest_state, &self.worker_id) - .await - .context("acquire lease")?; + // Layers 1+2: Compute effective capacity limit + let effective_limit = self.effective_capacity_limit(owned_weight, &cluster_stats); - // Enqueue for reconciliation - self.reconcile_queue.enqueue(manifest_name).await; + tracing::trace!( + owned_weight, + ?effective_limit, + unowned_count = unowned.len(), + "capacity check before acquisition" + ); + + // Acquire unowned manifests, respecting capacity + for manifest_state in unowned { + let manifest_name = manifest_state.manifest.name.clone(); + let manifest_weight = manifest_state.manifest.spec.weight(); + + if let Some(limit) = effective_limit { + if owned_weight + manifest_weight > limit { + tracing::trace!( + owned_weight, + manifest_weight, + limit, + %manifest_name, + "skipping acquisition: would exceed capacity" + ); + continue; } } + + tracing::trace!(%manifest_name, "acquiring lease"); + if let Err(e) = self + .store + .acquire_lease(&manifest_state, &self.worker_id) + .await + { + // CAS failure means another worker grabbed it first — expected contention + tracing::trace!(error = %e, %manifest_name, "failed to acquire lease, likely contention"); + continue; + } + + owned_weight += manifest_weight; + self.reconcile_queue.enqueue(manifest_name).await; } Ok(()) @@ -204,6 +267,152 @@ impl> Reconciler, + ) -> Option { + let max_capacity = 
self.operator.config().max_capacity; + + let headroom = match &self.operator.config().rebalance_policy { + RebalancePolicy::FairShare { headroom } => *headroom, + RebalancePolicy::Disabled => 0, + }; + + match (max_capacity, cluster_stats) { + (Some(max_cap), Some(stats)) if stats.active_workers > 0 => { + // Account for cold start: if we have no weight, we're not yet + // counted in active_workers, so add ourselves + let effective_workers = if stats.my_weight == 0 { + stats.active_workers + 1 + } else { + stats.active_workers + }; + let fair_share = stats.total_weight / effective_workers as u64; + let fair_limit = fair_share.saturating_add(headroom); + Some(max_cap.min(fair_limit)) + } + (Some(max_cap), _) => Some(max_cap), + (None, Some(stats)) if stats.active_workers > 0 => { + let effective_workers = if stats.my_weight == 0 { + stats.active_workers + 1 + } else { + stats.active_workers + }; + let fair_share = stats.total_weight / effective_workers as u64; + Some(fair_share.saturating_add(headroom)) + } + _ => None, + } + } + + /// Voluntarily shed manifests when owned weight exceeds fair share + headroom. 
+ async fn maybe_shed_manifests( + &self, + owned: &mut Vec>, + owned_weight: &mut u64, + cluster_stats: &Option, + ) -> anyhow::Result<()> { + let headroom = match &self.operator.config().rebalance_policy { + RebalancePolicy::FairShare { headroom } => *headroom, + RebalancePolicy::Disabled => return Ok(()), + }; + + let Some(stats) = cluster_stats else { + return Ok(()); + }; + + if stats.active_workers == 0 { + return Ok(()); + } + + let fair_share = stats.total_weight / stats.active_workers as u64; + let target = fair_share.saturating_add(headroom); + + if *owned_weight <= target { + return Ok(()); + } + + tracing::info!( + owned_weight = *owned_weight, + target, + fair_share, + headroom, + active_workers = stats.active_workers, + "owned weight exceeds target, shedding manifests" + ); + + // Sort by weight descending — shed fewer, larger manifests first + owned.sort_by(|a, b| b.manifest.spec.weight().cmp(&a.manifest.spec.weight())); + + // Pick manifests to shed that bring us closest to target + let mut to_shed: Vec = Vec::new(); + let mut projected_weight = *owned_weight; + + for (i, manifest) in owned.iter().enumerate() { + if projected_weight <= target { + break; + } + + let w = manifest.manifest.spec.weight(); + let after_shed = projected_weight.saturating_sub(w); + let overshoot = projected_weight.saturating_sub(target); + let undershoot = target.saturating_sub(after_shed); + + // Shed if we'd still be at or above target, or if overshoot is worse than undershoot + if after_shed >= target || overshoot > undershoot { + to_shed.push(i); + projected_weight = after_shed; + } + } + + // Execute sheds in reverse order to maintain index validity + for &idx in to_shed.iter().rev() { + let manifest = &owned[idx]; + let w = manifest.manifest.spec.weight(); + + tracing::info!( + manifest = %manifest.manifest.name, + weight = w, + "shedding manifest for rebalancing" + ); + + if let Err(e) = self.store.delete_lease(manifest, &self.worker_id).await { + tracing::warn!( + 
error = %e, + manifest = %manifest.manifest.name, + "failed to delete lease during shedding" + ); + continue; + } + + if let Err(_e) = self.operator.on_lease_lost(manifest).await { + tracing::warn!( + manifest = %manifest.manifest.name, + "on_lease_lost failed during shedding" + ); + } + + *owned_weight -= w; + } + + // Remove shed manifests from the owned list + for &idx in to_shed.iter().rev() { + owned.remove(idx); + } + + tracing::info!( + owned_weight = *owned_weight, + target, + shed_count = to_shed.len(), + "shedding complete" + ); + + Ok(()) + } + /// Process the reconciliation queue. /// Takes items from the queue and reconciles them, re-enqueuing with delay if needed. async fn process_queue(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> { @@ -243,7 +452,7 @@ impl> Reconciler {} _ => { - tracing::debug!(%manifest_name, "we don't own the lease, shutting down owned resources"); + tracing::trace!(%manifest_name, "we don't own the lease, shutting down owned resources"); self.operator .on_lease_lost(&manifest) diff --git a/crates/nocontrol/src/operator.rs b/crates/nocontrol/src/operator.rs index 595f63d..8a53092 100644 --- a/crates/nocontrol/src/operator.rs +++ b/crates/nocontrol/src/operator.rs @@ -4,6 +4,13 @@ use crate::manifests::{Action, ManifestState}; pub trait Specification: Clone + Serialize + DeserializeOwned + Send + Sync + 'static { fn kind(&self) -> &'static str; + + /// Returns the weight of this specification for capacity management. + /// Higher weight means this manifest consumes more of a worker's capacity budget. + /// Default is 1. 
+ fn weight(&self) -> u64 { + 1 + } } #[allow(dead_code, unused_variables)] diff --git a/crates/nocontrol/src/operator_state.rs b/crates/nocontrol/src/operator_state.rs index 03da63e..10f904e 100644 --- a/crates/nocontrol/src/operator_state.rs +++ b/crates/nocontrol/src/operator_state.rs @@ -67,6 +67,10 @@ pub struct OperatorConfig { /// Interval at which all manifests are re-enqueued for reconciliation. /// Default is 5 minutes. pub resync_interval: Duration, + /// Maximum total weight this worker will manage. None means unlimited. + pub max_capacity: Option<u64>, + /// Policy for active rebalancing of manifests across workers. + pub rebalance_policy: RebalancePolicy, } impl Default for OperatorConfig { @@ -77,6 +81,8 @@ impl Default for OperatorConfig { }, reconcile_on: Default::default(), resync_interval: Duration::from_secs(5 * 60), + max_capacity: None, + rebalance_policy: RebalancePolicy::default(), } } } @@ -103,3 +109,34 @@ impl Default for BackoffPolicy { } } } + +/// Policy for active rebalancing of manifests across workers. +#[derive(Clone, Debug)] +pub enum RebalancePolicy { + /// No active rebalancing. Workers only limit acquisition via max_capacity. + Disabled, + /// Fair-share rebalancing. When a worker's owned weight exceeds + /// fair_share + headroom, it voluntarily releases excess manifests. + FairShare { + /// Extra weight budget above fair share before shedding begins. + /// Prevents thrashing when weights don't divide evenly. + headroom: u64, + }, +} + +impl Default for RebalancePolicy { + fn default() -> Self { + RebalancePolicy::Disabled + } +} + +/// Statistics about the cluster's current weight distribution. +#[derive(Debug, Clone)] +pub struct ClusterStats { + /// Total weight of all manifests with active leases across the cluster. + pub total_weight: u64, + /// Number of workers with active leases. + pub active_workers: usize, + /// Total weight of manifests owned by the requesting worker.
+ pub my_weight: u64, +} diff --git a/examples/postgres-backed/src/main.rs b/examples/postgres-backed/src/main.rs index 3fc72b1..d263a28 100644 --- a/examples/postgres-backed/src/main.rs +++ b/examples/postgres-backed/src/main.rs @@ -29,7 +29,6 @@ async fn main() -> anyhow::Result<()> { EnvFilter::from_default_env() .add_directive("nocontrol=trace".parse().unwrap()) .add_directive("postgres_backend=trace".parse().unwrap()) - .add_directive("debug".parse().unwrap()), ) .with_file(false) diff --git a/examples/rebalancing-stress/Cargo.toml b/examples/rebalancing-stress/Cargo.toml new file mode 100644 index 0000000..51de4e9 --- /dev/null +++ b/examples/rebalancing-stress/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "rebalancing-stress" +version = "0.1.0" +edition = "2024" +publish = false + +[dependencies] +nocontrol.workspace = true + +anyhow.workspace = true +tokio.workspace = true +serde.workspace = true +tracing-subscriber.workspace = true +tracing.workspace = true +uuid.workspace = true +tokio-util = { version = "0.7", features = ["rt"] } diff --git a/examples/rebalancing-stress/src/main.rs b/examples/rebalancing-stress/src/main.rs new file mode 100644 index 0000000..f1cd931 --- /dev/null +++ b/examples/rebalancing-stress/src/main.rs @@ -0,0 +1,268 @@ +//! Stress test for weight-based rebalancing. +//! +//! Simulates multiple workers sharing an in-process backing store. +//! Manifests have varying weights. Workers have capacity limits and +//! use FairShare rebalancing to redistribute work as nodes join/leave. +//! +//! 
Run with: RUST_LOG=info cargo run -p rebalancing-stress + +use std::time::Duration; + +use nocontrol::{ + ControlPlane, Operator, OperatorConfig, OperatorState, RebalancePolicy, Specification, + manifests::{Action, Manifest, ManifestMetadata, ManifestState}, + stores::{BackingStore, BackingStoreEdge}, +}; +use serde::{Deserialize, Serialize}; +use tokio_util::sync::CancellationToken; +use tracing_subscriber::EnvFilter; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")), + ) + .with_target(false) + .without_time() + .init(); + + let store = BackingStore::in_process(); + + // Create 20 manifests with varying weights (total weight = 110) + let manifests = vec![ + ("heavy-job-1", 10), + ("heavy-job-2", 10), + ("heavy-job-3", 10), + ("medium-job-1", 5), + ("medium-job-2", 5), + ("medium-job-3", 5), + ("medium-job-4", 5), + ("medium-job-5", 5), + ("medium-job-6", 5), + ("light-job-1", 1), + ("light-job-2", 1), + ("light-job-3", 1), + ("light-job-4", 1), + ("light-job-5", 1), + ("light-job-6", 1), + ("light-job-7", 1), + ("light-job-8", 1), + ("light-job-9", 1), + ("light-job-10", 1), + ("tiny-job-1", 0), + ]; + + let total_weight: u64 = manifests.iter().map(|(_, w)| *w).sum(); + tracing::info!( + manifest_count = manifests.len(), + total_weight, + "creating manifests" + ); + + // Insert all manifests into the shared store using a temporary control plane + let seed_operator = OperatorState::new(StressOperator); + let seed_cp = ControlPlane::new(seed_operator, store.clone()); + for (name, weight) in &manifests { + seed_cp + .add_manifest(Manifest { + name: name.to_string(), + metadata: ManifestMetadata {}, + spec: WeightedJob { + weight: *weight, + name: name.to_string(), + }, + }) + .await?; + } + + let cancellation = CancellationToken::new(); + + // --- Phase 1: Start 2 workers --- + tracing::info!("=== PHASE 1: Starting 2 workers 
(capacity=60 each, headroom=5) ==="); + + let worker1 = spawn_worker("worker-1", store.clone(), 60, 5, cancellation.child_token()); + let worker2 = spawn_worker("worker-2", store.clone(), 60, 5, cancellation.child_token()); + + // Let them stabilize + tokio::time::sleep(Duration::from_secs(15)).await; + print_distribution(&seed_cp).await; + + // --- Phase 2: Add a 3rd worker --- + tracing::info!("=== PHASE 2: Adding worker-3 ==="); + let worker3 = spawn_worker("worker-3", store.clone(), 60, 5, cancellation.child_token()); + + // Let rebalancing happen + tokio::time::sleep(Duration::from_secs(25)).await; + print_distribution(&seed_cp).await; + + // --- Phase 3: Add a 4th worker with low capacity --- + tracing::info!("=== PHASE 3: Adding worker-4 (capacity=15) ==="); + let worker4 = spawn_worker("worker-4", store.clone(), 15, 2, cancellation.child_token()); + + tokio::time::sleep(Duration::from_secs(25)).await; + print_distribution(&seed_cp).await; + + // --- Phase 4: Kill worker-1, observe redistribution --- + tracing::info!("=== PHASE 4: Killing worker-1, observing redistribution ==="); + worker1.cancel(); + + // Wait for lease expiry (10s in-process) + sync cycles + tokio::time::sleep(Duration::from_secs(25)).await; + print_distribution(&seed_cp).await; + + // Cleanup + tracing::info!("=== DONE: Shutting down all workers ==="); + cancellation.cancel(); + worker2.cancel(); + worker3.cancel(); + worker4.cancel(); + + // Give workers time to shut down gracefully + tokio::time::sleep(Duration::from_secs(2)).await; + + Ok(()) +} + +fn spawn_worker + 'static>( + name: &'static str, + store: BackingStore, + max_capacity: u64, + headroom: u64, + cancellation: CancellationToken, +) -> CancellationToken { + let worker_cancel = CancellationToken::new(); + + tokio::spawn({ + let cancel = worker_cancel.clone(); + async move { + let config = OperatorConfig { + max_capacity: Some(max_capacity), + rebalance_policy: RebalancePolicy::FairShare { headroom }, + resync_interval: 
Duration::from_secs(60), + ..Default::default() + }; + + let operator = OperatorState::new_with_config(StressOperator, config); + let cp = ControlPlane::new(operator, store); + + tracing::info!(%name, max_capacity, headroom, "worker started"); + + let combined = CancellationToken::new(); + let combined_child = combined.child_token(); + + tokio::spawn({ + let combined = combined.clone(); + async move { + tokio::select! { + _ = cancel.cancelled() => {} + _ = cancellation.cancelled() => {} + } + combined.cancel(); + } + }); + + if let Err(e) = cp.execute_with_cancellation(combined_child).await { + tracing::error!(%name, error = %e, "worker failed"); + } + + tracing::info!(%name, "worker stopped"); + } + }); + + worker_cancel +} + +async fn print_distribution(cp: &ControlPlane) +where + TOperator: Operator, + TStore: BackingStoreEdge, +{ + let manifests = cp.get_manifests().await.unwrap_or_default(); + + let mut by_worker: std::collections::HashMap = + std::collections::HashMap::new(); + let mut unowned = Vec::new(); + + for m in &manifests { + let w = m.manifest.spec.weight; + match &m.lease { + Some(lease) => { + let entry = by_worker + .entry(format!("{}", lease.owner)) + .or_insert((0, 0)); + entry.0 += 1; + entry.1 += w; + } + None => { + unowned.push(m.manifest.name.as_str()); + } + } + } + + tracing::info!("--- Distribution ---"); + let mut workers: Vec<_> = by_worker.into_iter().collect(); + workers.sort_by_key(|(id, _)| id.clone()); + for (worker_id, (count, weight)) in &workers { + tracing::info!(worker = %worker_id, count, weight, ""); + } + if !unowned.is_empty() { + tracing::info!(count = unowned.len(), "unowned manifests"); + } + let total_owned_weight: u64 = workers.iter().map(|(_, (_, w))| w).sum(); + tracing::info!( + total_owned_weight, + total_manifests = manifests.len(), + workers = workers.len(), + "summary" + ); + tracing::info!("--------------------"); +} + +// --- Operator and Specification --- + +#[derive(Clone)] +struct StressOperator; + +impl 
Operator for StressOperator { + type Specifications = WeightedJob; + type Error = anyhow::Error; + + async fn reconcile( + &self, + manifest: &mut ManifestState, + ) -> Result { + // Simulate work proportional to weight + let work_ms = manifest.manifest.spec.weight * 10; + tokio::time::sleep(Duration::from_millis(work_ms)).await; + + Ok(Action::Requeue(Duration::from_secs(5))) + } + + async fn on_lease_lost( + &self, + manifest: &ManifestState, + ) -> Result<(), Self::Error> { + tracing::debug!( + manifest = %manifest.manifest.name, + "lease lost, cleaning up" + ); + Ok(()) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct WeightedJob { + pub name: String, + pub weight: u64, +} + +impl Specification for WeightedJob { + fn kind(&self) -> &'static str { + "weighted-job" + } + + fn weight(&self) -> u64 { + self.weight + } +}