From f73b8c7796c0dbf832249f741556b1aa6e48e857 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Tue, 6 Jan 2026 22:06:54 +0100 Subject: [PATCH] feat: add reconciliation loop Signed-off-by: kjuulh --- crates/nocontrol-tui/Cargo.toml | 2 +- crates/nocontrol-tui/src/lib.rs | 259 +++++++++------- crates/nocontrol/Cargo.toml | 2 +- .../examples/kubernetes-like/main.rs | 22 +- crates/nocontrol/src/control_plane.rs | 4 +- .../src/control_plane/backing_store.rs | 12 +- .../nocontrol/src/control_plane/reconciler.rs | 248 +++++++++++---- crates/nocontrol/src/lib.rs | 25 +- crates/nocontrol/src/manifests.rs | 13 +- crates/nocontrol/src/operator.rs | 26 ++ crates/nocontrol/src/operator_state.rs | 94 ++++++ crates/nocontrol/src/reconcile_queue.rs | 284 ++++++++++++++++++ crates/nocontrol/tests/mod.rs | 20 +- 13 files changed, 790 insertions(+), 221 deletions(-) create mode 100644 crates/nocontrol/src/operator.rs create mode 100644 crates/nocontrol/src/operator_state.rs create mode 100644 crates/nocontrol/src/reconcile_queue.rs diff --git a/crates/nocontrol-tui/Cargo.toml b/crates/nocontrol-tui/Cargo.toml index b5cb693..075653a 100644 --- a/crates/nocontrol-tui/Cargo.toml +++ b/crates/nocontrol-tui/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "nocontrol-tui" version = "0.1.0" -edition = "2021" +edition = "2024" [lib] name = "nocontrol_tui" diff --git a/crates/nocontrol-tui/src/lib.rs b/crates/nocontrol-tui/src/lib.rs index 7367bd0..631c8d4 100644 --- a/crates/nocontrol-tui/src/lib.rs +++ b/crates/nocontrol-tui/src/lib.rs @@ -3,20 +3,22 @@ use std::sync::Arc; use std::time::Duration; use crossterm::{ - event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode, KeyEventKind, KeyModifiers}, + event::{ + self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode, KeyEventKind, KeyModifiers, + }, execute, - terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, + terminal::{EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode}, }; use fuzzy_matcher::FuzzyMatcher; use fuzzy_matcher::skim::SkimMatcherV2; -use nocontrol::{manifests::ManifestState, ControlPlane, Operator, Specification}; +use nocontrol::{ControlPlane, Operator, Specification, manifests::ManifestState}; use ratatui::{ + Frame, Terminal, backend::{Backend, CrosstermBackend}, layout::{Constraint, Direction, Layout, Rect}, style::{Color, Modifier, Style}, text::{Line, Span, Text}, widgets::{Block, Borders, List, ListItem, ListState, Paragraph, Wrap}, - Frame, Terminal, }; use tokio::sync::Mutex; @@ -265,22 +267,36 @@ impl App { } ["help"] => { self.messages.push("Commands:".to_string()); - self.messages.push(" get, list - Refresh manifest list".to_string()); - self.messages.push(" describe - Show selected manifest details".to_string()); - self.messages.push(" history - Show command history".to_string()); - self.messages.push(" clear - Clear output messages".to_string()); - self.messages.push(" quit, exit, q - Exit application".to_string()); - self.messages.push(" help - Show this help".to_string()); + self.messages + .push(" get, list - Refresh manifest list".to_string()); + self.messages + .push(" describe - Show selected manifest details".to_string()); + self.messages + .push(" history - Show command history".to_string()); + self.messages + .push(" clear - Clear output messages".to_string()); + self.messages + .push(" quit, exit, q - Exit application".to_string()); + self.messages + .push(" help - Show this help".to_string()); self.messages.push("".to_string()); self.messages.push("Keys:".to_string()); - self.messages.push(" / - Toggle search mode".to_string()); - self.messages.push(" ↑/↓ - Navigate list OR cycle command history".to_string()); - self.messages.push(" Enter - Execute command (or exit search)".to_string()); - self.messages.push(" Esc - Clear input / exit search".to_string()); - self.messages.push(" q - Quick quit (when input empty)".to_string()); + self.messages + .push(" / - Toggle search mode".to_string()); + self.messages + .push(" ↑/↓ - Navigate list OR cycle command history".to_string()); + self.messages + .push(" Enter - Execute command (or exit search)".to_string()); + self.messages + .push(" Esc - Clear input / exit search".to_string()); + self.messages + .push(" q - Quick quit (when input empty)".to_string()); } _ => { - self.messages.push(format!("Unknown command: {}. Type 'help' for commands.", cmd)); + self.messages.push(format!( + "Unknown command: {}. Type 'help' for commands.", + cmd + )); } } @@ -321,76 +337,76 @@ where } // Handle input with timeout - if event::poll(Duration::from_millis(100))? { - if let Event::Key(key) = event::read()? { - if key.kind == KeyEventKind::Press { - let mut app = app.lock().await; + if event::poll(Duration::from_millis(100))? + && let Event::Key(key) = event::read()? + && key.kind == KeyEventKind::Press + { + let mut app = app.lock().await; - match (key.code, key.modifiers, app.input_mode) { - // Quick quit with 'q' when input is empty - (KeyCode::Char('q'), KeyModifiers::NONE, InputMode::Normal) - if app.command_input.is_empty() => { - app.should_quit = true; - } - - // Toggle search mode with '/' - (KeyCode::Char('/'), KeyModifiers::NONE, _) => { - app.toggle_search_mode(); - } - - // Character input - (KeyCode::Char(c), _, _) => { - app.command_input.push(c); - app.update_search(); - } - - // Backspace - (KeyCode::Backspace, _, _) => { - app.command_input.pop(); - app.update_search(); - } - - // Enter key - (KeyCode::Enter, _, InputMode::Search) => { - // Exit search mode but keep filter - app.input_mode = InputMode::Normal; - app.command_input.clear(); - } - (KeyCode::Enter, _, InputMode::Normal) => { - app.execute_command().await; - } - - // Arrow keys - (KeyCode::Up, _, InputMode::Normal) if !app.command_input.is_empty() => { - // Navigate history when typing a command - app.history_previous(); - } - (KeyCode::Down, _, InputMode::Normal) if !app.command_input.is_empty() => { - // Navigate history when typing a command - app.history_next(); - } - (KeyCode::Up, _, _) => { - // Navigate manifest list - app.previous(); - } - (KeyCode::Down, _, _) => { - // Navigate manifest list - app.next(); - } - - // Escape key - (KeyCode::Esc, _, InputMode::Search) => { - app.toggle_search_mode(); - app.command_input.clear(); - } - (KeyCode::Esc, _, InputMode::Normal) => { - app.command_input.clear(); - app.history_index = None; - } - - _ => {} - } + match (key.code, key.modifiers, app.input_mode) { + // Quick quit with 'q' when input is empty + (KeyCode::Char('q'), KeyModifiers::NONE, InputMode::Normal) + if app.command_input.is_empty() => + { + app.should_quit = true; } + + // Toggle search mode with '/' + (KeyCode::Char('/'), KeyModifiers::NONE, _) => { + app.toggle_search_mode(); + } + + // Character input + (KeyCode::Char(c), _, _) => { + app.command_input.push(c); + app.update_search(); + } + + // Backspace + (KeyCode::Backspace, _, _) => { + app.command_input.pop(); + app.update_search(); + } + + // Enter key + (KeyCode::Enter, _, InputMode::Search) => { + // Exit search mode but keep filter + app.input_mode = InputMode::Normal; + app.command_input.clear(); + } + (KeyCode::Enter, _, InputMode::Normal) => { + app.execute_command().await; + } + + // Arrow keys + (KeyCode::Up, _, InputMode::Normal) if !app.command_input.is_empty() => { + // Navigate history when typing a command + app.history_previous(); + } + (KeyCode::Down, _, InputMode::Normal) if !app.command_input.is_empty() => { + // Navigate history when typing a command + app.history_next(); + } + (KeyCode::Up, _, _) => { + // Navigate manifest list + app.previous(); + } + (KeyCode::Down, _, _) => { + // Navigate manifest list + app.next(); + } + + // Escape key + (KeyCode::Esc, _, InputMode::Search) => { + app.toggle_search_mode(); + app.command_input.clear(); + } + (KeyCode::Esc, _, InputMode::Normal) => { + app.command_input.clear(); + app.history_index = None; + } + + _ => {} } } } @@ -403,10 +419,10 @@ fn ui(f: &mut Frame, app: &mut App) { let chunks = Layout::default() .direction(Direction::Vertical) .constraints([ - Constraint::Length(3), // Title - Constraint::Min(10), // Main content - Constraint::Length(10), // Messages - Constraint::Length(3), // Command input + Constraint::Length(3), // Title + Constraint::Min(10), // Main content + Constraint::Length(10), // Messages + Constraint::Length(3), // Command input ]) .split(f.area()); @@ -415,21 +431,25 @@ fn ui(f: &mut Frame, app: &mut App) { InputMode::Normal => "NORMAL", InputMode::Search => "SEARCH", }; - let title = Paragraph::new(vec![ - Line::from(vec![ - Span::styled( - "NoControl - Kubernetes-like Control Plane", - Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD), - ), - Span::raw(" "), - Span::styled( - format!("[{}]", mode_text), - Style::default() - .fg(if app.input_mode == InputMode::Search { Color::Yellow } else { Color::Green }) - .add_modifier(Modifier::BOLD), - ), - ]), - ]) + let title = Paragraph::new(vec![Line::from(vec![ + Span::styled( + "NoControl - Kubernetes-like Control Plane", + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + ), + Span::raw(" "), + Span::styled( + format!("[{}]", mode_text), + Style::default() + .fg(if app.input_mode == InputMode::Search { + Color::Yellow + } else { + Color::Green + }) + .add_modifier(Modifier::BOLD), + ), + ])]) .block(Block::default().borders(Borders::ALL)); f.render_widget(title, chunks[0]); @@ -454,7 +474,8 @@ fn ui(f: &mut Frame, app: &mut App) { fn render_manifest_list(f: &mut Frame, app: &mut App, area: Rect) { // Collect filtered manifests data before borrowing list_state - let filtered_data: Vec<_> = app.filtered_indices + let filtered_data: Vec<_> = app + .filtered_indices .iter() .filter_map(|&idx| app.manifests.get(idx)) .map(|m| (m.manifest.name.clone(), m.status.status.clone())) @@ -469,14 +490,12 @@ fn render_manifest_list(f: &mut Frame, app: &mut App Color::Gray, nocontrol::manifests::ManifestStatusState::Stopping => Color::Magenta, nocontrol::manifests::ManifestStatusState::Deleting => Color::Red, + nocontrol::manifests::ManifestStatusState::Errored => Color::Red, }; let status_text = format!("{:?}", status); let content = Line::from(vec![ - Span::styled( - "● ", - Style::default().fg(status_color), - ), + Span::styled("● ", Style::default().fg(status_color)), Span::raw(name), Span::raw(" "), Span::styled( @@ -526,7 +545,10 @@ fn render_manifest_details(f: &mut Frame, app: &App(f: &mut Frame, app: &App(f: &mut Frame, app: &App(f: &mut Frame, app: &App String::new() }; ( - format!(" Command{} (/ to search, ↑↓ for history, 'help' for commands) ", hist_info), + format!( + " Command{} (/ to search, ↑↓ for history, 'help' for commands) ", + hist_info + ), app.command_input.as_str(), Style::default().fg(Color::Yellow), ) @@ -604,7 +635,9 @@ fn render_command_input(f: &mut Frame, app: &App InputMode::Search => ( " Search (fuzzy) - Enter to apply, Esc to cancel ".to_string(), app.command_input.as_str(), - Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD), + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), ), }; diff --git a/crates/nocontrol/Cargo.toml b/crates/nocontrol/Cargo.toml index 82dcd00..f03ad6c 100644 --- a/crates/nocontrol/Cargo.toml +++ b/crates/nocontrol/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "nocontrol" version = "0.1.0" -edition = "2021" +edition = "2024" [dependencies] anyhow.workspace = true diff --git a/crates/nocontrol/examples/kubernetes-like/main.rs b/crates/nocontrol/examples/kubernetes-like/main.rs index 9814077..86ebf40 100644 --- a/crates/nocontrol/examples/kubernetes-like/main.rs +++ b/crates/nocontrol/examples/kubernetes-like/main.rs @@ -1,9 +1,8 @@ use std::time::Duration; -use async_trait::async_trait; use nocontrol::{ - manifests::{Manifest, ManifestMetadata, ManifestState}, - Operator, Specification, + Operator, OperatorState, Specification, + manifests::{Action, Manifest, ManifestMetadata, ManifestState}, }; use serde::{Deserialize, Serialize}; use tracing_subscriber::EnvFilter; @@ -22,7 +21,7 @@ async fn main() -> anyhow::Result<()> { .without_time() .init(); - let operator = MyOperator {}; + let operator = OperatorState::new(MyOperator {}); let control_plane = nocontrol::ControlPlane::new(operator); // Add initial manifest @@ -56,7 +55,7 @@ async fn main() -> anyhow::Result<()> { name: "initial-deployment".into(), metadata: ManifestMetadata {}, spec: Specifications::Deployment(DeploymentControllerManifest { - name: format!("app-{}", &random.to_string()[..8]), + name: format!("app-{}", &random.to_string()), }), }) .await; @@ -81,19 +80,13 @@ async fn main() -> anyhow::Result<()> { #[derive(Clone)] pub struct MyOperator {} -#[async_trait] impl Operator for MyOperator { type Specifications = Specifications; async fn reconcile( &self, desired_manifest: &mut ManifestState, - ) -> anyhow::Result<()> { - let now = jiff::Timestamp::now(); - - desired_manifest.status.status = nocontrol::manifests::ManifestStatusState::Started; - desired_manifest.updated = now; - + ) -> anyhow::Result { match &desired_manifest.manifest.spec { Specifications::Deployment(spec) => { tracing::info!( @@ -104,10 +97,7 @@ impl Operator for MyOperator { } } - desired_manifest.status.status = nocontrol::manifests::ManifestStatusState::Running; - desired_manifest.updated = now; - - Ok(()) + Ok(Action::Requeue(std::time::Duration::from_secs(10))) } } diff --git a/crates/nocontrol/src/control_plane.rs b/crates/nocontrol/src/control_plane.rs index aa3f448..95a872a 100644 --- a/crates/nocontrol/src/control_plane.rs +++ b/crates/nocontrol/src/control_plane.rs @@ -1,9 +1,9 @@ use tokio_util::sync::CancellationToken; use crate::{ + Operator, OperatorState, control_plane::{backing_store::BackingStore, reconciler::Reconciler}, manifests::{Manifest, ManifestState}, - Operator, }; pub mod backing_store; @@ -19,7 +19,7 @@ pub struct ControlPlane { } impl ControlPlane { - pub fn new(operator: TOperator) -> Self { + pub fn new(operator: OperatorState) -> Self { let worker_id = uuid::Uuid::now_v7(); let store = BackingStore::::new(); diff --git a/crates/nocontrol/src/control_plane/backing_store.rs b/crates/nocontrol/src/control_plane/backing_store.rs index 8248e4b..a1f034f 100644 --- a/crates/nocontrol/src/control_plane/backing_store.rs +++ b/crates/nocontrol/src/control_plane/backing_store.rs @@ -5,11 +5,11 @@ use sha2::{Digest, Sha256}; use tokio::sync::RwLock; use crate::{ + Specification, manifests::{ Manifest, ManifestChangeEvent, ManifestChangeEventType, ManifestLease, ManifestState, ManifestStatus, ManifestStatusState, WorkerId, }, - Specification, }; #[derive(Clone)] @@ -46,6 +46,16 @@ impl BackingStore { Ok(self.manifests.read().await.clone()) } + pub async fn get(&self, name: &str) -> anyhow::Result>> { + Ok(self + .manifests + .read() + .await + .iter() + .find(|m| m.manifest.name == name) + .cloned()) + } + pub async fn update_lease(&self, manifest_state: &ManifestState) -> anyhow::Result<()> { tracing::trace!(manifest_state.manifest.name, "updating lease"); let mut manifests = self.manifests.write().await; diff --git a/crates/nocontrol/src/control_plane/reconciler.rs b/crates/nocontrol/src/control_plane/reconciler.rs index 3fba8fe..15df271 100644 --- a/crates/nocontrol/src/control_plane/reconciler.rs +++ b/crates/nocontrol/src/control_plane/reconciler.rs @@ -1,88 +1,228 @@ use anyhow::Context; +use jiff::Timestamp; use tokio_util::sync::CancellationToken; -use crate::{control_plane::backing_store::BackingStore, manifests::WorkerId, Operator}; +use crate::{ + Operator, OperatorState, + control_plane::backing_store::BackingStore, + manifests::{Action, ManifestName, ManifestState, ManifestStatusState, WorkerId}, + reconcile_queue::ReconcileQueue, +}; #[derive(Clone)] pub struct Reconciler { worker_id: WorkerId, store: BackingStore, - operator: T, + operator: OperatorState, + reconcile_queue: ReconcileQueue, } impl Reconciler { - pub fn new(worker_id: WorkerId, store: &BackingStore, operator: T) -> Self { + pub fn new( + worker_id: WorkerId, + store: &BackingStore, + operator: OperatorState, + ) -> Self { Self { worker_id, store: store.clone(), operator, + reconcile_queue: ReconcileQueue::new(), } } - pub async fn reconcile(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> { - let now = jiff::Timestamp::now(); - tracing::debug!(%self.worker_id, %now, "running reconciler"); + /// Run the reconciler. This starts two concurrent tasks: + /// 1. A sync task that periodically checks for new/changed manifests and enqueues them + /// 2. A worker task that processes the queue and runs reconciliations + pub async fn run(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> { + let now = Timestamp::now(); + tracing::debug!(%self.worker_id, %now, "starting reconciler"); + tokio::select! { + result = self.sync_manifests(cancellation_token) => { + result.context("sync manifests task failed")?; + } + result = self.process_queue(cancellation_token) => { + result.context("process queue task failed")?; + } + _ = cancellation_token.cancelled() => { + tracing::debug!("reconciler received cancellation"); + } + } + + tracing::debug!("reconciler shutting down"); + Ok(()) + } + + /// Periodically sync manifests from the backing store. + /// Acquires leases and enqueues manifests that need reconciliation. + async fn sync_manifests(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> { loop { - let now = jiff::Timestamp::now(); if cancellation_token.is_cancelled() { break; } - tracing::trace!(%self.worker_id, %now, "reconciler iteration"); - let mut our_manifests = Vec::new(); - // 1. read manifests from a backing store - for manifest_state in self.store.get_owned_and_potential_leases().await? { - // 3. Lease the manifest - match &manifest_state.lease { - Some(lease) if lease.owner == self.worker_id => { - // We own the lease, update - self.store - .update_lease(&manifest_state) - .await - .context("update lease")?; - our_manifests.push(manifest_state.clone()); - } - None => { - // 2. If no lease - // Acquire lease - self.store - .acquire_lease(&manifest_state, &self.worker_id) - .await - .context("acquire lease")?; - our_manifests.push(manifest_state.clone()); - } - _ => { - // Skipping manifest, as it is not vaid - continue; - } - } - } + let now = Timestamp::now(); + tracing::trace!(%self.worker_id, %now, "syncing manifests"); - // 4. Check desired vs actual - 'manifest: for manifest in our_manifests.iter_mut() { - // Currently periodic sync, - // TODO: this should also be made event based - - if let Some(change) = manifest.status.changes.first() { - if change.handled { - continue 'manifest; - } - } - - self.operator.reconcile(manifest).await?; - self.store.update_state(manifest).await?; - - if let Some(change) = manifest.status.changes.first_mut() { - change.handled = true - } + if let Err(e) = self.sync_once().await { + tracing::warn!(error = %e, "failed to sync manifests"); } tokio::time::sleep(std::time::Duration::from_millis(500)).await; } - tracing::debug!("reconciler shutting down"); + Ok(()) + } + + /// Single sync iteration - check for manifests, acquire leases, enqueue work. + async fn sync_once(&self) -> anyhow::Result<()> { + for manifest_state in self.store.get_owned_and_potential_leases().await? { + let manifest_name = manifest_state.manifest.name.clone(); + + match &manifest_state.lease { + Some(lease) if lease.owner == self.worker_id => { + // We own the lease, update it + self.store + .update_lease(&manifest_state) + .await + .context("update lease")?; + + // Check if there are unhandled changes + if self.needs_reconciliation(&manifest_state) { + self.reconcile_queue.enqueue(manifest_name).await; + } + } + None => { + // No lease, try to acquire + self.store + .acquire_lease(&manifest_state, &self.worker_id) + .await + .context("acquire lease")?; + + // Enqueue for reconciliation + self.reconcile_queue.enqueue(manifest_name).await; + } + _ => { + // Someone else owns the lease, skip + continue; + } + } + } Ok(()) } + + /// Check if a manifest needs reconciliation. + fn needs_reconciliation(&self, manifest: &ManifestState) -> bool { + // Has unhandled changes + if let Some(change) = manifest.status.changes.first() + && !change.handled + { + return true; + } + + false + } + + /// Process the reconciliation queue. + /// Takes items from the queue and reconciles them, re-enqueuing with delay if needed. + async fn process_queue(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> { + loop { + if cancellation_token.is_cancelled() { + break; + } + + // Wait for the next item from the queue + let manifest_name = tokio::select! { + name = self.reconcile_queue.take() => name, + _ = cancellation_token.cancelled() => break, + }; + + tracing::trace!(%self.worker_id, %manifest_name, "processing manifest from queue"); + + if let Err(e) = self.reconcile_manifest(&manifest_name).await { + tracing::warn!(error = %e, %manifest_name, "failed to reconcile manifest"); + // On error, requeue with a short delay + self.requeue_with_delay(&manifest_name, std::time::Duration::from_secs(5)) + .await; + } + } + + Ok(()) + } + + /// Reconcile a single manifest by name. + async fn reconcile_manifest(&self, manifest_name: &ManifestName) -> anyhow::Result<()> { + // Fetch the current manifest state + let Some(mut manifest) = self.store.get(manifest_name).await? else { + tracing::debug!(%manifest_name, "manifest not found, skipping"); + return Ok(()); + }; + + // Verify we still own the lease + match &manifest.lease { + Some(lease) if lease.owner == self.worker_id => {} + _ => { + tracing::debug!(%manifest_name, "we don't own the lease, skipping"); + return Ok(()); + } + } + + let now = Timestamp::now(); + manifest.status.status = ManifestStatusState::Started; + manifest.updated = now; + + // Run the reconciliation + let action = match self.operator.reconcile(&mut manifest).await { + Ok(action) => action, + Err(e) => { + manifest.status.status = ManifestStatusState::Errored; + manifest.updated = Timestamp::now(); + + // Let the operator handle the error + self.operator.on_error(&mut manifest, &e).await + } + }; + + // Update status + let now = Timestamp::now(); + manifest.status.status = ManifestStatusState::Running; + manifest.updated = now; + + // Mark change as handled + if let Some(change) = manifest.status.changes.first_mut() { + change.handled = true; + } + + // Persist the updated state + self.store.update_state(&manifest).await?; + + // Handle requeue action + match action { + Action::None => {} + Action::Requeue(delay) => { + self.requeue_with_delay(manifest_name, delay).await; + } + } + + Ok(()) + } + + /// Requeue a manifest with a delay. + async fn requeue_with_delay(&self, manifest_name: &ManifestName, delay: std::time::Duration) { + let scheduled_at = Timestamp::now() + .checked_add(jiff::Span::try_from(delay).unwrap_or(jiff::Span::new().seconds(5))) + .unwrap_or_else(|_| Timestamp::now()); + + tracing::trace!(%manifest_name, ?delay, %scheduled_at, "requeuing manifest"); + self.reconcile_queue + .enqueue_at(manifest_name.clone(), scheduled_at) + .await; + } + + // Keep the old method name for backwards compatibility + pub async fn reconcile(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> { + self.run(cancellation_token).await + } } diff --git a/crates/nocontrol/src/lib.rs b/crates/nocontrol/src/lib.rs index 4b0a47f..d07b90a 100644 --- a/crates/nocontrol/src/lib.rs +++ b/crates/nocontrol/src/lib.rs @@ -1,21 +1,14 @@ +#![feature(associated_type_defaults)] + mod control_plane; +pub use control_plane::ControlPlane; + pub mod manifests; -pub use control_plane::ControlPlane; -use serde::{de::DeserializeOwned, Serialize}; +mod operator_state; +pub use operator_state::*; -use crate::manifests::ManifestState; +mod operator; +pub use operator::*; -pub trait Specification: Clone + Serialize + DeserializeOwned { - fn kind(&self) -> &'static str; -} - -#[async_trait::async_trait] -pub trait Operator { - type Specifications: Specification; - - async fn reconcile( - &self, - desired_manifest: &mut ManifestState, - ) -> anyhow::Result<()>; -} +mod reconcile_queue; diff --git a/crates/nocontrol/src/manifests.rs b/crates/nocontrol/src/manifests.rs index 16c89b1..993c721 100644 --- a/crates/nocontrol/src/manifests.rs +++ b/crates/nocontrol/src/manifests.rs @@ -1,4 +1,4 @@ -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde::{Deserialize, Serialize, de::DeserializeOwned}; use crate::Specification; @@ -27,13 +27,15 @@ pub struct ManifestLease { pub last_seen: jiff::Timestamp, } +pub type ManifestName = String; + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(bound(serialize = "T: Serialize", deserialize = "T: DeserializeOwned"))] pub struct Manifest where T: Specification, { - pub name: String, + pub name: ManifestName, pub metadata: ManifestMetadata, pub spec: T, } @@ -50,6 +52,7 @@ pub struct ManifestStatus { pub enum ManifestStatusState { Pending, Started, + Errored, Running, Stopping, Deleting, @@ -80,3 +83,9 @@ pub struct ManifestEvent { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ManifestMetadata {} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Action { + None, + Requeue(std::time::Duration), +} diff --git a/crates/nocontrol/src/operator.rs b/crates/nocontrol/src/operator.rs new file mode 100644 index 0000000..fcc77cc --- /dev/null +++ b/crates/nocontrol/src/operator.rs @@ -0,0 +1,26 @@ +use serde::{Serialize, de::DeserializeOwned}; + +use crate::manifests::{Action, ManifestState}; + +pub trait Specification: Clone + Serialize + DeserializeOwned { + fn kind(&self) -> &'static str; +} + +#[allow(dead_code, unused_variables)] +pub trait Operator: Send + Sync + 'static { + type Specifications: Specification; + type Error = anyhow::Error; + + fn reconcile( + &self, + desired_manifest: &mut ManifestState, + ) -> impl Future>; + + fn on_error( + &self, + desired_manifest: &mut ManifestState, + error: &Self::Error, + ) -> impl Future { + async { Action::None } + } +} diff --git a/crates/nocontrol/src/operator_state.rs b/crates/nocontrol/src/operator_state.rs new file mode 100644 index 0000000..ed467cf --- /dev/null +++ b/crates/nocontrol/src/operator_state.rs @@ -0,0 +1,94 @@ +use std::{sync::Arc, time::Duration}; + +use tokio::sync::Mutex; + +use crate::Operator; + +#[derive(Clone)] +pub struct OperatorState { + inner: Arc>, + config: Arc, +} + +impl OperatorState { + pub fn new(operator: T) -> Self { + Self { + inner: Arc::new(Mutex::new(operator)), + config: Default::default(), + } + } + + pub fn new_with_config(operator: T, config: OperatorConfig) -> Self { + Self { + inner: Arc::new(Mutex::new(operator)), + config: Arc::new(config), + } + } + + pub(crate) fn config(&self) -> &OperatorConfig { + &self.config + } +} + +impl Operator for OperatorState { + type Specifications = T::Specifications; + type Error = T::Error; + + async fn reconcile( + &self, + desired_manifest: &mut crate::manifests::ManifestState, + ) -> Result { + self.inner.lock().await.reconcile(desired_manifest).await + } + + async fn on_error( + &self, + desired_manifest: &mut crate::manifests::ManifestState, + error: &Self::Error, + ) -> crate::manifests::Action { + self.inner + .lock() + .await + .on_error(desired_manifest, error) + .await + } +} + +pub struct OperatorConfig { + pub backoff_policy: BackoffPolicy, + pub reconcile_on: Option>, +} + +impl Default for OperatorConfig { + fn default() -> Self { + Self { + backoff_policy: BackoffPolicy::Static { + delay: Duration::from_secs(30), + }, + reconcile_on: Default::default(), + } + } +} + +pub enum BackoffPolicy { + Never, + Expotential { + multiplier: f64, + initial_delay: Duration, + max_delay: Option, + }, + Static { + delay: Duration, + }, + Ramp(Vec), +} + +impl Default for BackoffPolicy { + fn default() -> Self { + Self::Expotential { + multiplier: 2.0, + initial_delay: Duration::from_secs(1), + max_delay: Some(Duration::from_mins(5)), + } + } +} diff --git a/crates/nocontrol/src/reconcile_queue.rs b/crates/nocontrol/src/reconcile_queue.rs new file mode 100644 index 0000000..f26e2cc --- /dev/null +++ b/crates/nocontrol/src/reconcile_queue.rs @@ -0,0 +1,284 @@ +use std::{cmp::Ordering, collections::BinaryHeap, marker::PhantomData, sync::Arc}; + +use jiff::Timestamp; +use tokio::sync::{Mutex, Notify}; + +use crate::{Specification, manifests::ManifestName}; + +#[derive(Debug, Clone, Eq, PartialEq)] +struct QueueEntry { + scheduled_at: Timestamp, + key: ManifestName, +} + +impl Ord for QueueEntry { + fn cmp(&self, other: &Self) -> Ordering { + // Reverse ordering for min-heap (earliest first) + other + .scheduled_at + .cmp(&self.scheduled_at) + .then_with(|| self.key.cmp(&other.key)) + } +} + +impl PartialOrd for QueueEntry { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +struct Inner { + queue: BinaryHeap, +} + +/// A simple sorted idempotent queue for scheduling reconciliation jobs. +/// +/// Each key can only appear once in the queue. If a key is enqueued again +/// with an earlier scheduled time, the existing entry is updated. +/// +/// This queue is Clone, Send, and Sync - it can be shared across threads. +pub struct ReconcileQueue { + inner: Arc>, + notify: Arc, + _marker: PhantomData, +} + +impl Clone for ReconcileQueue { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + notify: Arc::clone(&self.notify), + _marker: PhantomData, + } + } +} + +impl Default for ReconcileQueue { + fn default() -> Self { + Self::new() + } +} + +impl ReconcileQueue { + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(Inner { + queue: BinaryHeap::new(), + })), + notify: Arc::new(Notify::new()), + _marker: PhantomData, + } + } + + /// Enqueue a key to be reconciled now. + pub async fn enqueue(&self, key: ManifestName) { + self.enqueue_at(key, Timestamp::now()).await; + } + + /// Enqueue a key to be reconciled at a specific time. + /// + /// If the key already exists in the queue with a later scheduled time, + /// it will be updated to the earlier time. If it exists with an earlier + /// or equal time, this is a no-op. + pub async fn enqueue_at(&self, key: ManifestName, scheduled_at: Timestamp) { + let mut inner = self.inner.lock().await; + + // Remove any existing entry for this key + let existing: Vec<_> = inner.queue.drain().collect(); + let mut found_earlier = false; + + for entry in existing { + if entry.key == key { + // Keep the earlier scheduled time + if entry.scheduled_at <= scheduled_at { + inner.queue.push(entry); + found_earlier = true; + } + } else { + inner.queue.push(entry); + } + } + + // Add new entry if we didn't find an earlier one + if !found_earlier { + inner.queue.push(QueueEntry { scheduled_at, key }); + } + + self.notify.notify_one(); + } + + /// Try to take the next ready job from the queue. + /// + /// Returns `Some(key)` if there's a job scheduled at or before now, + /// otherwise returns `None`. + pub async fn try_take(&self) -> Option { + let now = Timestamp::now(); + let mut inner = self.inner.lock().await; + + if let Some(entry) = inner.queue.peek() + && entry.scheduled_at <= now + { + return inner.queue.pop().map(|e| e.key); + } + + None + } + + /// Returns how long until the next job is ready, if any. + pub async fn next_ready_in(&self) -> Option { + let now = Timestamp::now(); + let inner = self.inner.lock().await; + + inner.queue.peek().and_then(|entry| { + let span = entry.scheduled_at - now; + if span.is_negative() { + Some(std::time::Duration::ZERO) + } else { + span.try_into().ok() + } + }) + } + + /// Wait for the next job to be ready and return it. + /// + /// This will block until a job is ready. + pub async fn take(&self) -> ManifestName { + loop { + if let Some(key) = self.try_take().await { + return key; + } + + match self.next_ready_in().await { + Some(duration) if duration > std::time::Duration::ZERO => { + tokio::select! { + _ = tokio::time::sleep(duration) => {} + _ = self.notify.notified() => {} + } + } + Some(_) => { + // Duration is zero, should have a ready item + continue; + } + None => { + // Queue is empty, wait for notification + self.notify.notified().await; + } + } + } + } + + /// Returns the number of pending jobs in the queue. + pub async fn len(&self) -> usize { + self.inner.lock().await.queue.len() + } + + /// Returns true if the queue is empty. + pub async fn is_empty(&self) -> bool { + self.inner.lock().await.queue.is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use jiff::ToSpan; + use serde::{Deserialize, Serialize}; + + #[derive(Clone, Serialize, Deserialize)] + struct TestSpec; + + impl Specification for TestSpec { + fn kind(&self) -> &'static str { + "test" + } + } + + #[tokio::test] + async fn test_enqueue_and_take() { + let queue: ReconcileQueue = ReconcileQueue::new(); + + queue.enqueue("job1".to_string()).await; + queue.enqueue("job2".to_string()).await; + + assert_eq!(queue.len().await, 2); + + let job = queue.try_take().await; + assert!(job.is_some()); + assert_eq!(queue.len().await, 1); + } + + #[tokio::test] + async fn test_idempotent_enqueue() { + let queue: ReconcileQueue = ReconcileQueue::new(); + + queue.enqueue("job1".to_string()).await; + queue.enqueue("job1".to_string()).await; + + assert_eq!(queue.len().await, 1); + } + + #[tokio::test] + async fn test_scheduled_order() { + let queue: ReconcileQueue = ReconcileQueue::new(); + + let now = Timestamp::now(); + let later = now.checked_add(1.hour()).unwrap(); + + queue.enqueue_at("later_job".to_string(), later).await; + queue.enqueue_at("now_job".to_string(), now).await; + + // Should get the "now" job first + let job = queue.try_take().await; + assert_eq!(job, Some("now_job".to_string())); + + // Later job should not be ready yet + let job = queue.try_take().await; + assert!(job.is_none()); + } + + #[tokio::test] + async fn test_enqueue_earlier_updates() { + let queue: ReconcileQueue = ReconcileQueue::new(); + + let now = Timestamp::now(); + let later = now.checked_add(1.hour()).unwrap(); + + // First enqueue for later + queue.enqueue_at("job1".to_string(), later).await; + assert!(queue.try_take().await.is_none()); + + // Re-enqueue for now - should update + queue.enqueue_at("job1".to_string(), now).await; + assert_eq!(queue.try_take().await, Some("job1".to_string())); + } + + #[tokio::test] + async fn test_enqueue_later_no_op() { + let queue: ReconcileQueue = ReconcileQueue::new(); + + let now = Timestamp::now(); + let later = now.checked_add(1.hour()).unwrap(); + + // First enqueue for now + queue.enqueue_at("job1".to_string(), now).await; + + // Try to enqueue for later - should be no-op + queue.enqueue_at("job1".to_string(), later).await; + + // Should still be ready now + assert_eq!(queue.try_take().await, Some("job1".to_string())); + } + + #[tokio::test] + async fn test_clone_shares_state() { + let queue1: ReconcileQueue = ReconcileQueue::new(); + let queue2 = queue1.clone(); + + queue1.enqueue("job1".to_string()).await; + assert_eq!(queue2.len().await, 1); + + let job = queue2.try_take().await; + assert_eq!(job, Some("job1".to_string())); + assert!(queue1.is_empty().await); + } +} diff --git a/crates/nocontrol/tests/mod.rs b/crates/nocontrol/tests/mod.rs index 7cd18dc..cf250a2 100644 --- a/crates/nocontrol/tests/mod.rs +++ b/crates/nocontrol/tests/mod.rs @@ -1,7 +1,6 @@ -use async_trait::async_trait; use nocontrol::{ - manifests::{Manifest, ManifestMetadata, ManifestState}, - Operator, Specification, + Operator, OperatorState, Specification, + manifests::{Action, Manifest, ManifestMetadata, ManifestState}, }; use serde::{Deserialize, Serialize}; use tracing_test::traced_test; @@ -9,8 +8,7 @@ use tracing_test::traced_test; #[tokio::test] #[traced_test] async fn test_can_run_reconciler() -> anyhow::Result<()> { - let operator = MyOperator {}; - + let operator = OperatorState::new(MyOperator {}); let mut control_plane = nocontrol::ControlPlane::new(operator); control_plane.with_deadline(std::time::Duration::from_secs(3)); @@ -51,19 +49,13 @@ async fn test_can_run_reconciler() -> anyhow::Result<()> { #[derive(Clone)] pub struct MyOperator {} -#[async_trait] impl Operator for MyOperator { type Specifications = Specifications; async fn reconcile( &self, desired_manifest: &mut ManifestState, - ) -> anyhow::Result<()> { - let now = jiff::Timestamp::now(); - - desired_manifest.status.status = nocontrol::manifests::ManifestStatusState::Started; - desired_manifest.updated = now; - + ) -> anyhow::Result { match &desired_manifest.manifest.spec { Specifications::Deployment(spec) => { tracing::info!( @@ -73,10 +65,8 @@ impl Operator for MyOperator { ) } } - desired_manifest.status.status = nocontrol::manifests::ManifestStatusState::Running; - desired_manifest.updated = now; - Ok(()) + Ok(Action::Requeue(std::time::Duration::from_secs(1))) } }