feat: add very basic in memory coordination

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2026-01-05 19:10:00 +01:00
commit 353b8c1eb0
15 changed files with 1794 additions and 0 deletions

View File

@@ -0,0 +1,23 @@
[package]
name = "nocontrol"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow.workspace = true
async-trait = "0.1.89"
hex = "0.4.3"
jiff = { version = "0.2.17", features = ["serde"] }
rand = "0.9.2"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.148"
sha2 = "0.10.9"
tokio.workspace = true
tokio-util = "0.7.18"
tracing.workspace = true
uuid = { version = "1.19.0", features = ["serde", "v4", "v7"] }
[dev-dependencies]
insta = "1.46.0"
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
tracing-test = { version = "0.2.5", features = ["no-env-filter"] }

View File

@@ -0,0 +1,200 @@
use std::io::{BufRead, Write};
use async_trait::async_trait;
use nocontrol::{
manifests::{Manifest, ManifestMetadata, ManifestState},
Operator,
};
use serde::{Deserialize, Serialize};
use tracing_subscriber::EnvFilter;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let output_file = std::fs::File::create("target/nocontrol.log")?;
tracing_subscriber::fmt()
// .pretty()
.with_env_filter(EnvFilter::from_default_env())
.with_writer(output_file)
.with_file(false)
.with_line_number(false)
.with_target(false)
.without_time()
.init();
let operator = MyOperator {};
let mut control_plane = nocontrol::ControlPlane::new(operator);
// control_plane.with_deadline(std::time::Duration::from_secs(4));
tokio::spawn({
let control_plane = control_plane.clone();
async move {
control_plane
.add_manifest(Manifest {
name: "some-manifest".into(),
metadata: ManifestMetadata {},
spec: Specifications::Deployment(DeploymentControllerManifest {
name: "some-name".into(),
}),
})
.await
.unwrap();
loop {
let rand = {
use rand::prelude::*;
let mut rng = rand::rng();
rng.random_range(2..5)
};
tokio::time::sleep(std::time::Duration::from_secs(rand)).await;
let random = uuid::Uuid::now_v7();
control_plane
.add_manifest(Manifest {
name: "some-manifest".into(),
metadata: ManifestMetadata {},
spec: Specifications::Deployment(DeploymentControllerManifest {
name: format!("some-changed-name: {}", random),
}),
})
.await
.unwrap();
}
}
});
// Debugging shell
tokio::spawn({
let control_plane = control_plane.clone();
async move {
let ui = Ui {};
loop {
ui.write("> ");
let cmd = ui.read_line();
let items = cmd.split(" ").map(|t| t.to_string()).collect::<Vec<_>>();
let (command, args) = match &items[..] {
[first, rest @ ..] => (first, rest.to_vec()),
//[first] => (first, vec![]),
_ => {
ui.writeln("invalid command");
continue;
}
};
match (command.as_str(), args.as_slice()) {
("get", _) => {
// get all for now
let manifests = control_plane
.get_manifests()
.await
.inspect_err(|e| ui.writeln(format!("get failed: {e:#}")))
.unwrap();
ui.writeln("listing manifests");
for manifest in manifests {
ui.writeln(format!(" - {}", manifest.manifest.name));
}
}
("describe", [manifest_name, ..]) => {
let manifests = control_plane
.get_manifests()
.await
.inspect_err(|e| ui.writeln(format!("get failed: {e:#}")))
.unwrap();
if let Some(manifest) =
manifests.iter().find(|m| &m.manifest.name == manifest_name)
{
let output = serde_json::to_string_pretty(&manifest).unwrap();
ui.writeln(output);
}
}
(cmd, _) => ui.writeln(format!("command is not implemented: {}", cmd)),
}
ui.writeln("");
}
}
});
control_plane.execute().await?;
Ok(())
}
#[derive(Clone)]
pub struct MyOperator {}
#[async_trait]
impl Operator for MyOperator {
type Specifications = Specifications;
async fn reconcile(
&self,
desired_manifest: &mut ManifestState<Specifications>,
) -> anyhow::Result<()> {
let now = jiff::Timestamp::now();
desired_manifest.status.status = nocontrol::manifests::ManifestStatusState::Started;
desired_manifest.updated = now;
match &desired_manifest.manifest.spec {
Specifications::Deployment(spec) => {
tracing::info!(
"reconciliation was called for name = {}, value = {}",
desired_manifest.manifest.name,
spec.name
)
}
}
desired_manifest.status.status = nocontrol::manifests::ManifestStatusState::Running;
desired_manifest.updated = now;
Ok(())
}
}
#[derive(Clone, Serialize)]
pub enum Specifications {
Deployment(DeploymentControllerManifest),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeploymentControllerManifest {
name: String,
}
pub struct Ui {}
impl Ui {
pub fn write(&self, msg: &str) {
let mut stderr = std::io::stderr().lock();
stderr.write_all(msg.as_bytes()).unwrap();
stderr.flush().unwrap()
}
pub fn writeln(&self, msg: impl AsRef<str>) {
let msg = msg.as_ref();
let mut stderr = std::io::stderr().lock();
stderr.write_all(msg.as_bytes()).unwrap();
writeln!(stderr).unwrap();
stderr.flush().unwrap()
}
pub fn read_line(&self) -> String {
let mut stdin = std::io::stdin().lock();
let mut output = String::new();
stdin.read_line(&mut output).unwrap();
output.trim().to_string()
}
}

View File

@@ -0,0 +1,77 @@
use std::marker::PhantomData;
use tokio_util::sync::CancellationToken;
use crate::{
control_plane::{backing_store::BackingStore, reconciler::Reconciler},
manifests::{Manifest, ManifestState},
Operator,
};
pub mod backing_store;
pub mod reconciler;
#[derive(Clone)]
pub struct ControlPlane<TOperator: Operator> {
reconciler: Reconciler<TOperator>,
worker_id: uuid::Uuid,
store: BackingStore<TOperator::Specifications>,
deadline: Option<std::time::Duration>,
}
impl<TOperator: Operator> ControlPlane<TOperator> {
pub fn new(operator: TOperator) -> Self {
let worker_id = uuid::Uuid::now_v7();
let store = BackingStore::<TOperator::Specifications>::new();
let reconciler = Reconciler::new(worker_id, &store, operator);
Self {
reconciler,
worker_id,
deadline: None,
store,
}
}
pub fn with_deadline(&mut self, deadline: std::time::Duration) -> &mut Self {
self.deadline = Some(deadline);
self
}
pub async fn execute(&self) -> anyhow::Result<()> {
tracing::info!(worker_id = %self.worker_id, "starting control plane");
let cancellation_token = CancellationToken::new();
let child_token = cancellation_token.child_token();
if let Some(deadline) = self.deadline {
tokio::spawn(async move {
tokio::time::sleep(deadline).await;
cancellation_token.cancel();
});
}
self.reconciler.reconcile(&child_token).await?;
Ok(())
}
pub async fn add_manifest(
&self,
manifest: Manifest<TOperator::Specifications>,
) -> anyhow::Result<()> {
tracing::info!(manifest.name, "adding manifest");
self.store.upsert_manifest(manifest).await?;
Ok(())
}
pub async fn get_manifests(
&self,
) -> anyhow::Result<Vec<ManifestState<TOperator::Specifications>>> {
self.store.get_manifests().await
}
}

View File

@@ -0,0 +1,152 @@
use std::sync::Arc;
use jiff::ToSpan;
use serde::Serialize;
use sha2::{Digest, Sha256};
use tokio::sync::RwLock;
use crate::manifests::{
Manifest, ManifestLease, ManifestState, ManifestStatus, ManifestStatusState, WorkerId,
};
#[derive(Clone)]
pub struct BackingStore<T: Clone + Serialize> {
manifests: Arc<RwLock<Vec<ManifestState<T>>>>,
}
impl<T: Clone + Serialize> BackingStore<T> {
pub fn new() -> Self {
Self {
manifests: Arc::new(RwLock::new(Vec::new())),
}
}
pub async fn get_owned_and_potential_leases(&self) -> anyhow::Result<Vec<ManifestState<T>>> {
let now = jiff::Timestamp::now().checked_sub(1.second())?;
let manifests = self
.manifests
.read()
.await
.iter()
.filter(|m| match &m.lease {
Some(lease) if lease.last_seen < now => true,
Some(_lease) => false,
None => true,
})
.cloned()
.collect::<Vec<_>>();
Ok(manifests)
}
pub async fn get_manifests(&self) -> anyhow::Result<Vec<ManifestState<T>>> {
Ok(self.manifests.read().await.clone())
}
pub async fn update_lease(&self, manifest_state: &ManifestState<T>) -> anyhow::Result<()> {
tracing::trace!(manifest_state.manifest.name, "updating lease");
let mut manifests = self.manifests.write().await;
match manifests
.iter_mut()
.find(|m| m.manifest.name == manifest_state.manifest.name)
{
Some(manifest) => {
let mut manifest_state = manifest_state.clone();
if let Some(lease) = manifest_state.lease.as_mut() {
lease.last_seen = jiff::Timestamp::now();
}
manifest.lease = manifest_state.lease
}
None => anyhow::bail!("manifest is not found"),
}
Ok(())
}
pub async fn acquire_lease(
&self,
manifest_state: &ManifestState<T>,
worker_id: &WorkerId,
) -> anyhow::Result<()> {
tracing::trace!(manifest_state.manifest.name, "acquiring lease");
let mut manifests = self.manifests.write().await;
match manifests
.iter_mut()
.find(|m| m.manifest.name == manifest_state.manifest.name)
{
Some(manifest) => {
let mut manifest_state = manifest_state.clone();
manifest_state.lease = Some(ManifestLease {
owner: *worker_id,
last_seen: jiff::Timestamp::now(),
});
manifest.lease = manifest_state.lease
}
None => anyhow::bail!("manifest is not found"),
}
Ok(())
}
pub async fn upsert_manifest(&self, manifest: Manifest<T>) -> anyhow::Result<()> {
let mut manifests = self.manifests.write().await;
let now = jiff::Timestamp::now();
match manifests
.iter_mut()
.find(|m| m.manifest.name == manifest.name)
{
Some(current_manifest) => {
tracing::debug!("updating manifest");
current_manifest.manifest = manifest;
current_manifest.updated = now;
}
None => {
tracing::debug!("adding manifest");
let content = serde_json::to_vec(&manifest)?;
let output = Sha256::digest(&content);
manifests.push(ManifestState {
manifest,
manifest_hash: output[..].to_vec(),
generation: 0,
status: ManifestStatus {
status: ManifestStatusState::Pending,
events: Vec::default(),
},
created: now,
updated: now,
lease: None,
});
}
}
Ok(())
}
pub async fn update_state(&self, manifest: &ManifestState<T>) -> anyhow::Result<()> {
let mut manifests = self.manifests.write().await;
let Some(current_manifest) = manifests
.iter_mut()
.find(|m| m.manifest.name == manifest.manifest.name)
else {
anyhow::bail!(
"manifest state was not found for {}",
manifest.manifest.name
)
};
let manifest = manifest.clone();
current_manifest.generation += 1;
current_manifest.status = manifest.status;
current_manifest.updated = manifest.updated;
Ok(())
}
}

View File

@@ -0,0 +1,77 @@
use anyhow::Context;
use tokio_util::sync::CancellationToken;
use crate::{control_plane::backing_store::BackingStore, manifests::WorkerId, Operator};
#[derive(Clone)]
pub struct Reconciler<T: Operator> {
worker_id: WorkerId,
store: BackingStore<T::Specifications>,
operator: T,
}
impl<T: Operator> Reconciler<T> {
pub fn new(worker_id: WorkerId, store: &BackingStore<T::Specifications>, operator: T) -> Self {
Self {
worker_id,
store: store.clone(),
operator,
}
}
pub async fn reconcile(&self, cancellation_token: &CancellationToken) -> anyhow::Result<()> {
let now = jiff::Timestamp::now();
tracing::debug!(%self.worker_id, %now, "running reconciler");
loop {
let now = jiff::Timestamp::now();
if cancellation_token.is_cancelled() {
break;
}
tracing::trace!(%self.worker_id, %now, "reconciler iteration");
let mut our_manifests = Vec::new();
// 1. read manifests from a backing store
for manifest_state in self.store.get_owned_and_potential_leases().await? {
// 3. Lease the manifest
match &manifest_state.lease {
Some(lease) if lease.owner == self.worker_id => {
// We own the lease, update
self.store
.update_lease(&manifest_state)
.await
.context("update lease")?;
our_manifests.push(manifest_state.clone());
}
None => {
// 2. If no lease
// Acquire lease
self.store
.acquire_lease(&manifest_state, &self.worker_id)
.await
.context("acquire lease")?;
our_manifests.push(manifest_state.clone());
}
_ => {
// Skipping manifest, as it is not vaid
continue;
}
}
}
// 4. Check desired vs actual
for manifest in our_manifests.iter_mut() {
// Currently periodic sync,
// TODO: this should also be made event based
self.operator.reconcile(manifest).await?;
self.store.update_state(manifest).await?;
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
tracing::debug!("reconciler shutting down");
Ok(())
}
}

View File

@@ -0,0 +1,17 @@
mod control_plane;
pub mod manifests;
pub use control_plane::ControlPlane;
use serde::Serialize;
use crate::manifests::{Manifest, ManifestState};
#[async_trait::async_trait]
pub trait Operator {
type Specifications: Clone + Serialize;
async fn reconcile(
&self,
desired_manifest: &mut ManifestState<Self::Specifications>,
) -> anyhow::Result<()>;
}

View File

@@ -0,0 +1,79 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ManifestState<T: Clone> {
pub manifest: Manifest<T>,
pub manifest_hash: Vec<u8>,
pub generation: u64,
pub status: ManifestStatus,
pub created: jiff::Timestamp,
pub updated: jiff::Timestamp,
pub lease: Option<ManifestLease>,
}
pub type WorkerId = uuid::Uuid;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ManifestLease {
pub owner: WorkerId,
pub last_seen: jiff::Timestamp,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Manifest<T> {
pub name: String,
pub metadata: ManifestMetadata,
pub spec: T,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ManifestStatus {
pub status: ManifestStatusState,
pub events: Vec<ManifestEvent>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ManifestStatusState {
Pending,
Started,
Running,
Stopping,
Deleting,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ManifestEvent {
pub owner: WorkerId,
pub created: u64,
pub message: String,
pub state: Option<ManifestStatusState>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManifestMetadata {}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "@type")]
pub enum ManifestSpecification {
SchemaApplication {},
}
#[cfg(test)]
mod test {
use crate::manifests::*;
#[test]
fn manifest() -> anyhow::Result<()> {
let manifest = Manifest {
name: "ingest".into(),
metadata: ManifestMetadata {},
spec: ManifestSpecification::SchemaApplication {},
};
insta::assert_debug_snapshot!(manifest);
Ok(())
}
}

View File

@@ -0,0 +1,9 @@
---
source: crates/nocontrol/src/manifests.rs
expression: manifest
---
Manifest {
name: "ingest",
metadata: ManifestMetadata,
spec: SchemaApplication,
}

View File

@@ -0,0 +1,91 @@
use async_trait::async_trait;
use nocontrol::{
manifests::{Manifest, ManifestMetadata, ManifestState},
Operator,
};
use serde::{Deserialize, Serialize};
use tracing_test::traced_test;
#[tokio::test]
#[traced_test]
async fn test_can_run_reconciler() -> anyhow::Result<()> {
let operator = MyOperator {};
let mut control_plane = nocontrol::ControlPlane::new(operator);
control_plane.with_deadline(std::time::Duration::from_secs(3));
tokio::spawn({
let control_plane = control_plane.clone();
async move {
control_plane
.add_manifest(Manifest {
name: "some-manifest".into(),
metadata: ManifestMetadata {},
spec: Specifications::Deployment(DeploymentControllerManifest {
name: "some-name".into(),
}),
})
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
control_plane
.add_manifest(Manifest {
name: "some-manifest".into(),
metadata: ManifestMetadata {},
spec: Specifications::Deployment(DeploymentControllerManifest {
name: "some-changed-name".into(),
}),
})
.await
.unwrap();
}
});
control_plane.execute().await?;
Err(anyhow::anyhow!("fail"))
}
#[derive(Clone)]
pub struct MyOperator {}
#[async_trait]
impl Operator for MyOperator {
type Specifications = Specifications;
async fn reconcile(
&self,
desired_manifest: &mut ManifestState<Specifications>,
) -> anyhow::Result<()> {
let now = jiff::Timestamp::now();
desired_manifest.status.status = nocontrol::manifests::ManifestStatusState::Started;
desired_manifest.updated = now;
match &desired_manifest.manifest.spec {
Specifications::Deployment(spec) => {
tracing::info!(
"reconciliation was called for name = {}, value = {}",
desired_manifest.manifest.name,
spec.name
)
}
}
desired_manifest.status.status = nocontrol::manifests::ManifestStatusState::Running;
desired_manifest.updated = now;
Ok(())
}
}
#[derive(Clone, Serialize)]
pub enum Specifications {
Deployment(DeploymentControllerManifest),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeploymentControllerManifest {
name: String,
}