From f0d2180c4b5b3c646e193174570e7f37ec7f5e0a Mon Sep 17 00:00:00 2001 From: kjuulh Date: Fri, 9 Jan 2026 10:14:41 +0100 Subject: [PATCH] feat: add ubernetes like example Signed-off-by: kjuulh --- Cargo.lock | 136 ++++++++++++++- Cargo.toml | 1 + crates/nocontrol/Cargo.toml | 11 +- examples/kubernetes-like/Cargo.toml | 2 + examples/kubernetes-like/src/main.rs | 243 +++++++++++++++++++++++---- 5 files changed, 346 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1c8c596..97c148a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -242,12 +242,43 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-core", + "futures-macro", + "futures-task", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "fuzzy-matcher" version = "0.3.7" @@ -267,6 +298,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "getrandom" version = "0.3.4" @@ -415,9 +457,11 @@ dependencies = [ "anyhow", "nocontrol", "nocontrol-tui", - "rand", + "noprocess", + "rand 0.9.2", "serde", "tokio", + "tokio-util", "tracing", "tracing-subscriber", "uuid", @@ -507,7 +551,7 @@ dependencies = [ "hex", "insta", "jiff", - "rand", + "rand 0.9.2", "serde", "serde_json", "sha2", @@ -531,6 +575,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "noprocess" +version = "0.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71c463606f7455e29b38995bea22b44df256dd5578df6cdb39126fb0ef40f53c" +dependencies = [ + "rand 0.8.5", + "thiserror", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -581,6 +638,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "portable-atomic" version = "1.13.0" @@ -629,14 +692,35 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + [[package]] name = "rand" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ - "rand_chacha", - "rand_core", + "rand_chacha 0.9.0", + "rand_core 0.9.3", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", ] [[package]] @@ -646,7 +730,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.9.3", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.16", ] [[package]] @@ -655,7 +748,7 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" dependencies = [ - "getrandom", + "getrandom 0.3.4", ] [[package]] @@ -849,6 +942,12 @@ version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" +[[package]] +name = "slab" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" + [[package]] name = "smallvec" version = "1.15.1" @@ -917,12 +1016,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" dependencies = [ "fastrand", - "getrandom", + "getrandom 0.3.4", "once_cell", "rustix 1.1.3", "windows-sys 0.61.2", ] +[[package]] +name = "thiserror" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.9" @@ -969,6 +1088,7 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", "pin-project-lite", "tokio", ] @@ -1103,7 +1223,7 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" dependencies = [ - "getrandom", + "getrandom 0.3.4", "js-sys", "serde_core", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index 0dcac3b..c5a4456 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ version = "0.0.1" nocontrol = { path = "crates/nocontrol" } nocontrol-tui = { path = "crates/nocontrol-tui" } +noprocess = { version = "0.0.2" } anyhow = { version = "1.0.71" } tokio = { version = "1", features = ["full"] } tracing = { version = "0.1", features = ["log"] } diff --git a/crates/nocontrol/Cargo.toml b/crates/nocontrol/Cargo.toml index 4a582d7..d5d65b5 100644 --- a/crates/nocontrol/Cargo.toml +++ b/crates/nocontrol/Cargo.toml @@ -11,17 +11,18 @@ publish = true [dependencies] anyhow.workspace = true +rand.workspace = true +serde.workspace = true +tokio.workspace = true +tracing.workspace = true +uuid.workspace = true + async-trait = "0.1.89" hex = "0.4.3" jiff = { version = "0.2.17", features = ["serde"] } -rand.workspace = true -serde.workspace = true serde_json = "1.0.148" sha2 = "0.10.9" -tokio.workspace = true tokio-util = "0.7.18" -tracing.workspace = true -uuid.workspace = true [dev-dependencies] insta = "1.46.0" diff --git a/examples/kubernetes-like/Cargo.toml b/examples/kubernetes-like/Cargo.toml index 01494ce..b7896f5 100644 --- a/examples/kubernetes-like/Cargo.toml +++ b/examples/kubernetes-like/Cargo.toml @@ -15,3 +15,5 @@ tracing-subscriber.workspace = true tracing.workspace = true rand.workspace = true uuid.workspace = true +noprocess.workspace = true +tokio-util = { version = "0.7", features = ["rt"] } diff --git a/examples/kubernetes-like/src/main.rs b/examples/kubernetes-like/src/main.rs index 86ebf40..fc255b2 100644 --- a/examples/kubernetes-like/src/main.rs +++ b/examples/kubernetes-like/src/main.rs @@ -1,15 +1,17 @@ -use std::time::Duration; +use std::{collections::HashMap, future::Future, sync::Arc, time::Duration}; use nocontrol::{ Operator, OperatorState, Specification, - manifests::{Action, Manifest, ManifestMetadata, ManifestState}, + manifests::{Action, Manifest, ManifestMetadata, ManifestState, ManifestStatusState}, }; +use noprocess::{HandleID, Process, ProcessHandler, ProcessManager, ProcessResult, ProcessState}; use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; +use tokio_util::sync::CancellationToken; use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() -> anyhow::Result<()> { - // Setup logging to file let output_file = std::fs::File::create("target/nocontrol.log")?; tracing_subscriber::fmt() @@ -21,41 +23,46 @@ async fn main() -> anyhow::Result<()> { .without_time() .init(); - let operator = OperatorState::new(MyOperator {}); + let process_manager = ProcessManager::new(); + let operator = OperatorState::new(ProcessOperator::new(process_manager)); let control_plane = nocontrol::ControlPlane::new(operator); - // Add initial manifest + // Add initial process manifest - desired state is Running control_plane .add_manifest(Manifest { - name: "initial-deployment".into(), + name: "worker-process".into(), metadata: ManifestMetadata {}, - spec: Specifications::Deployment(DeploymentControllerManifest { - name: "initial-app".into(), + spec: Specifications::Process(ProcessManifest { + name: "data-worker".into(), + desired_state: DesiredProcessState::Running, }), }) .await?; - // Spawn random manifest updater + // Spawn a task that toggles the desired state periodically tokio::spawn({ let control_plane = control_plane.clone(); async move { + let mut running = true; loop { - let rand = { - use rand::Rng; - let mut rng = rand::rng(); - rng.random_range(3..8) + tokio::time::sleep(Duration::from_secs(10)).await; + + running = !running; + let desired_state = if running { + DesiredProcessState::Running + } else { + DesiredProcessState::Stopped }; - tokio::time::sleep(Duration::from_secs(rand)).await; - - let random = uuid::Uuid::now_v7(); + tracing::info!("Toggling worker-process to {:?}", desired_state); let _ = control_plane .add_manifest(Manifest { - name: "initial-deployment".into(), + name: "worker-process".into(), metadata: ManifestMetadata {}, - spec: Specifications::Deployment(DeploymentControllerManifest { - name: format!("app-{}", &random.to_string()), + spec: Specifications::Process(ProcessManifest { + name: "data-worker".into(), + desired_state, }), }) .await; @@ -77,44 +84,212 @@ async fn main() -> anyhow::Result<()> { Ok(()) } +// The operator that manages processes via noprocess #[derive(Clone)] -pub struct MyOperator {} +pub struct ProcessOperator { + process_manager: ProcessManager, + // Maps manifest name -> process handle ID in the ProcessManager + process_ids: Arc>>, +} -impl Operator for MyOperator { +impl ProcessOperator { + pub fn new(process_manager: ProcessManager) -> Self { + Self { + process_manager, + process_ids: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +impl Operator for ProcessOperator { type Specifications = Specifications; async fn reconcile( &self, - desired_manifest: &mut ManifestState, + manifest_state: &mut ManifestState, ) -> anyhow::Result { - match &desired_manifest.manifest.spec { - Specifications::Deployment(spec) => { - tracing::info!( - "reconciliation was called for name = {}, value = {}", - desired_manifest.manifest.name, - spec.name - ) + match &manifest_state.manifest.spec { + Specifications::Process(spec) => { + let manifest_name = &manifest_state.manifest.name; + + // Get or create the process + let process_id = { + let ids = self.process_ids.read().await; + ids.get(manifest_name).cloned() + }; + + match spec.desired_state { + DesiredProcessState::Running => { + if let Some(id) = &process_id { + // Check if process is already running + let status = self.process_manager.get_process_state(id).await; + match status { + Some(ProcessState::Running) => { + // Process is running as desired + manifest_state.status.status = ManifestStatusState::Running; + tracing::info!( + "Process {} is running as desired", + spec.name + ); + } + Some(ProcessState::Pending) | Some(ProcessState::Stopped) => { + // Process is pending or stopped, start it + tracing::info!( + "Process {} is pending/stopped, starting", + spec.name + ); + self.process_manager.start_process(id).await?; + manifest_state.status.status = ManifestStatusState::Started; + } + Some(ProcessState::Errored) => { + // Process errored, update status and try to restart + tracing::warn!( + "Process {} errored, attempting restart", + spec.name + ); + manifest_state.status.status = ManifestStatusState::Errored; + // Try to restart + let _ = self.process_manager.restart_process(id).await; + } + None => { + // Process doesn't exist in manager, recreate + tracing::info!( + "Process {} not found, creating new", + spec.name + ); + let new_id = self + .process_manager + .add_process(Process::new(WorkerProcess { + name: spec.name.clone(), + })) + .await; + self.process_manager.start_process(&new_id).await?; + self.process_ids + .write() + .await + .insert(manifest_name.clone(), new_id); + manifest_state.status.status = ManifestStatusState::Started; + } + } + } else { + // No process exists yet, create and start one + tracing::info!("Creating and starting process {}", spec.name); + let id = self + .process_manager + .add_process(Process::new(WorkerProcess { + name: spec.name.clone(), + })) + .await; + self.process_manager.start_process(&id).await?; + self.process_ids + .write() + .await + .insert(manifest_name.clone(), id); + manifest_state.status.status = ManifestStatusState::Started; + } + } + DesiredProcessState::Stopped => { + if let Some(id) = &process_id { + let status = self.process_manager.get_process_state(id).await; + match status { + Some(ProcessState::Running) => { + // Process is running but should be stopped + tracing::info!( + "Stopping process {} as requested", + spec.name + ); + manifest_state.status.status = ManifestStatusState::Stopping; + self.process_manager.stop_process(id).await?; + } + Some(ProcessState::Pending) | Some(ProcessState::Stopped) => { + // Already pending/stopped as desired + manifest_state.status.status = ManifestStatusState::Pending; + tracing::info!( + "Process {} is pending/stopped as desired", + spec.name + ); + } + Some(ProcessState::Errored) => { + // Errored and should be stopped - that's fine + tracing::info!( + "Process {} errored, desired state is stopped", + spec.name + ); + manifest_state.status.status = ManifestStatusState::Errored; + } + None => { + // Doesn't exist, which is fine for stopped state + manifest_state.status.status = ManifestStatusState::Pending; + } + } + } else { + // No process exists, which is fine for stopped state + manifest_state.status.status = ManifestStatusState::Pending; + } + } + } } } - Ok(Action::Requeue(std::time::Duration::from_secs(10))) + Ok(Action::Requeue(Duration::from_secs(5))) } } +// A simple worker process that does periodic work +#[derive(Clone)] +struct WorkerProcess { + name: String, +} + +impl ProcessHandler for WorkerProcess { + fn call(&self, cancel: CancellationToken) -> impl Future + Send { + let name = self.name.clone(); + async move { + tracing::info!("Worker {} started", name); + let mut iteration = 0u64; + + loop { + tokio::select! { + _ = cancel.cancelled() => { + tracing::info!("Worker {} received cancellation, shutting down gracefully", name); + break; + } + _ = tokio::time::sleep(Duration::from_secs(2)) => { + iteration += 1; + tracing::info!("Worker {} iteration {}: doing work...", name, iteration); + } + } + } + + tracing::info!("Worker {} stopped", name); + Ok(()) + } + } +} + +// Specifications enum for the operator #[derive(Clone, Serialize, Deserialize)] pub enum Specifications { - Deployment(DeploymentControllerManifest), + Process(ProcessManifest), } impl Specification for Specifications { fn kind(&self) -> &'static str { match self { - Specifications::Deployment(_) => "deployment", + Specifications::Process(_) => "process", } } } +// The manifest specification for a process #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DeploymentControllerManifest { - name: String, +pub struct ProcessManifest { + pub name: String, + pub desired_state: DesiredProcessState, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +pub enum DesiredProcessState { + Running, + Stopped, }