From 41b64791375e9fee75c66376fbf51290bc527eca Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sat, 17 Jan 2026 00:19:18 +0100 Subject: [PATCH] feat: prepare for alternative stores Signed-off-by: kjuulh --- crates/nocontrol-tui/src/lib.rs | 65 ++++-- crates/nocontrol/src/control_plane.rs | 21 +- .../src/control_plane/backing_store.rs | 206 +++++------------- .../control_plane/backing_store/in_process.rs | 177 +++++++++++++++ .../nocontrol/src/control_plane/reconciler.rs | 10 +- crates/nocontrol/src/lib.rs | 4 + crates/nocontrol/src/operator.rs | 2 +- crates/nocontrol/tests/mod.rs | 5 +- examples/kubernetes-like/src/main.rs | 3 +- mise.toml | 2 +- 10 files changed, 316 insertions(+), 179 deletions(-) create mode 100644 crates/nocontrol/src/control_plane/backing_store/in_process.rs diff --git a/crates/nocontrol-tui/src/lib.rs b/crates/nocontrol-tui/src/lib.rs index 58a8b79..5160098 100644 --- a/crates/nocontrol-tui/src/lib.rs +++ b/crates/nocontrol-tui/src/lib.rs @@ -11,7 +11,9 @@ use crossterm::{ }; use fuzzy_matcher::FuzzyMatcher; use fuzzy_matcher::skim::SkimMatcherV2; -use nocontrol::{ControlPlane, Operator, Specification, manifests::ManifestState}; +use nocontrol::{ + ControlPlane, Operator, Specification, manifests::ManifestState, stores::BackingStoreEdge, +}; use ratatui::{ Frame, Terminal, backend::{Backend, CrosstermBackend}, @@ -28,8 +30,8 @@ enum InputMode { Search, } -struct App { - control_plane: ControlPlane, +struct App> { + control_plane: ControlPlane, manifests: Vec>, filtered_indices: Vec, list_state: ListState, @@ -47,8 +49,10 @@ struct App { fuzzy_matcher: SkimMatcherV2, } -impl App { - fn new(control_plane: ControlPlane) -> Self { +impl> + App +{ + fn new(control_plane: ControlPlane) -> Self { let mut list_state = ListState::default(); list_state.select(Some(0)); @@ -309,9 +313,13 @@ impl App { } } -async fn run_app( +async fn run_app< + B: Backend, + TOperator: Operator + Send + Sync + 'static, + TStore: BackingStoreEdge + Send + Sync + 'static, +>( terminal: &mut Terminal, - app: Arc>>, + app: Arc>>, ) -> anyhow::Result<()> where TOperator::Specifications: Send + Sync, @@ -414,7 +422,10 @@ where Ok(()) } -fn ui(f: &mut Frame, app: &mut App) { +fn ui>( + f: &mut Frame, + app: &mut App, +) { // Create layout let chunks = Layout::default() .direction(Direction::Vertical) @@ -472,7 +483,14 @@ fn ui(f: &mut Frame, app: &mut App) { render_command_input(f, app, chunks[3]); } -fn render_manifest_list(f: &mut Frame, app: &mut App, area: Rect) { +fn render_manifest_list< + TOperator: Operator, + TStore: BackingStoreEdge, +>( + f: &mut Frame, + app: &mut App, + area: Rect, +) { // Collect filtered manifests data before borrowing list_state let filtered_data: Vec<_> = app .filtered_indices @@ -530,7 +548,14 @@ fn render_manifest_list(f: &mut Frame, app: &mut App(f: &mut Frame, app: &App, area: Rect) { +fn render_manifest_details< + TOperator: Operator, + TStore: BackingStoreEdge, +>( + f: &mut Frame, + app: &App, + area: Rect, +) { let content = if let Some(manifest) = app.get_selected_manifest() { let mut lines = vec![ Line::from(vec![ @@ -599,7 +624,11 @@ fn render_manifest_details(f: &mut Frame, app: &App(f: &mut Frame, app: &App, area: Rect) { +fn render_messages>( + f: &mut Frame, + app: &App, + area: Rect, +) { let messages: Vec = app .messages .iter() @@ -616,7 +645,14 @@ fn render_messages(f: &mut Frame, app: &App, are f.render_widget(paragraph, area); } -fn render_command_input(f: &mut Frame, app: &App, area: Rect) { +fn render_command_input< + TOperator: Operator, + TStore: BackingStoreEdge, +>( + f: &mut Frame, + app: &App, + area: Rect, +) { let (title, input_text, style) = match app.input_mode { InputMode::Normal => { let hist_info = if let Some(idx) = app.history_index { @@ -650,10 +686,13 @@ fn render_command_input(f: &mut Frame, app: &App } /// Run the TUI with the given control plane -pub async fn run(control_plane: ControlPlane) -> anyhow::Result<()> +pub async fn run( + control_plane: ControlPlane, +) -> anyhow::Result<()> where TOperator: Operator + Send + Sync + 'static, TOperator::Specifications: Send + Sync, + TStore: BackingStoreEdge + Send + Sync + 'static, { // Setup terminal enable_raw_mode()?; diff --git a/crates/nocontrol/src/control_plane.rs b/crates/nocontrol/src/control_plane.rs index 67afd14..6ac4c59 100644 --- a/crates/nocontrol/src/control_plane.rs +++ b/crates/nocontrol/src/control_plane.rs @@ -2,7 +2,10 @@ use tokio_util::sync::CancellationToken; use crate::{ Operator, OperatorState, - control_plane::{backing_store::BackingStore, reconciler::Reconciler}, + control_plane::{ + backing_store::{BackingStore, BackingStoreEdge}, + reconciler::Reconciler, + }, manifests::{Manifest, ManifestState}, }; @@ -10,18 +13,22 @@ pub mod backing_store; pub mod reconciler; #[derive(Clone)] -pub struct ControlPlane { - reconciler: Reconciler, +pub struct ControlPlane> { + reconciler: Reconciler, worker_id: uuid::Uuid, - store: BackingStore, + store: BackingStore, deadline: Option, } -impl ControlPlane { - pub fn new(operator: OperatorState) -> Self { +impl> + ControlPlane +{ + pub fn new( + operator: OperatorState, + store: BackingStore, + ) -> Self { let worker_id = uuid::Uuid::now_v7(); - let store = BackingStore::::new(); let reconciler = Reconciler::new(worker_id, &store, operator); diff --git a/crates/nocontrol/src/control_plane/backing_store.rs b/crates/nocontrol/src/control_plane/backing_store.rs index c4c6d5d..eb580ff 100644 --- a/crates/nocontrol/src/control_plane/backing_store.rs +++ b/crates/nocontrol/src/control_plane/backing_store.rs @@ -1,174 +1,80 @@ -use std::sync::Arc; +use std::{marker::PhantomData, ops::Deref}; -use jiff::ToSpan; -use sha2::{Digest, Sha256}; -use tokio::sync::RwLock; +use uuid::Uuid; use crate::{ Specification, - manifests::{ - Manifest, ManifestChangeEvent, ManifestChangeEventType, ManifestLease, ManifestState, - ManifestStatus, ManifestStatusState, WorkerId, - }, + control_plane::backing_store::in_process::BackingStoreInProcess, + manifests::{Manifest, ManifestState, WorkerId}, }; +pub mod in_process; + #[derive(Clone)] -pub struct BackingStore { - manifests: Arc>>>, +pub struct BackingStore> { + inner: TStore, + + _marker: PhantomData, +} +impl> BackingStore { + pub fn new(store: TStore) -> Self { + Self { + inner: store, + _marker: PhantomData, + } + } } -impl BackingStore { - pub fn new() -> Self { +impl> Deref for BackingStore { + type Target = TStore; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl BackingStore> { + pub fn in_process() -> Self { Self { - manifests: Arc::new(RwLock::new(Vec::new())), + inner: BackingStoreInProcess::new(), + _marker: PhantomData, } } +} - pub async fn get_owned_and_potential_leases( +pub trait BackingStoreEdge: Send + Sync + Clone { + fn get_owned_and_potential_leases( &self, - worker_id: &WorkerId, - ) -> anyhow::Result>> { - let now = jiff::Timestamp::now().checked_sub(10.second())?; - let manifests = self - .manifests - .read() - .await - .iter() - .filter(|m| match &m.lease { - Some(lease) if lease.last_seen < now => true, - Some(lease) if &lease.owner == worker_id && lease.last_seen > now => true, - Some(_lease) => false, - None => true, - }) - .cloned() - .collect::>(); + worker_id: &Uuid, + ) -> impl std::future::Future>>> + Send; - Ok(manifests) - } + fn get_manifests( + &self, + ) -> impl std::future::Future>>> + Send; - pub async fn get_manifests(&self) -> anyhow::Result>> { - Ok(self.manifests.read().await.clone()) - } + fn get( + &self, + name: &str, + ) -> impl std::future::Future>>> + Send; - pub async fn get(&self, name: &str) -> anyhow::Result>> { - Ok(self - .manifests - .read() - .await - .iter() - .find(|m| m.manifest.name == name) - .cloned()) - } + fn update_lease( + &self, + manifest_state: &ManifestState, + ) -> impl std::future::Future> + Send; - pub async fn update_lease(&self, manifest_state: &ManifestState) -> anyhow::Result<()> { - tracing::trace!(manifest_state.manifest.name, "updating lease"); - let mut manifests = self.manifests.write().await; - - match manifests - .iter_mut() - .find(|m| m.manifest.name == manifest_state.manifest.name) - { - Some(manifest) => { - let mut manifest_state = manifest_state.clone(); - if let Some(lease) = manifest_state.lease.as_mut() { - lease.last_seen = jiff::Timestamp::now(); - } - - manifest.lease = manifest_state.lease - } - None => anyhow::bail!("manifest is not found"), - } - - Ok(()) - } - - pub async fn acquire_lease( + fn acquire_lease( &self, manifest_state: &ManifestState, worker_id: &WorkerId, - ) -> anyhow::Result<()> { - tracing::trace!(manifest_state.manifest.name, "acquiring lease"); - let mut manifests = self.manifests.write().await; + ) -> impl std::future::Future> + Send; - match manifests - .iter_mut() - .find(|m| m.manifest.name == manifest_state.manifest.name) - { - Some(manifest) => { - let mut manifest_state = manifest_state.clone(); - manifest_state.lease = Some(ManifestLease { - owner: *worker_id, - last_seen: jiff::Timestamp::now(), - }); - manifest.lease = manifest_state.lease - } - None => anyhow::bail!("manifest is not found"), - } + fn upsert_manifest( + &self, + manifest: Manifest, + ) -> impl std::future::Future> + Send; - Ok(()) - } - - pub async fn upsert_manifest(&self, manifest: Manifest) -> anyhow::Result<()> { - let mut manifests = self.manifests.write().await; - - let now = jiff::Timestamp::now(); - match manifests - .iter_mut() - .find(|m| m.manifest.name == manifest.name) - { - Some(current_manifest) => { - tracing::debug!("updating manifest"); - - current_manifest.manifest = manifest; - current_manifest.updated = now; - } - None => { - tracing::debug!("adding manifest"); - let content = serde_json::to_vec(&manifest)?; - let output = Sha256::digest(&content); - - manifests.push(ManifestState { - manifest, - manifest_hash: output[..].to_vec(), - generation: 0, - status: ManifestStatus { - status: ManifestStatusState::Pending, - events: Vec::default(), - changes: vec![ManifestChangeEvent { - created: now, - handled: false, - event: ManifestChangeEventType::Changed, - }], - }, - created: now, - updated: now, - lease: None, - }); - } - } - - Ok(()) - } - - pub async fn update_state(&self, manifest: &ManifestState) -> anyhow::Result<()> { - let mut manifests = self.manifests.write().await; - - let Some(current_manifest) = manifests - .iter_mut() - .find(|m| m.manifest.name == manifest.manifest.name) - else { - anyhow::bail!( - "manifest state was not found for {}", - manifest.manifest.name - ) - }; - - let manifest = manifest.clone(); - - current_manifest.generation += 1; - current_manifest.status = manifest.status; - current_manifest.updated = manifest.updated; - - Ok(()) - } + fn update_state( + &self, + manifest: &ManifestState, + ) -> impl std::future::Future> + Send; } diff --git a/crates/nocontrol/src/control_plane/backing_store/in_process.rs b/crates/nocontrol/src/control_plane/backing_store/in_process.rs new file mode 100644 index 0000000..90c646d --- /dev/null +++ b/crates/nocontrol/src/control_plane/backing_store/in_process.rs @@ -0,0 +1,177 @@ +use std::sync::Arc; + +use jiff::ToSpan; +use sha2::{Digest, Sha256}; +use tokio::sync::RwLock; + +use crate::{ + Specification, + control_plane::backing_store::BackingStoreEdge, + manifests::{ + Manifest, ManifestChangeEvent, ManifestChangeEventType, ManifestLease, ManifestState, + ManifestStatus, ManifestStatusState, WorkerId, + }, +}; + +#[derive(Clone, Default)] +pub struct BackingStoreInProcess { + manifests: Arc>>>, +} + +impl BackingStoreEdge for BackingStoreInProcess { + async fn get_owned_and_potential_leases( + &self, + worker_id: &WorkerId, + ) -> anyhow::Result>> { + let now = jiff::Timestamp::now().checked_sub(10.second())?; + let manifests = self + .manifests + .read() + .await + .iter() + .filter(|m| match &m.lease { + Some(lease) if lease.last_seen < now => true, + Some(lease) if &lease.owner == worker_id && lease.last_seen > now => true, + Some(_lease) => false, + None => true, + }) + .cloned() + .collect::>(); + + Ok(manifests) + } + + async fn get_manifests(&self) -> anyhow::Result>> { + Ok(self.manifests.read().await.clone()) + } + + async fn get(&self, name: &str) -> anyhow::Result>> { + Ok(self + .manifests + .read() + .await + .iter() + .find(|m| m.manifest.name == name) + .cloned()) + } + + async fn update_lease(&self, manifest_state: &ManifestState) -> anyhow::Result<()> { + tracing::trace!(manifest_state.manifest.name, "updating lease"); + let mut manifests = self.manifests.write().await; + + match manifests + .iter_mut() + .find(|m| m.manifest.name == manifest_state.manifest.name) + { + Some(manifest) => { + let mut manifest_state = manifest_state.clone(); + if let Some(lease) = manifest_state.lease.as_mut() { + lease.last_seen = jiff::Timestamp::now(); + } + + manifest.lease = manifest_state.lease + } + None => anyhow::bail!("manifest is not found"), + } + + Ok(()) + } + + async fn acquire_lease( + &self, + manifest_state: &ManifestState, + worker_id: &WorkerId, + ) -> anyhow::Result<()> { + tracing::trace!(manifest_state.manifest.name, "acquiring lease"); + let mut manifests = self.manifests.write().await; + + match manifests + .iter_mut() + .find(|m| m.manifest.name == manifest_state.manifest.name) + { + Some(manifest) => { + let mut manifest_state = manifest_state.clone(); + manifest_state.lease = Some(ManifestLease { + owner: *worker_id, + last_seen: jiff::Timestamp::now(), + }); + manifest.lease = manifest_state.lease + } + None => anyhow::bail!("manifest is not found"), + } + + Ok(()) + } + + async fn upsert_manifest(&self, manifest: Manifest) -> anyhow::Result<()> { + let mut manifests = self.manifests.write().await; + + let now = jiff::Timestamp::now(); + match manifests + .iter_mut() + .find(|m| m.manifest.name == manifest.name) + { + Some(current_manifest) => { + tracing::debug!("updating manifest"); + + current_manifest.manifest = manifest; + current_manifest.updated = now; + } + None => { + tracing::debug!("adding manifest"); + let content = serde_json::to_vec(&manifest)?; + let output = Sha256::digest(&content); + + manifests.push(ManifestState { + manifest, + manifest_hash: output[..].to_vec(), + generation: 0, + status: ManifestStatus { + status: ManifestStatusState::Pending, + events: Vec::default(), + changes: vec![ManifestChangeEvent { + created: now, + handled: false, + event: ManifestChangeEventType::Changed, + }], + }, + created: now, + updated: now, + lease: None, + }); + } + } + + Ok(()) + } + + async fn update_state(&self, manifest: &ManifestState) -> anyhow::Result<()> { + let mut manifests = self.manifests.write().await; + + let Some(current_manifest) = manifests + .iter_mut() + .find(|m| m.manifest.name == manifest.manifest.name) + else { + anyhow::bail!( + "manifest state was not found for {}", + manifest.manifest.name + ) + }; + + let manifest = manifest.clone(); + + current_manifest.generation += 1; + current_manifest.status = manifest.status; + current_manifest.updated = manifest.updated; + + Ok(()) + } +} + +impl BackingStoreInProcess { + pub fn new() -> Self { + Self { + manifests: Arc::new(RwLock::new(Vec::new())), + } + } +} diff --git a/crates/nocontrol/src/control_plane/reconciler.rs b/crates/nocontrol/src/control_plane/reconciler.rs index f77ce0c..039090b 100644 --- a/crates/nocontrol/src/control_plane/reconciler.rs +++ b/crates/nocontrol/src/control_plane/reconciler.rs @@ -4,23 +4,23 @@ use tokio_util::sync::CancellationToken; use crate::{ Operator, OperatorState, - control_plane::backing_store::BackingStore, + control_plane::backing_store::{BackingStore, BackingStoreEdge}, manifests::{Action, ManifestName, ManifestState, ManifestStatusState, WorkerId}, reconcile_queue::ReconcileQueue, }; #[derive(Clone)] -pub struct Reconciler { +pub struct Reconciler> { worker_id: WorkerId, - store: BackingStore, + store: BackingStore, operator: OperatorState, reconcile_queue: ReconcileQueue, } -impl Reconciler { +impl> Reconciler { pub fn new( worker_id: WorkerId, - store: &BackingStore, + store: &BackingStore, operator: OperatorState, ) -> Self { Self { diff --git a/crates/nocontrol/src/lib.rs b/crates/nocontrol/src/lib.rs index cb6d0b2..c3ddd3a 100644 --- a/crates/nocontrol/src/lib.rs +++ b/crates/nocontrol/src/lib.rs @@ -1,6 +1,10 @@ mod control_plane; pub use control_plane::ControlPlane; +pub mod stores { + pub use crate::control_plane::backing_store::*; +} + pub mod manifests; mod operator_state; diff --git a/crates/nocontrol/src/operator.rs b/crates/nocontrol/src/operator.rs index 2f20fb8..094f9c8 100644 --- a/crates/nocontrol/src/operator.rs +++ b/crates/nocontrol/src/operator.rs @@ -2,7 +2,7 @@ use serde::{Serialize, de::DeserializeOwned}; use crate::manifests::{Action, ManifestState}; -pub trait Specification: Clone + Serialize + DeserializeOwned { +pub trait Specification: Clone + Serialize + DeserializeOwned + Send + Sync + 'static { fn kind(&self) -> &'static str; } diff --git a/crates/nocontrol/tests/mod.rs b/crates/nocontrol/tests/mod.rs index e918b0e..3660683 100644 --- a/crates/nocontrol/tests/mod.rs +++ b/crates/nocontrol/tests/mod.rs @@ -1,6 +1,7 @@ use nocontrol::{ Operator, OperatorState, Specification, manifests::{Action, Manifest, ManifestMetadata, ManifestState}, + stores::BackingStore, }; use serde::{Deserialize, Serialize}; use tracing_test::traced_test; @@ -9,7 +10,9 @@ use tracing_test::traced_test; #[traced_test] async fn test_can_run_reconciler() -> anyhow::Result<()> { let operator = OperatorState::new(MyOperator {}); - let mut control_plane = nocontrol::ControlPlane::new(operator); + let store = BackingStore::in_process(); + + let mut control_plane = nocontrol::ControlPlane::new(operator, store); control_plane.with_deadline(std::time::Duration::from_secs(3)); tokio::spawn({ diff --git a/examples/kubernetes-like/src/main.rs b/examples/kubernetes-like/src/main.rs index a76573b..f5defa5 100644 --- a/examples/kubernetes-like/src/main.rs +++ b/examples/kubernetes-like/src/main.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, future::Future, sync::Arc, time::Duration}; use nocontrol::{ Operator, OperatorState, Specification, manifests::{Action, Manifest, ManifestMetadata, ManifestState, ManifestStatusState}, + stores::BackingStore, }; use noprocess::{HandleID, Process, ProcessHandler, ProcessManager, ProcessResult, ProcessState}; use serde::{Deserialize, Serialize}; @@ -25,7 +26,7 @@ async fn main() -> anyhow::Result<()> { let process_manager = ProcessManager::new(); let operator = OperatorState::new(ProcessOperator::new(process_manager)); - let control_plane = nocontrol::ControlPlane::new(operator); + let control_plane = nocontrol::ControlPlane::new(operator, BackingStore::in_process()); // Add initial process manifest - desired state is Running control_plane diff --git a/mise.toml b/mise.toml index 0d41068..d416c01 100644 --- a/mise.toml +++ b/mise.toml @@ -7,4 +7,4 @@ run = "cargo nextest run" [tasks.example] alias = "e" -run = "cargo run --example kubernetes-like" +run = "cargo run --bin kubernetes-like"