feat: add reconciliation loop

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2026-01-06 22:06:54 +01:00
parent 2d59c7fd69
commit f73b8c7796
13 changed files with 790 additions and 221 deletions

View File

@@ -1,7 +1,7 @@
[package] [package]
name = "nocontrol-tui" name = "nocontrol-tui"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2024"
[lib] [lib]
name = "nocontrol_tui" name = "nocontrol_tui"

View File

@@ -3,20 +3,22 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use crossterm::{ use crossterm::{
event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode, KeyEventKind, KeyModifiers}, event::{
self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode, KeyEventKind, KeyModifiers,
},
execute, execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, terminal::{EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode},
}; };
use fuzzy_matcher::FuzzyMatcher; use fuzzy_matcher::FuzzyMatcher;
use fuzzy_matcher::skim::SkimMatcherV2; use fuzzy_matcher::skim::SkimMatcherV2;
use nocontrol::{manifests::ManifestState, ControlPlane, Operator, Specification}; use nocontrol::{ControlPlane, Operator, Specification, manifests::ManifestState};
use ratatui::{ use ratatui::{
Frame, Terminal,
backend::{Backend, CrosstermBackend}, backend::{Backend, CrosstermBackend},
layout::{Constraint, Direction, Layout, Rect}, layout::{Constraint, Direction, Layout, Rect},
style::{Color, Modifier, Style}, style::{Color, Modifier, Style},
text::{Line, Span, Text}, text::{Line, Span, Text},
widgets::{Block, Borders, List, ListItem, ListState, Paragraph, Wrap}, widgets::{Block, Borders, List, ListItem, ListState, Paragraph, Wrap},
Frame, Terminal,
}; };
use tokio::sync::Mutex; use tokio::sync::Mutex;
@@ -265,22 +267,36 @@ impl<TOperator: Operator> App<TOperator> {
} }
["help"] => { ["help"] => {
self.messages.push("Commands:".to_string()); self.messages.push("Commands:".to_string());
self.messages.push(" get, list - Refresh manifest list".to_string()); self.messages
self.messages.push(" describe - Show selected manifest details".to_string()); .push(" get, list - Refresh manifest list".to_string());
self.messages.push(" history - Show command history".to_string()); self.messages
self.messages.push(" clear - Clear output messages".to_string()); .push(" describe - Show selected manifest details".to_string());
self.messages.push(" quit, exit, q - Exit application".to_string()); self.messages
self.messages.push(" help - Show this help".to_string()); .push(" history - Show command history".to_string());
self.messages
.push(" clear - Clear output messages".to_string());
self.messages
.push(" quit, exit, q - Exit application".to_string());
self.messages
.push(" help - Show this help".to_string());
self.messages.push("".to_string()); self.messages.push("".to_string());
self.messages.push("Keys:".to_string()); self.messages.push("Keys:".to_string());
self.messages.push(" / - Toggle search mode".to_string()); self.messages
self.messages.push(" ↑/↓ - Navigate list OR cycle command history".to_string()); .push(" / - Toggle search mode".to_string());
self.messages.push(" Enter - Execute command (or exit search)".to_string()); self.messages
self.messages.push(" Esc - Clear input / exit search".to_string()); .push(" ↑/↓ - Navigate list OR cycle command history".to_string());
self.messages.push(" q - Quick quit (when input empty)".to_string()); self.messages
.push(" Enter - Execute command (or exit search)".to_string());
self.messages
.push(" Esc - Clear input / exit search".to_string());
self.messages
.push(" q - Quick quit (when input empty)".to_string());
} }
_ => { _ => {
self.messages.push(format!("Unknown command: {}. Type 'help' for commands.", cmd)); self.messages.push(format!(
"Unknown command: {}. Type 'help' for commands.",
cmd
));
} }
} }
@@ -321,15 +337,17 @@ where
} }
// Handle input with timeout // Handle input with timeout
if event::poll(Duration::from_millis(100))? { if event::poll(Duration::from_millis(100))?
if let Event::Key(key) = event::read()? { && let Event::Key(key) = event::read()?
if key.kind == KeyEventKind::Press { && key.kind == KeyEventKind::Press
{
let mut app = app.lock().await; let mut app = app.lock().await;
match (key.code, key.modifiers, app.input_mode) { match (key.code, key.modifiers, app.input_mode) {
// Quick quit with 'q' when input is empty // Quick quit with 'q' when input is empty
(KeyCode::Char('q'), KeyModifiers::NONE, InputMode::Normal) (KeyCode::Char('q'), KeyModifiers::NONE, InputMode::Normal)
if app.command_input.is_empty() => { if app.command_input.is_empty() =>
{
app.should_quit = true; app.should_quit = true;
} }
@@ -392,8 +410,6 @@ where
} }
} }
} }
}
}
Ok(()) Ok(())
} }
@@ -415,21 +431,25 @@ fn ui<TOperator: Operator>(f: &mut Frame, app: &mut App<TOperator>) {
InputMode::Normal => "NORMAL", InputMode::Normal => "NORMAL",
InputMode::Search => "SEARCH", InputMode::Search => "SEARCH",
}; };
let title = Paragraph::new(vec![ let title = Paragraph::new(vec![Line::from(vec![
Line::from(vec![
Span::styled( Span::styled(
"NoControl - Kubernetes-like Control Plane", "NoControl - Kubernetes-like Control Plane",
Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD), Style::default()
.fg(Color::Cyan)
.add_modifier(Modifier::BOLD),
), ),
Span::raw(" "), Span::raw(" "),
Span::styled( Span::styled(
format!("[{}]", mode_text), format!("[{}]", mode_text),
Style::default() Style::default()
.fg(if app.input_mode == InputMode::Search { Color::Yellow } else { Color::Green }) .fg(if app.input_mode == InputMode::Search {
Color::Yellow
} else {
Color::Green
})
.add_modifier(Modifier::BOLD), .add_modifier(Modifier::BOLD),
), ),
]), ])])
])
.block(Block::default().borders(Borders::ALL)); .block(Block::default().borders(Borders::ALL));
f.render_widget(title, chunks[0]); f.render_widget(title, chunks[0]);
@@ -454,7 +474,8 @@ fn ui<TOperator: Operator>(f: &mut Frame, app: &mut App<TOperator>) {
fn render_manifest_list<TOperator: Operator>(f: &mut Frame, app: &mut App<TOperator>, area: Rect) { fn render_manifest_list<TOperator: Operator>(f: &mut Frame, app: &mut App<TOperator>, area: Rect) {
// Collect filtered manifests data before borrowing list_state // Collect filtered manifests data before borrowing list_state
let filtered_data: Vec<_> = app.filtered_indices let filtered_data: Vec<_> = app
.filtered_indices
.iter() .iter()
.filter_map(|&idx| app.manifests.get(idx)) .filter_map(|&idx| app.manifests.get(idx))
.map(|m| (m.manifest.name.clone(), m.status.status.clone())) .map(|m| (m.manifest.name.clone(), m.status.status.clone()))
@@ -469,14 +490,12 @@ fn render_manifest_list<TOperator: Operator>(f: &mut Frame, app: &mut App<TOpera
nocontrol::manifests::ManifestStatusState::Pending => Color::Gray, nocontrol::manifests::ManifestStatusState::Pending => Color::Gray,
nocontrol::manifests::ManifestStatusState::Stopping => Color::Magenta, nocontrol::manifests::ManifestStatusState::Stopping => Color::Magenta,
nocontrol::manifests::ManifestStatusState::Deleting => Color::Red, nocontrol::manifests::ManifestStatusState::Deleting => Color::Red,
nocontrol::manifests::ManifestStatusState::Errored => Color::Red,
}; };
let status_text = format!("{:?}", status); let status_text = format!("{:?}", status);
let content = Line::from(vec![ let content = Line::from(vec![
Span::styled( Span::styled("", Style::default().fg(status_color)),
"",
Style::default().fg(status_color),
),
Span::raw(name), Span::raw(name),
Span::raw(" "), Span::raw(" "),
Span::styled( Span::styled(
@@ -526,7 +545,10 @@ fn render_manifest_details<TOperator: Operator>(f: &mut Frame, app: &App<TOperat
Span::raw(format!("{:?}", manifest.status.status)), Span::raw(format!("{:?}", manifest.status.status)),
]), ]),
Line::from(vec![ Line::from(vec![
Span::styled("Generation: ", Style::default().add_modifier(Modifier::BOLD)), Span::styled(
"Generation: ",
Style::default().add_modifier(Modifier::BOLD),
),
Span::raw(format!("{}", manifest.generation)), Span::raw(format!("{}", manifest.generation)),
]), ]),
Line::from(vec![ Line::from(vec![
@@ -541,7 +563,10 @@ fn render_manifest_details<TOperator: Operator>(f: &mut Frame, app: &App<TOperat
if !manifest.status.events.is_empty() { if !manifest.status.events.is_empty() {
lines.push(Line::from("")); lines.push(Line::from(""));
lines.push(Line::from(Span::styled("Events:", Style::default().add_modifier(Modifier::BOLD)))); lines.push(Line::from(Span::styled(
"Events:",
Style::default().add_modifier(Modifier::BOLD),
)));
for event in manifest.status.events.iter().rev().take(5) { for event in manifest.status.events.iter().rev().take(5) {
lines.push(Line::from(format!("{}", event.message))); lines.push(Line::from(format!("{}", event.message)));
} }
@@ -549,7 +574,10 @@ fn render_manifest_details<TOperator: Operator>(f: &mut Frame, app: &App<TOperat
if !manifest.status.changes.is_empty() { if !manifest.status.changes.is_empty() {
lines.push(Line::from("")); lines.push(Line::from(""));
lines.push(Line::from(Span::styled("Changes:", Style::default().add_modifier(Modifier::BOLD)))); lines.push(Line::from(Span::styled(
"Changes:",
Style::default().add_modifier(Modifier::BOLD),
)));
for change in manifest.status.changes.iter().rev().take(5) { for change in manifest.status.changes.iter().rev().take(5) {
lines.push(Line::from(format!( lines.push(Line::from(format!(
"{:?} at {}", "{:?} at {}",
@@ -596,7 +624,10 @@ fn render_command_input<TOperator: Operator>(f: &mut Frame, app: &App<TOperator>
String::new() String::new()
}; };
( (
format!(" Command{} (/ to search, ↑↓ for history, 'help' for commands) ", hist_info), format!(
" Command{} (/ to search, ↑↓ for history, 'help' for commands) ",
hist_info
),
app.command_input.as_str(), app.command_input.as_str(),
Style::default().fg(Color::Yellow), Style::default().fg(Color::Yellow),
) )
@@ -604,7 +635,9 @@ fn render_command_input<TOperator: Operator>(f: &mut Frame, app: &App<TOperator>
InputMode::Search => ( InputMode::Search => (
" Search (fuzzy) - Enter to apply, Esc to cancel ".to_string(), " Search (fuzzy) - Enter to apply, Esc to cancel ".to_string(),
app.command_input.as_str(), app.command_input.as_str(),
Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD), Style::default()
.fg(Color::Cyan)
.add_modifier(Modifier::BOLD),
), ),
}; };

View File

@@ -1,7 +1,7 @@
[package] [package]
name = "nocontrol" name = "nocontrol"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2024"
[dependencies] [dependencies]
anyhow.workspace = true anyhow.workspace = true

View File

@@ -1,9 +1,8 @@
use std::time::Duration; use std::time::Duration;
use async_trait::async_trait;
use nocontrol::{ use nocontrol::{
manifests::{Manifest, ManifestMetadata, ManifestState}, Operator, OperatorState, Specification,
Operator, Specification, manifests::{Action, Manifest, ManifestMetadata, ManifestState},
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
@@ -22,7 +21,7 @@ async fn main() -> anyhow::Result<()> {
.without_time() .without_time()
.init(); .init();
let operator = MyOperator {}; let operator = OperatorState::new(MyOperator {});
let control_plane = nocontrol::ControlPlane::new(operator); let control_plane = nocontrol::ControlPlane::new(operator);
// Add initial manifest // Add initial manifest
@@ -56,7 +55,7 @@ async fn main() -> anyhow::Result<()> {
name: "initial-deployment".into(), name: "initial-deployment".into(),
metadata: ManifestMetadata {}, metadata: ManifestMetadata {},
spec: Specifications::Deployment(DeploymentControllerManifest { spec: Specifications::Deployment(DeploymentControllerManifest {
name: format!("app-{}", &random.to_string()[..8]), name: format!("app-{}", &random.to_string()),
}), }),
}) })
.await; .await;
@@ -81,19 +80,13 @@ async fn main() -> anyhow::Result<()> {
#[derive(Clone)] #[derive(Clone)]
pub struct MyOperator {} pub struct MyOperator {}
#[async_trait]
impl Operator for MyOperator { impl Operator for MyOperator {
type Specifications = Specifications; type Specifications = Specifications;
async fn reconcile( async fn reconcile(
&self, &self,
desired_manifest: &mut ManifestState<Specifications>, desired_manifest: &mut ManifestState<Specifications>,
) -> anyhow::Result<()> { ) -> anyhow::Result<Action> {
let now = jiff::Timestamp::now();
desired_manifest.status.status = nocontrol::manifests::ManifestStatusState::Started;
desired_manifest.updated = now;
match &desired_manifest.manifest.spec { match &desired_manifest.manifest.spec {
Specifications::Deployment(spec) => { Specifications::Deployment(spec) => {
tracing::info!( tracing::info!(
@@ -104,10 +97,7 @@ impl Operator for MyOperator {
} }
} }
desired_manifest.status.status = nocontrol::manifests::ManifestStatusState::Running; Ok(Action::Requeue(std::time::Duration::from_secs(10)))
desired_manifest.updated = now;
Ok(())
} }
} }

View File

@@ -1,9 +1,9 @@
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::{ use crate::{
Operator, OperatorState,
control_plane::{backing_store::BackingStore, reconciler::Reconciler}, control_plane::{backing_store::BackingStore, reconciler::Reconciler},
manifests::{Manifest, ManifestState}, manifests::{Manifest, ManifestState},
Operator,
}; };
pub mod backing_store; pub mod backing_store;
@@ -19,7 +19,7 @@ pub struct ControlPlane<TOperator: Operator> {
} }
impl<TOperator: Operator> ControlPlane<TOperator> { impl<TOperator: Operator> ControlPlane<TOperator> {
pub fn new(operator: TOperator) -> Self { pub fn new(operator: OperatorState<TOperator>) -> Self {
let worker_id = uuid::Uuid::now_v7(); let worker_id = uuid::Uuid::now_v7();
let store = BackingStore::<TOperator::Specifications>::new(); let store = BackingStore::<TOperator::Specifications>::new();

View File

@@ -5,11 +5,11 @@ use sha2::{Digest, Sha256};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::{ use crate::{
Specification,
manifests::{ manifests::{
Manifest, ManifestChangeEvent, ManifestChangeEventType, ManifestLease, ManifestState, Manifest, ManifestChangeEvent, ManifestChangeEventType, ManifestLease, ManifestState,
ManifestStatus, ManifestStatusState, WorkerId, ManifestStatus, ManifestStatusState, WorkerId,
}, },
Specification,
}; };
#[derive(Clone)] #[derive(Clone)]
@@ -46,6 +46,16 @@ impl<T: Specification> BackingStore<T> {
Ok(self.manifests.read().await.clone()) Ok(self.manifests.read().await.clone())
} }
pub async fn get(&self, name: &str) -> anyhow::Result<Option<ManifestState<T>>> {
Ok(self
.manifests
.read()
.await
.iter()
.find(|m| m.manifest.name == name)
.cloned())
}
pub async fn update_lease(&self, manifest_state: &ManifestState<T>) -> anyhow::Result<()> { pub async fn update_lease(&self, manifest_state: &ManifestState<T>) -> anyhow::Result<()> {
tracing::trace!(manifest_state.manifest.name, "updating lease"); tracing::trace!(manifest_state.manifest.name, "updating lease");
let mut manifests = self.manifests.write().await; let mut manifests = self.manifests.write().await;

View File

@@ -1,88 +1,228 @@
use anyhow::Context; use anyhow::Context;
use jiff::Timestamp;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::{control_plane::backing_store::BackingStore, manifests::WorkerId, Operator}; use crate::{
Operator, OperatorState,
control_plane::backing_store::BackingStore,
manifests::{Action, ManifestName, ManifestState, ManifestStatusState, WorkerId},
reconcile_queue::ReconcileQueue,
};
#[derive(Clone)] #[derive(Clone)]
pub struct Reconciler<T: Operator> { pub struct Reconciler<T: Operator> {
worker_id: WorkerId, worker_id: WorkerId,
store: BackingStore<T::Specifications>, store: BackingStore<T::Specifications>,
operator: T, operator: OperatorState<T>,
reconcile_queue: ReconcileQueue<T::Specifications>,
} }
impl<T: Operator> Reconciler<T> { impl<T: Operator> Reconciler<T> {
pub fn new(worker_id: WorkerId, store: &BackingStore<T::Specifications>, operator: T) -> Self { pub fn new(
worker_id: WorkerId,
store: &BackingStore<T::Specifications>,
operator: OperatorState<T>,
) -> Self {
Self { Self {
worker_id, worker_id,
store: store.clone(), store: store.clone(),
operator, operator,
reconcile_queue: ReconcileQueue::new(),
} }
} }
pub async fn reconcile(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> { /// Run the reconciler. This starts two concurrent tasks:
let now = jiff::Timestamp::now(); /// 1. A sync task that periodically checks for new/changed manifests and enqueues them
tracing::debug!(%self.worker_id, %now, "running reconciler"); /// 2. A worker task that processes the queue and runs reconciliations
pub async fn run(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> {
let now = Timestamp::now();
tracing::debug!(%self.worker_id, %now, "starting reconciler");
tokio::select! {
result = self.sync_manifests(cancellation_token) => {
result.context("sync manifests task failed")?;
}
result = self.process_queue(cancellation_token) => {
result.context("process queue task failed")?;
}
_ = cancellation_token.cancelled() => {
tracing::debug!("reconciler received cancellation");
}
}
tracing::debug!("reconciler shutting down");
Ok(())
}
/// Periodically sync manifests from the backing store.
/// Acquires leases and enqueues manifests that need reconciliation.
async fn sync_manifests(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> {
loop { loop {
let now = jiff::Timestamp::now();
if cancellation_token.is_cancelled() { if cancellation_token.is_cancelled() {
break; break;
} }
tracing::trace!(%self.worker_id, %now, "reconciler iteration");
let mut our_manifests = Vec::new(); let now = Timestamp::now();
// 1. read manifests from a backing store tracing::trace!(%self.worker_id, %now, "syncing manifests");
for manifest_state in self.store.get_owned_and_potential_leases().await? {
// 3. Lease the manifest
match &manifest_state.lease {
Some(lease) if lease.owner == self.worker_id => {
// We own the lease, update
self.store
.update_lease(&manifest_state)
.await
.context("update lease")?;
our_manifests.push(manifest_state.clone());
}
None => {
// 2. If no lease
// Acquire lease
self.store
.acquire_lease(&manifest_state, &self.worker_id)
.await
.context("acquire lease")?;
our_manifests.push(manifest_state.clone());
}
_ => {
// Skipping manifest, as it is not vaid
continue;
}
}
}
// 4. Check desired vs actual if let Err(e) = self.sync_once().await {
'manifest: for manifest in our_manifests.iter_mut() { tracing::warn!(error = %e, "failed to sync manifests");
// Currently periodic sync,
// TODO: this should also be made event based
if let Some(change) = manifest.status.changes.first() {
if change.handled {
continue 'manifest;
}
}
self.operator.reconcile(manifest).await?;
self.store.update_state(manifest).await?;
if let Some(change) = manifest.status.changes.first_mut() {
change.handled = true
}
} }
tokio::time::sleep(std::time::Duration::from_millis(500)).await; tokio::time::sleep(std::time::Duration::from_millis(500)).await;
} }
tracing::debug!("reconciler shutting down"); Ok(())
}
/// Single sync iteration - check for manifests, acquire leases, enqueue work.
async fn sync_once(&self) -> anyhow::Result<()> {
for manifest_state in self.store.get_owned_and_potential_leases().await? {
let manifest_name = manifest_state.manifest.name.clone();
match &manifest_state.lease {
Some(lease) if lease.owner == self.worker_id => {
// We own the lease, update it
self.store
.update_lease(&manifest_state)
.await
.context("update lease")?;
// Check if there are unhandled changes
if self.needs_reconciliation(&manifest_state) {
self.reconcile_queue.enqueue(manifest_name).await;
}
}
None => {
// No lease, try to acquire
self.store
.acquire_lease(&manifest_state, &self.worker_id)
.await
.context("acquire lease")?;
// Enqueue for reconciliation
self.reconcile_queue.enqueue(manifest_name).await;
}
_ => {
// Someone else owns the lease, skip
continue;
}
}
}
Ok(()) Ok(())
} }
/// Check if a manifest needs reconciliation.
fn needs_reconciliation(&self, manifest: &ManifestState<T::Specifications>) -> bool {
// Has unhandled changes
if let Some(change) = manifest.status.changes.first()
&& !change.handled
{
return true;
}
false
}
/// Process the reconciliation queue.
/// Takes items from the queue and reconciles them, re-enqueuing with delay if needed.
async fn process_queue(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> {
loop {
if cancellation_token.is_cancelled() {
break;
}
// Wait for the next item from the queue
let manifest_name = tokio::select! {
name = self.reconcile_queue.take() => name,
_ = cancellation_token.cancelled() => break,
};
tracing::trace!(%self.worker_id, %manifest_name, "processing manifest from queue");
if let Err(e) = self.reconcile_manifest(&manifest_name).await {
tracing::warn!(error = %e, %manifest_name, "failed to reconcile manifest");
// On error, requeue with a short delay
self.requeue_with_delay(&manifest_name, std::time::Duration::from_secs(5))
.await;
}
}
Ok(())
}
/// Reconcile a single manifest by name.
async fn reconcile_manifest(&self, manifest_name: &ManifestName) -> anyhow::Result<()> {
// Fetch the current manifest state
let Some(mut manifest) = self.store.get(manifest_name).await? else {
tracing::debug!(%manifest_name, "manifest not found, skipping");
return Ok(());
};
// Verify we still own the lease
match &manifest.lease {
Some(lease) if lease.owner == self.worker_id => {}
_ => {
tracing::debug!(%manifest_name, "we don't own the lease, skipping");
return Ok(());
}
}
let now = Timestamp::now();
manifest.status.status = ManifestStatusState::Started;
manifest.updated = now;
// Run the reconciliation
let action = match self.operator.reconcile(&mut manifest).await {
Ok(action) => action,
Err(e) => {
manifest.status.status = ManifestStatusState::Errored;
manifest.updated = Timestamp::now();
// Let the operator handle the error
self.operator.on_error(&mut manifest, &e).await
}
};
// Update status
let now = Timestamp::now();
manifest.status.status = ManifestStatusState::Running;
manifest.updated = now;
// Mark change as handled
if let Some(change) = manifest.status.changes.first_mut() {
change.handled = true;
}
// Persist the updated state
self.store.update_state(&manifest).await?;
// Handle requeue action
match action {
Action::None => {}
Action::Requeue(delay) => {
self.requeue_with_delay(manifest_name, delay).await;
}
}
Ok(())
}
/// Requeue a manifest with a delay.
async fn requeue_with_delay(&self, manifest_name: &ManifestName, delay: std::time::Duration) {
let scheduled_at = Timestamp::now()
.checked_add(jiff::Span::try_from(delay).unwrap_or(jiff::Span::new().seconds(5)))
.unwrap_or_else(|_| Timestamp::now());
tracing::trace!(%manifest_name, ?delay, %scheduled_at, "requeuing manifest");
self.reconcile_queue
.enqueue_at(manifest_name.clone(), scheduled_at)
.await;
}
// Keep the old method name for backwards compatibility
pub async fn reconcile(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> {
self.run(cancellation_token).await
}
} }

View File

@@ -1,21 +1,14 @@
#![feature(associated_type_defaults)]
mod control_plane; mod control_plane;
pub use control_plane::ControlPlane;
pub mod manifests; pub mod manifests;
pub use control_plane::ControlPlane; mod operator_state;
use serde::{de::DeserializeOwned, Serialize}; pub use operator_state::*;
use crate::manifests::ManifestState; mod operator;
pub use operator::*;
pub trait Specification: Clone + Serialize + DeserializeOwned { mod reconcile_queue;
fn kind(&self) -> &'static str;
}
#[async_trait::async_trait]
pub trait Operator {
type Specifications: Specification;
async fn reconcile(
&self,
desired_manifest: &mut ManifestState<Self::Specifications>,
) -> anyhow::Result<()>;
}

View File

@@ -1,4 +1,4 @@
use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde::{Deserialize, Serialize, de::DeserializeOwned};
use crate::Specification; use crate::Specification;
@@ -27,13 +27,15 @@ pub struct ManifestLease {
pub last_seen: jiff::Timestamp, pub last_seen: jiff::Timestamp,
} }
pub type ManifestName = String;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(bound(serialize = "T: Serialize", deserialize = "T: DeserializeOwned"))] #[serde(bound(serialize = "T: Serialize", deserialize = "T: DeserializeOwned"))]
pub struct Manifest<T> pub struct Manifest<T>
where where
T: Specification, T: Specification,
{ {
pub name: String, pub name: ManifestName,
pub metadata: ManifestMetadata, pub metadata: ManifestMetadata,
pub spec: T, pub spec: T,
} }
@@ -50,6 +52,7 @@ pub struct ManifestStatus {
pub enum ManifestStatusState { pub enum ManifestStatusState {
Pending, Pending,
Started, Started,
Errored,
Running, Running,
Stopping, Stopping,
Deleting, Deleting,
@@ -80,3 +83,9 @@ pub struct ManifestEvent {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManifestMetadata {} pub struct ManifestMetadata {}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Action {
None,
Requeue(std::time::Duration),
}

View File

@@ -0,0 +1,26 @@
use serde::{Serialize, de::DeserializeOwned};
use crate::manifests::{Action, ManifestState};
pub trait Specification: Clone + Serialize + DeserializeOwned {
fn kind(&self) -> &'static str;
}
#[allow(dead_code, unused_variables)]
pub trait Operator: Send + Sync + 'static {
type Specifications: Specification;
type Error = anyhow::Error;
fn reconcile(
&self,
desired_manifest: &mut ManifestState<Self::Specifications>,
) -> impl Future<Output = Result<Action, Self::Error>>;
fn on_error(
&self,
desired_manifest: &mut ManifestState<Self::Specifications>,
error: &Self::Error,
) -> impl Future<Output = Action> {
async { Action::None }
}
}

View File

@@ -0,0 +1,94 @@
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;
use crate::Operator;
#[derive(Clone)]
pub struct OperatorState<T: Operator> {
inner: Arc<Mutex<T>>,
config: Arc<OperatorConfig>,
}
impl<T: Operator> OperatorState<T> {
pub fn new(operator: T) -> Self {
Self {
inner: Arc::new(Mutex::new(operator)),
config: Default::default(),
}
}
pub fn new_with_config(operator: T, config: OperatorConfig) -> Self {
Self {
inner: Arc::new(Mutex::new(operator)),
config: Arc::new(config),
}
}
pub(crate) fn config(&self) -> &OperatorConfig {
&self.config
}
}
impl<T: Operator> Operator for OperatorState<T> {
type Specifications = T::Specifications;
type Error = T::Error;
async fn reconcile(
&self,
desired_manifest: &mut crate::manifests::ManifestState<Self::Specifications>,
) -> Result<crate::manifests::Action, Self::Error> {
self.inner.lock().await.reconcile(desired_manifest).await
}
async fn on_error(
&self,
desired_manifest: &mut crate::manifests::ManifestState<Self::Specifications>,
error: &Self::Error,
) -> crate::manifests::Action {
self.inner
.lock()
.await
.on_error(desired_manifest, error)
.await
}
}
pub struct OperatorConfig {
pub backoff_policy: BackoffPolicy,
pub reconcile_on: Option<tokio::sync::mpsc::Receiver<()>>,
}
impl Default for OperatorConfig {
fn default() -> Self {
Self {
backoff_policy: BackoffPolicy::Static {
delay: Duration::from_secs(30),
},
reconcile_on: Default::default(),
}
}
}
pub enum BackoffPolicy {
Never,
Expotential {
multiplier: f64,
initial_delay: Duration,
max_delay: Option<Duration>,
},
Static {
delay: Duration,
},
Ramp(Vec<Duration>),
}
impl Default for BackoffPolicy {
fn default() -> Self {
Self::Expotential {
multiplier: 2.0,
initial_delay: Duration::from_secs(1),
max_delay: Some(Duration::from_mins(5)),
}
}
}

View File

@@ -0,0 +1,284 @@
use std::{cmp::Ordering, collections::BinaryHeap, marker::PhantomData, sync::Arc};
use jiff::Timestamp;
use tokio::sync::{Mutex, Notify};
use crate::{Specification, manifests::ManifestName};
#[derive(Debug, Clone, Eq, PartialEq)]
struct QueueEntry {
scheduled_at: Timestamp,
key: ManifestName,
}
impl Ord for QueueEntry {
fn cmp(&self, other: &Self) -> Ordering {
// Reverse ordering for min-heap (earliest first)
other
.scheduled_at
.cmp(&self.scheduled_at)
.then_with(|| self.key.cmp(&other.key))
}
}
impl PartialOrd for QueueEntry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
struct Inner {
queue: BinaryHeap<QueueEntry>,
}
/// A simple sorted idempotent queue for scheduling reconciliation jobs.
///
/// Each key can only appear once in the queue. If a key is enqueued again
/// with an earlier scheduled time, the existing entry is updated.
///
/// This queue is Clone, Send, and Sync - it can be shared across threads.
pub struct ReconcileQueue<T: Specification> {
inner: Arc<Mutex<Inner>>,
notify: Arc<Notify>,
_marker: PhantomData<T>,
}
impl<T: Specification> Clone for ReconcileQueue<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
notify: Arc::clone(&self.notify),
_marker: PhantomData,
}
}
}
impl<T: Specification> Default for ReconcileQueue<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Specification> ReconcileQueue<T> {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(Inner {
queue: BinaryHeap::new(),
})),
notify: Arc::new(Notify::new()),
_marker: PhantomData,
}
}
/// Enqueue a key to be reconciled now.
pub async fn enqueue(&self, key: ManifestName) {
self.enqueue_at(key, Timestamp::now()).await;
}
/// Enqueue a key to be reconciled at a specific time.
///
/// If the key already exists in the queue with a later scheduled time,
/// it will be updated to the earlier time. If it exists with an earlier
/// or equal time, this is a no-op.
pub async fn enqueue_at(&self, key: ManifestName, scheduled_at: Timestamp) {
let mut inner = self.inner.lock().await;
// Remove any existing entry for this key
let existing: Vec<_> = inner.queue.drain().collect();
let mut found_earlier = false;
for entry in existing {
if entry.key == key {
// Keep the earlier scheduled time
if entry.scheduled_at <= scheduled_at {
inner.queue.push(entry);
found_earlier = true;
}
} else {
inner.queue.push(entry);
}
}
// Add new entry if we didn't find an earlier one
if !found_earlier {
inner.queue.push(QueueEntry { scheduled_at, key });
}
self.notify.notify_one();
}
/// Try to take the next ready job from the queue.
///
/// Returns `Some(key)` if there's a job scheduled at or before now,
/// otherwise returns `None`.
pub async fn try_take(&self) -> Option<ManifestName> {
let now = Timestamp::now();
let mut inner = self.inner.lock().await;
if let Some(entry) = inner.queue.peek()
&& entry.scheduled_at <= now
{
return inner.queue.pop().map(|e| e.key);
}
None
}
/// Returns how long until the next job is ready, if any.
pub async fn next_ready_in(&self) -> Option<std::time::Duration> {
let now = Timestamp::now();
let inner = self.inner.lock().await;
inner.queue.peek().and_then(|entry| {
let span = entry.scheduled_at - now;
if span.is_negative() {
Some(std::time::Duration::ZERO)
} else {
span.try_into().ok()
}
})
}
/// Wait for the next job to be ready and return it.
///
/// This will block until a job is ready.
pub async fn take(&self) -> ManifestName {
loop {
if let Some(key) = self.try_take().await {
return key;
}
match self.next_ready_in().await {
Some(duration) if duration > std::time::Duration::ZERO => {
tokio::select! {
_ = tokio::time::sleep(duration) => {}
_ = self.notify.notified() => {}
}
}
Some(_) => {
// Duration is zero, should have a ready item
continue;
}
None => {
// Queue is empty, wait for notification
self.notify.notified().await;
}
}
}
}
/// Returns the number of pending jobs in the queue.
pub async fn len(&self) -> usize {
self.inner.lock().await.queue.len()
}
/// Returns true if the queue is empty.
pub async fn is_empty(&self) -> bool {
self.inner.lock().await.queue.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
use jiff::ToSpan;
use serde::{Deserialize, Serialize};
#[derive(Clone, Serialize, Deserialize)]
struct TestSpec;
impl Specification for TestSpec {
fn kind(&self) -> &'static str {
"test"
}
}
#[tokio::test]
async fn test_enqueue_and_take() {
let queue: ReconcileQueue<TestSpec> = ReconcileQueue::new();
queue.enqueue("job1".to_string()).await;
queue.enqueue("job2".to_string()).await;
assert_eq!(queue.len().await, 2);
let job = queue.try_take().await;
assert!(job.is_some());
assert_eq!(queue.len().await, 1);
}
#[tokio::test]
async fn test_idempotent_enqueue() {
let queue: ReconcileQueue<TestSpec> = ReconcileQueue::new();
queue.enqueue("job1".to_string()).await;
queue.enqueue("job1".to_string()).await;
assert_eq!(queue.len().await, 1);
}
#[tokio::test]
async fn test_scheduled_order() {
let queue: ReconcileQueue<TestSpec> = ReconcileQueue::new();
let now = Timestamp::now();
let later = now.checked_add(1.hour()).unwrap();
queue.enqueue_at("later_job".to_string(), later).await;
queue.enqueue_at("now_job".to_string(), now).await;
// Should get the "now" job first
let job = queue.try_take().await;
assert_eq!(job, Some("now_job".to_string()));
// Later job should not be ready yet
let job = queue.try_take().await;
assert!(job.is_none());
}
#[tokio::test]
async fn test_enqueue_earlier_updates() {
let queue: ReconcileQueue<TestSpec> = ReconcileQueue::new();
let now = Timestamp::now();
let later = now.checked_add(1.hour()).unwrap();
// First enqueue for later
queue.enqueue_at("job1".to_string(), later).await;
assert!(queue.try_take().await.is_none());
// Re-enqueue for now - should update
queue.enqueue_at("job1".to_string(), now).await;
assert_eq!(queue.try_take().await, Some("job1".to_string()));
}
#[tokio::test]
async fn test_enqueue_later_no_op() {
let queue: ReconcileQueue<TestSpec> = ReconcileQueue::new();
let now = Timestamp::now();
let later = now.checked_add(1.hour()).unwrap();
// First enqueue for now
queue.enqueue_at("job1".to_string(), now).await;
// Try to enqueue for later - should be no-op
queue.enqueue_at("job1".to_string(), later).await;
// Should still be ready now
assert_eq!(queue.try_take().await, Some("job1".to_string()));
}
#[tokio::test]
async fn test_clone_shares_state() {
let queue1: ReconcileQueue<TestSpec> = ReconcileQueue::new();
let queue2 = queue1.clone();
queue1.enqueue("job1".to_string()).await;
assert_eq!(queue2.len().await, 1);
let job = queue2.try_take().await;
assert_eq!(job, Some("job1".to_string()));
assert!(queue1.is_empty().await);
}
}

View File

@@ -1,7 +1,6 @@
use async_trait::async_trait;
use nocontrol::{ use nocontrol::{
manifests::{Manifest, ManifestMetadata, ManifestState}, Operator, OperatorState, Specification,
Operator, Specification, manifests::{Action, Manifest, ManifestMetadata, ManifestState},
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tracing_test::traced_test; use tracing_test::traced_test;
@@ -9,8 +8,7 @@ use tracing_test::traced_test;
#[tokio::test] #[tokio::test]
#[traced_test] #[traced_test]
async fn test_can_run_reconciler() -> anyhow::Result<()> { async fn test_can_run_reconciler() -> anyhow::Result<()> {
let operator = MyOperator {}; let operator = OperatorState::new(MyOperator {});
let mut control_plane = nocontrol::ControlPlane::new(operator); let mut control_plane = nocontrol::ControlPlane::new(operator);
control_plane.with_deadline(std::time::Duration::from_secs(3)); control_plane.with_deadline(std::time::Duration::from_secs(3));
@@ -51,19 +49,13 @@ async fn test_can_run_reconciler() -> anyhow::Result<()> {
#[derive(Clone)] #[derive(Clone)]
pub struct MyOperator {} pub struct MyOperator {}
#[async_trait]
impl Operator for MyOperator { impl Operator for MyOperator {
type Specifications = Specifications; type Specifications = Specifications;
async fn reconcile( async fn reconcile(
&self, &self,
desired_manifest: &mut ManifestState<Specifications>, desired_manifest: &mut ManifestState<Specifications>,
) -> anyhow::Result<()> { ) -> anyhow::Result<Action> {
let now = jiff::Timestamp::now();
desired_manifest.status.status = nocontrol::manifests::ManifestStatusState::Started;
desired_manifest.updated = now;
match &desired_manifest.manifest.spec { match &desired_manifest.manifest.spec {
Specifications::Deployment(spec) => { Specifications::Deployment(spec) => {
tracing::info!( tracing::info!(
@@ -73,10 +65,8 @@ impl Operator for MyOperator {
) )
} }
} }
desired_manifest.status.status = nocontrol::manifests::ManifestStatusState::Running;
desired_manifest.updated = now;
Ok(()) Ok(Action::Requeue(std::time::Duration::from_secs(1)))
} }
} }