feat: prepare for alternative stores

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2026-01-17 00:19:18 +01:00
parent f19cea610b
commit 41b6479137
10 changed files with 316 additions and 179 deletions

View File

@@ -2,7 +2,10 @@ use tokio_util::sync::CancellationToken;
use crate::{
Operator, OperatorState,
control_plane::{backing_store::BackingStore, reconciler::Reconciler},
control_plane::{
backing_store::{BackingStore, BackingStoreEdge},
reconciler::Reconciler,
},
manifests::{Manifest, ManifestState},
};
@@ -10,18 +13,22 @@ pub mod backing_store;
pub mod reconciler;
#[derive(Clone)]
pub struct ControlPlane<TOperator: Operator> {
reconciler: Reconciler<TOperator>,
pub struct ControlPlane<TOperator: Operator, TStore: BackingStoreEdge<TOperator::Specifications>> {
reconciler: Reconciler<TOperator, TStore>,
worker_id: uuid::Uuid,
store: BackingStore<TOperator::Specifications>,
store: BackingStore<TOperator::Specifications, TStore>,
deadline: Option<std::time::Duration>,
}
impl<TOperator: Operator> ControlPlane<TOperator> {
pub fn new(operator: OperatorState<TOperator>) -> Self {
impl<TOperator: Operator, TStore: BackingStoreEdge<TOperator::Specifications>>
ControlPlane<TOperator, TStore>
{
pub fn new(
operator: OperatorState<TOperator>,
store: BackingStore<TOperator::Specifications, TStore>,
) -> Self {
let worker_id = uuid::Uuid::now_v7();
let store = BackingStore::<TOperator::Specifications>::new();
let reconciler = Reconciler::new(worker_id, &store, operator);

View File

@@ -1,174 +1,80 @@
use std::sync::Arc;
use std::{marker::PhantomData, ops::Deref};
use jiff::ToSpan;
use sha2::{Digest, Sha256};
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::{
Specification,
manifests::{
Manifest, ManifestChangeEvent, ManifestChangeEventType, ManifestLease, ManifestState,
ManifestStatus, ManifestStatusState, WorkerId,
},
control_plane::backing_store::in_process::BackingStoreInProcess,
manifests::{Manifest, ManifestState, WorkerId},
};
pub mod in_process;
#[derive(Clone)]
pub struct BackingStore<T: Specification> {
manifests: Arc<RwLock<Vec<ManifestState<T>>>>,
pub struct BackingStore<T: Specification, TStore: BackingStoreEdge<T>> {
inner: TStore,
_marker: PhantomData<T>,
}
impl<T: Specification, TStore: BackingStoreEdge<T>> BackingStore<T, TStore> {
pub fn new(store: TStore) -> Self {
Self {
inner: store,
_marker: PhantomData,
}
}
}
impl<T: Specification> BackingStore<T> {
pub fn new() -> Self {
impl<T: Specification, TStore: BackingStoreEdge<T>> Deref for BackingStore<T, TStore> {
type Target = TStore;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T: Specification> BackingStore<T, BackingStoreInProcess<T>> {
pub fn in_process() -> Self {
Self {
manifests: Arc::new(RwLock::new(Vec::new())),
inner: BackingStoreInProcess::new(),
_marker: PhantomData,
}
}
}
pub async fn get_owned_and_potential_leases(
pub trait BackingStoreEdge<T: Specification>: Send + Sync + Clone {
fn get_owned_and_potential_leases(
&self,
worker_id: &WorkerId,
) -> anyhow::Result<Vec<ManifestState<T>>> {
let now = jiff::Timestamp::now().checked_sub(10.second())?;
let manifests = self
.manifests
.read()
.await
.iter()
.filter(|m| match &m.lease {
Some(lease) if lease.last_seen < now => true,
Some(lease) if &lease.owner == worker_id && lease.last_seen > now => true,
Some(_lease) => false,
None => true,
})
.cloned()
.collect::<Vec<_>>();
worker_id: &Uuid,
) -> impl std::future::Future<Output = anyhow::Result<Vec<ManifestState<T>>>> + Send;
Ok(manifests)
}
fn get_manifests(
&self,
) -> impl std::future::Future<Output = anyhow::Result<Vec<ManifestState<T>>>> + Send;
pub async fn get_manifests(&self) -> anyhow::Result<Vec<ManifestState<T>>> {
Ok(self.manifests.read().await.clone())
}
fn get(
&self,
name: &str,
) -> impl std::future::Future<Output = anyhow::Result<Option<ManifestState<T>>>> + Send;
pub async fn get(&self, name: &str) -> anyhow::Result<Option<ManifestState<T>>> {
Ok(self
.manifests
.read()
.await
.iter()
.find(|m| m.manifest.name == name)
.cloned())
}
fn update_lease(
&self,
manifest_state: &ManifestState<T>,
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
pub async fn update_lease(&self, manifest_state: &ManifestState<T>) -> anyhow::Result<()> {
tracing::trace!(manifest_state.manifest.name, "updating lease");
let mut manifests = self.manifests.write().await;
match manifests
.iter_mut()
.find(|m| m.manifest.name == manifest_state.manifest.name)
{
Some(manifest) => {
let mut manifest_state = manifest_state.clone();
if let Some(lease) = manifest_state.lease.as_mut() {
lease.last_seen = jiff::Timestamp::now();
}
manifest.lease = manifest_state.lease
}
None => anyhow::bail!("manifest is not found"),
}
Ok(())
}
pub async fn acquire_lease(
fn acquire_lease(
&self,
manifest_state: &ManifestState<T>,
worker_id: &WorkerId,
) -> anyhow::Result<()> {
tracing::trace!(manifest_state.manifest.name, "acquiring lease");
let mut manifests = self.manifests.write().await;
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
match manifests
.iter_mut()
.find(|m| m.manifest.name == manifest_state.manifest.name)
{
Some(manifest) => {
let mut manifest_state = manifest_state.clone();
manifest_state.lease = Some(ManifestLease {
owner: *worker_id,
last_seen: jiff::Timestamp::now(),
});
manifest.lease = manifest_state.lease
}
None => anyhow::bail!("manifest is not found"),
}
fn upsert_manifest(
&self,
manifest: Manifest<T>,
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
Ok(())
}
pub async fn upsert_manifest(&self, manifest: Manifest<T>) -> anyhow::Result<()> {
let mut manifests = self.manifests.write().await;
let now = jiff::Timestamp::now();
match manifests
.iter_mut()
.find(|m| m.manifest.name == manifest.name)
{
Some(current_manifest) => {
tracing::debug!("updating manifest");
current_manifest.manifest = manifest;
current_manifest.updated = now;
}
None => {
tracing::debug!("adding manifest");
let content = serde_json::to_vec(&manifest)?;
let output = Sha256::digest(&content);
manifests.push(ManifestState {
manifest,
manifest_hash: output[..].to_vec(),
generation: 0,
status: ManifestStatus {
status: ManifestStatusState::Pending,
events: Vec::default(),
changes: vec![ManifestChangeEvent {
created: now,
handled: false,
event: ManifestChangeEventType::Changed,
}],
},
created: now,
updated: now,
lease: None,
});
}
}
Ok(())
}
pub async fn update_state(&self, manifest: &ManifestState<T>) -> anyhow::Result<()> {
let mut manifests = self.manifests.write().await;
let Some(current_manifest) = manifests
.iter_mut()
.find(|m| m.manifest.name == manifest.manifest.name)
else {
anyhow::bail!(
"manifest state was not found for {}",
manifest.manifest.name
)
};
let manifest = manifest.clone();
current_manifest.generation += 1;
current_manifest.status = manifest.status;
current_manifest.updated = manifest.updated;
Ok(())
}
fn update_state(
&self,
manifest: &ManifestState<T>,
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
}

View File

@@ -0,0 +1,177 @@
use std::sync::Arc;
use jiff::ToSpan;
use sha2::{Digest, Sha256};
use tokio::sync::RwLock;
use crate::{
Specification,
control_plane::backing_store::BackingStoreEdge,
manifests::{
Manifest, ManifestChangeEvent, ManifestChangeEventType, ManifestLease, ManifestState,
ManifestStatus, ManifestStatusState, WorkerId,
},
};
#[derive(Clone, Default)]
pub struct BackingStoreInProcess<T: Specification> {
manifests: Arc<RwLock<Vec<ManifestState<T>>>>,
}
impl<T: Specification> BackingStoreEdge<T> for BackingStoreInProcess<T> {
async fn get_owned_and_potential_leases(
&self,
worker_id: &WorkerId,
) -> anyhow::Result<Vec<ManifestState<T>>> {
let now = jiff::Timestamp::now().checked_sub(10.second())?;
let manifests = self
.manifests
.read()
.await
.iter()
.filter(|m| match &m.lease {
Some(lease) if lease.last_seen < now => true,
Some(lease) if &lease.owner == worker_id && lease.last_seen > now => true,
Some(_lease) => false,
None => true,
})
.cloned()
.collect::<Vec<_>>();
Ok(manifests)
}
async fn get_manifests(&self) -> anyhow::Result<Vec<ManifestState<T>>> {
Ok(self.manifests.read().await.clone())
}
async fn get(&self, name: &str) -> anyhow::Result<Option<ManifestState<T>>> {
Ok(self
.manifests
.read()
.await
.iter()
.find(|m| m.manifest.name == name)
.cloned())
}
async fn update_lease(&self, manifest_state: &ManifestState<T>) -> anyhow::Result<()> {
tracing::trace!(manifest_state.manifest.name, "updating lease");
let mut manifests = self.manifests.write().await;
match manifests
.iter_mut()
.find(|m| m.manifest.name == manifest_state.manifest.name)
{
Some(manifest) => {
let mut manifest_state = manifest_state.clone();
if let Some(lease) = manifest_state.lease.as_mut() {
lease.last_seen = jiff::Timestamp::now();
}
manifest.lease = manifest_state.lease
}
None => anyhow::bail!("manifest is not found"),
}
Ok(())
}
async fn acquire_lease(
&self,
manifest_state: &ManifestState<T>,
worker_id: &WorkerId,
) -> anyhow::Result<()> {
tracing::trace!(manifest_state.manifest.name, "acquiring lease");
let mut manifests = self.manifests.write().await;
match manifests
.iter_mut()
.find(|m| m.manifest.name == manifest_state.manifest.name)
{
Some(manifest) => {
let mut manifest_state = manifest_state.clone();
manifest_state.lease = Some(ManifestLease {
owner: *worker_id,
last_seen: jiff::Timestamp::now(),
});
manifest.lease = manifest_state.lease
}
None => anyhow::bail!("manifest is not found"),
}
Ok(())
}
async fn upsert_manifest(&self, manifest: Manifest<T>) -> anyhow::Result<()> {
let mut manifests = self.manifests.write().await;
let now = jiff::Timestamp::now();
match manifests
.iter_mut()
.find(|m| m.manifest.name == manifest.name)
{
Some(current_manifest) => {
tracing::debug!("updating manifest");
current_manifest.manifest = manifest;
current_manifest.updated = now;
}
None => {
tracing::debug!("adding manifest");
let content = serde_json::to_vec(&manifest)?;
let output = Sha256::digest(&content);
manifests.push(ManifestState {
manifest,
manifest_hash: output[..].to_vec(),
generation: 0,
status: ManifestStatus {
status: ManifestStatusState::Pending,
events: Vec::default(),
changes: vec![ManifestChangeEvent {
created: now,
handled: false,
event: ManifestChangeEventType::Changed,
}],
},
created: now,
updated: now,
lease: None,
});
}
}
Ok(())
}
async fn update_state(&self, manifest: &ManifestState<T>) -> anyhow::Result<()> {
let mut manifests = self.manifests.write().await;
let Some(current_manifest) = manifests
.iter_mut()
.find(|m| m.manifest.name == manifest.manifest.name)
else {
anyhow::bail!(
"manifest state was not found for {}",
manifest.manifest.name
)
};
let manifest = manifest.clone();
current_manifest.generation += 1;
current_manifest.status = manifest.status;
current_manifest.updated = manifest.updated;
Ok(())
}
}
impl<T: Specification> BackingStoreInProcess<T> {
pub fn new() -> Self {
Self {
manifests: Arc::new(RwLock::new(Vec::new())),
}
}
}

View File

@@ -4,23 +4,23 @@ use tokio_util::sync::CancellationToken;
use crate::{
Operator, OperatorState,
control_plane::backing_store::BackingStore,
control_plane::backing_store::{BackingStore, BackingStoreEdge},
manifests::{Action, ManifestName, ManifestState, ManifestStatusState, WorkerId},
reconcile_queue::ReconcileQueue,
};
#[derive(Clone)]
pub struct Reconciler<T: Operator> {
pub struct Reconciler<T: Operator, TStore: BackingStoreEdge<T::Specifications>> {
worker_id: WorkerId,
store: BackingStore<T::Specifications>,
store: BackingStore<T::Specifications, TStore>,
operator: OperatorState<T>,
reconcile_queue: ReconcileQueue<T::Specifications>,
}
impl<T: Operator> Reconciler<T> {
impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TStore> {
pub fn new(
worker_id: WorkerId,
store: &BackingStore<T::Specifications>,
store: &BackingStore<T::Specifications, TStore>,
operator: OperatorState<T>,
) -> Self {
Self {

View File

@@ -1,6 +1,10 @@
mod control_plane;
pub use control_plane::ControlPlane;
pub mod stores {
pub use crate::control_plane::backing_store::*;
}
pub mod manifests;
mod operator_state;

View File

@@ -2,7 +2,7 @@ use serde::{Serialize, de::DeserializeOwned};
use crate::manifests::{Action, ManifestState};
pub trait Specification: Clone + Serialize + DeserializeOwned {
pub trait Specification: Clone + Serialize + DeserializeOwned + Send + Sync + 'static {
fn kind(&self) -> &'static str;
}

View File

@@ -1,6 +1,7 @@
use nocontrol::{
Operator, OperatorState, Specification,
manifests::{Action, Manifest, ManifestMetadata, ManifestState},
stores::BackingStore,
};
use serde::{Deserialize, Serialize};
use tracing_test::traced_test;
@@ -9,7 +10,9 @@ use tracing_test::traced_test;
#[traced_test]
async fn test_can_run_reconciler() -> anyhow::Result<()> {
let operator = OperatorState::new(MyOperator {});
let mut control_plane = nocontrol::ControlPlane::new(operator);
let store = BackingStore::in_process();
let mut control_plane = nocontrol::ControlPlane::new(operator, store);
control_plane.with_deadline(std::time::Duration::from_secs(3));
tokio::spawn({