Compare commits

..

4 Commits

Author SHA1 Message Date
4b672ff800 chore: remove unused code
Signed-off-by: kjuulh <contact@kjuulh.io>
2026-01-18 23:02:26 +01:00
2729d674f8 feat: add postgresql example
Signed-off-by: kjuulh <contact@kjuulh.io>
2026-01-18 22:54:36 +01:00
2b7014e038 feat: add postgresql backing store
Signed-off-by: kjuulh <contact@kjuulh.io>
2026-01-18 22:53:15 +01:00
e0d6172e21 feat: add initial postgres
Signed-off-by: kjuulh <contact@kjuulh.io>
2026-01-18 22:53:15 +01:00
16 changed files with 2186 additions and 89 deletions

1
.env Normal file
View File

@@ -0,0 +1 @@
DATABASE_URL=postgres://devuser:devpassword@localhost:5432/dev

1351
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -24,6 +24,19 @@ serde_json = "1.0.148"
sha2 = "0.10.9"
tokio-util = "0.7.18"
sqlx = { version = "0.8.6", optional = true, features = [
"chrono",
"json",
"postgres",
"runtime-tokio",
"uuid",
] }
[dev-dependencies]
insta = "1.46.0"
tracing-test = { version = "0.2.5", features = ["no-env-filter"] }
[features]
default = []
postgres = ["dep:sqlx"]

View File

@@ -0,0 +1,21 @@
-- Add migration script here
create table manifests (
id UUID PRIMARY KEY NOT NULL,
generation BIGINT NOT NULL,
name TEXT NOT NULL,
kind TEXT NOT NULL,
status JSONB NOT NULL,
manifest_content JSONB NOT NULL,
manifest_hash BYTEA NOT NULL,
created TIMESTAMPTZ NOT NULL,
updated TIMESTAMPTZ NOT NULL,
lease_owner_id UUID,
lease_last_updated TIMESTAMPTZ
);
CREATE UNIQUE INDEX idx_manifest_name ON manifests(name, kind);

View File

@@ -17,6 +17,7 @@ pub struct ControlPlane<TOperator: Operator, TStore: BackingStoreEdge<TOperator:
reconciler: Reconciler<TOperator, TStore>,
worker_id: uuid::Uuid,
store: BackingStore<TOperator::Specifications, TStore>,
cancellation: CancellationToken,
deadline: Option<std::time::Duration>,
}
@@ -33,6 +34,7 @@ impl<TOperator: Operator, TStore: BackingStoreEdge<TOperator::Specifications>>
let reconciler = Reconciler::new(worker_id, &store, operator);
Self {
cancellation: CancellationToken::new(),
reconciler,
worker_id,
deadline: None,
@@ -59,12 +61,24 @@ impl<TOperator: Operator, TStore: BackingStoreEdge<TOperator::Specifications>>
let cancellation_token = cancellation;
let child_token = cancellation_token.child_token();
if let Some(deadline) = self.deadline {
tokio::spawn(async move {
tokio::time::sleep(deadline).await;
cancellation_token.cancel();
tokio::spawn({
let cancellation_token = cancellation_token.clone();
async move {
tokio::time::sleep(deadline).await;
cancellation_token.cancel();
}
});
tokio::spawn({
let self_cancel = self.cancellation.child_token();
async move {
self_cancel.cancelled().await;
cancellation_token.cancel();
}
});
}
tracing::debug_span!("reconcile", worker = self.worker_id.to_string());
self.reconciler.reconcile(&child_token).await?;
Ok(())
@@ -86,4 +100,8 @@ impl<TOperator: Operator, TStore: BackingStoreEdge<TOperator::Specifications>>
) -> anyhow::Result<Vec<ManifestState<TOperator::Specifications>>> {
self.store.get_manifests().await
}
pub async fn shutdown(&self) -> anyhow::Result<()> {
Ok(())
}
}

View File

@@ -10,6 +10,9 @@ use crate::{
pub mod in_process;
#[cfg(feature = "postgres")]
pub mod postgres;
#[derive(Clone)]
pub struct BackingStore<T: Specification, TStore: BackingStoreEdge<T>> {
inner: TStore,
@@ -42,6 +45,16 @@ impl<T: Specification> BackingStore<T, BackingStoreInProcess<T>> {
}
}
#[cfg(feature = "postgres")]
impl<T: Specification> BackingStore<T, postgres::BackingStorePostgres<T>> {
pub async fn postgres(database_url: &str) -> anyhow::Result<Self> {
Ok(Self {
inner: postgres::BackingStorePostgres::new(database_url).await?,
_marker: PhantomData,
})
}
}
pub trait BackingStoreEdge<T: Specification>: Send + Sync + Clone {
fn get_owned_and_potential_leases(
&self,
@@ -68,6 +81,12 @@ pub trait BackingStoreEdge<T: Specification>: Send + Sync + Clone {
worker_id: &WorkerId,
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
fn delete_lease(
&self,
manifest: &ManifestState<T>,
worker_id: &WorkerId,
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
fn upsert_manifest(
&self,
manifest: Manifest<T>,

View File

@@ -166,6 +166,23 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStoreInProcess<T> {
Ok(())
}
async fn delete_lease(
&self,
manifest: &ManifestState<T>,
_worker_id: &WorkerId,
) -> anyhow::Result<()> {
let mut manifests = self.manifests.write().await;
if let Some(manifest) = manifests
.iter_mut()
.find(|m| m.manifest.name == manifest.manifest.name)
{
manifest.lease = None;
}
Ok(())
}
}
impl<T: Specification> BackingStoreInProcess<T> {

View File

@@ -0,0 +1,400 @@
use std::marker::PhantomData;
use anyhow::Context;
use jiff::Timestamp;
use sha2::Digest;
use sqlx::PgPool;
use crate::{
Specification,
manifests::{
Manifest, ManifestLease, ManifestState, ManifestStatus, ManifestStatusState, WorkerId,
},
stores::BackingStoreEdge,
};
#[derive(Clone)]
pub struct BackingStorePostgres<T: Specification> {
pool: PgPool,
_marker: PhantomData<T>,
}
impl<T: Specification> BackingStorePostgres<T> {
pub(crate) async fn new(database_url: &str) -> anyhow::Result<Self> {
tracing::debug!("connecting to postgres database");
let pool = sqlx::PgPool::connect(database_url)
.await
.context("failed to connect to database")?;
tracing::debug!("migrating database");
sqlx::migrate!("migrations/postgres/")
.run(&pool)
.await
.context("failed to migrate")?;
Ok(Self {
_marker: PhantomData,
pool,
})
}
}
impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
async fn get_owned_and_potential_leases(
&self,
worker_id: &uuid::Uuid,
) -> anyhow::Result<Vec<crate::manifests::ManifestState<T>>> {
let recs = sqlx::query!(
r#"
SELECT
id,
generation,
name,
kind,
status,
manifest_content,
manifest_hash,
created,
updated,
lease_owner_id,
lease_last_updated
FROM
manifests
WHERE
lease_last_updated < now() - INTERVAL '30 seconds'
OR (lease_owner_id = $1 AND lease_last_updated > now() - INTERVAL '15 seconds')
OR lease_owner_id IS NULL
"#,
worker_id
)
.fetch_all(&self.pool)
.await?;
recs.into_iter()
.map(|r| {
let content: Manifest<T> = serde_json::from_value(r.manifest_content)?;
Ok(ManifestState {
manifest: content,
manifest_hash: r.manifest_hash,
generation: r.generation as u64,
status: serde_json::from_value(r.status)?,
created: Timestamp::from_millisecond(r.created.timestamp_millis())?,
updated: Timestamp::from_millisecond(r.updated.timestamp_millis())?,
lease: {
match (r.lease_owner_id, r.lease_last_updated) {
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
owner: owner_id,
last_seen: Timestamp::from_millisecond(
last_updated.timestamp_millis(),
)?,
}),
(_, _) => None,
}
},
})
})
.collect::<anyhow::Result<Vec<_>>>()
}
async fn get_manifests(&self) -> anyhow::Result<Vec<crate::manifests::ManifestState<T>>> {
let recs = sqlx::query!(
r#"
SELECT
id,
generation,
name,
kind,
status,
manifest_content,
manifest_hash,
created,
updated,
lease_owner_id,
lease_last_updated
FROM
manifests
"#
)
.fetch_all(&self.pool)
.await
.context("failed to get manifests from database")?;
recs.into_iter()
.map(|r| {
let content: Manifest<T> = serde_json::from_value(r.manifest_content)?;
Ok(ManifestState {
manifest: content,
manifest_hash: r.manifest_hash,
generation: r.generation as u64,
status: serde_json::from_value(r.status)?,
created: Timestamp::from_millisecond(r.created.timestamp_millis())?,
updated: Timestamp::from_millisecond(r.updated.timestamp_millis())?,
lease: {
match (r.lease_owner_id, r.lease_last_updated) {
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
owner: owner_id,
last_seen: Timestamp::from_millisecond(
last_updated.timestamp_millis(),
)?,
}),
(_, _) => None,
}
},
})
})
.collect::<anyhow::Result<Vec<_>>>()
}
async fn get(&self, name: &str) -> anyhow::Result<Option<ManifestState<T>>> {
let rec = sqlx::query!(
r#"
SELECT
id,
generation,
name,
kind,
status,
manifest_content,
manifest_hash,
created,
updated,
lease_owner_id,
lease_last_updated
FROM
manifests
WHERE
name = $1
"#,
name
)
.fetch_optional(&self.pool)
.await
.context("failed to get")?;
let Some(rec) = rec else { return Ok(None) };
let content: Manifest<T> = serde_json::from_value(rec.manifest_content)?;
Ok(Some(ManifestState {
manifest: content,
manifest_hash: rec.manifest_hash,
generation: rec.generation as u64,
status: serde_json::from_value(rec.status)?,
created: Timestamp::from_millisecond(rec.created.timestamp_millis())?,
updated: Timestamp::from_millisecond(rec.updated.timestamp_millis())?,
lease: {
match (rec.lease_owner_id, rec.lease_last_updated) {
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
owner: owner_id,
last_seen: Timestamp::from_millisecond(last_updated.timestamp_millis())?,
}),
(_, _) => None,
}
},
}))
}
async fn update_lease(
&self,
manifest_state: &crate::manifests::ManifestState<T>,
) -> anyhow::Result<()> {
let resp = sqlx::query!(
r#"
UPDATE manifests
SET
lease_last_updated = now()
WHERE
name = $1
AND kind = $2
AND generation = $3
-- AND owner_id = $4
"#,
manifest_state.manifest.name,
manifest_state.manifest.spec.kind(),
manifest_state.generation as i64,
// worker_id,
)
.execute(&self.pool)
.await
.context("failed to update lease")?;
if resp.rows_affected() == 0 {
anyhow::bail!("failed to update lease, the host is no longer the owner")
}
Ok(())
}
async fn acquire_lease(
&self,
manifest_state: &ManifestState<T>,
worker_id: &WorkerId,
) -> anyhow::Result<()> {
let name = &manifest_state.manifest.name;
let kind = manifest_state.manifest.spec.kind();
let generation = manifest_state.generation;
let resp = sqlx::query!(
r#"
UPDATE manifests
SET
lease_owner_id = $4,
lease_last_updated = now(),
generation = $3 + 1
WHERE
name = $1
AND kind = $2
AND generation = $3
"#,
name,
kind,
generation as i64,
worker_id
)
.execute(&self.pool)
.await
.context("failed to acquire lease")?;
if resp.rows_affected() == 0 {
anyhow::bail!("failed to acquire lease: {}/{}@{}", kind, name, generation);
}
// TODO: maybe we should update fence as well
// manifest_state.generation = generation + 1;
Ok(())
}
async fn upsert_manifest(&self, manifest: crate::manifests::Manifest<T>) -> anyhow::Result<()> {
let id = uuid::Uuid::now_v7();
let name = &manifest.name;
let kind = manifest.spec.kind();
let content = serde_json::to_value(&manifest)?;
let hash = &sha2::Sha256::digest(serde_json::to_vec(&content)?)[..];
let status = serde_json::to_value(ManifestStatus {
status: ManifestStatusState::Pending,
events: vec![],
changes: vec![],
})?;
sqlx::query!(
r#"
INSERT INTO manifests (
id,
generation,
name,
kind,
status,
manifest_content,
manifest_hash,
lease_owner_id,
lease_last_updated,
created,
updated
) VALUES (
$1,
0,
$2,
$3,
$4,
$5,
$6,
NULL,
NULL,
now(),
now()
)
ON CONFLICT (name, kind) DO UPDATE
SET
manifest_content = $5,
updated = now()
"#,
id,
name,
kind,
status,
content,
hash
)
.execute(&self.pool)
.await
.context("failed to upsert manifest")?;
Ok(())
}
async fn update_state(
&self,
manifest: &crate::manifests::ManifestState<T>,
) -> anyhow::Result<()> {
let generation = manifest.generation;
let status = serde_json::to_value(&manifest.status)?;
let resp = sqlx::query!(
r#"
UPDATE manifests
SET
generation = $3 + 1,
status = $4,
updated = now()
WHERE
name = $1
AND kind = $2
AND generation = $3
"#,
manifest.manifest.name,
manifest.manifest.spec.kind(),
generation as i32,
status
)
.execute(&self.pool)
.await
.context("failed to update state")?;
if resp.rows_affected() == 0 {
anyhow::bail!("failed to update state")
}
Ok(())
}
async fn delete_lease(
&self,
manifest: &ManifestState<T>,
worker_id: &WorkerId,
) -> anyhow::Result<()> {
let name = &manifest.manifest.name;
let kind = manifest.manifest.spec.kind();
let generation = manifest.generation;
let resp = sqlx::query!(
r#"
UPDATE manifests
SET
lease_owner_id = NULL,
lease_last_updated = NULL
WHERE
name = $1
AND kind = $2
AND generation = $3
AND lease_owner_id = $4
"#,
name,
kind,
generation as i64,
worker_id,
)
.execute(&self.pool)
.await
.context("failed to update lease")?;
if resp.rows_affected() == 0 {
anyhow::bail!("failed to delete lease, the host is no longer the owner")
}
Ok(())
}
}

View File

@@ -55,6 +55,8 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
}
tracing::debug!("reconciler shutting down");
self.relinquish_manifests().await?;
Ok(())
}
@@ -73,7 +75,27 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
tracing::warn!(error = %e, "failed to sync manifests");
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
Ok(())
}
async fn relinquish_manifests(&self) -> anyhow::Result<()> {
tracing::info!("relinquishing all known manifests");
let manifests = self
.store
.get_owned_and_potential_leases(&self.worker_id)
.await?;
for manifest in manifests {
if let Some(lease) = &manifest.lease
&& lease.owner == self.worker_id
&& let Err(e) = self.store.delete_lease(&manifest, &self.worker_id).await
{
tracing::warn!("failed to relinquish manifest: {e:#}");
}
}
Ok(())
@@ -81,15 +103,20 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
/// Single sync iteration - check for manifests, acquire leases, enqueue work.
async fn sync_once(&self) -> anyhow::Result<()> {
for manifest_state in self
let manifests = self
.store
.get_owned_and_potential_leases(&self.worker_id)
.await?
{
.await?;
tracing::trace!(manifests = manifests.len(), "sync once manifests");
for manifest_state in manifests {
let manifest_name = manifest_state.manifest.name.clone();
match &manifest_state.lease {
Some(lease) if lease.owner == self.worker_id => {
tracing::trace!("updating lease");
// We own the lease, update it
self.store
.update_lease(&manifest_state)
@@ -101,7 +128,9 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
self.reconcile_queue.enqueue(manifest_name).await;
}
}
None => {
_ => {
tracing::trace!("acquiring lease");
// No lease, try to acquire
self.store
.acquire_lease(&manifest_state, &self.worker_id)
@@ -111,10 +140,6 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
// Enqueue for reconciliation
self.reconcile_queue.enqueue(manifest_name).await;
}
_ => {
// Someone else owns the lease, skip
continue;
}
}
}
@@ -218,7 +243,13 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
match &manifest.lease {
Some(lease) if lease.owner == self.worker_id => {}
_ => {
tracing::debug!(%manifest_name, "we don't own the lease, skipping");
tracing::debug!(%manifest_name, "we don't own the lease, shutting down owned resources");
self.operator
.on_lease_lost(&manifest)
.await
.map_err(|_e| anyhow::anyhow!("failed handle lease lost event"))?;
return Ok(());
}
}

View File

@@ -16,6 +16,13 @@ pub trait Operator: Send + Sync + 'static {
desired_manifest: &mut ManifestState<Self::Specifications>,
) -> impl Future<Output = Result<Action, Self::Error>>;
fn on_lease_lost(
&self,
manifest: &ManifestState<Self::Specifications>,
) -> impl Future<Output = Result<(), Self::Error>> {
async { Ok(()) }
}
fn on_error(
&self,
desired_manifest: &mut ManifestState<Self::Specifications>,

View File

@@ -41,6 +41,13 @@ impl<T: Operator> Operator for OperatorState<T> {
self.inner.lock().await.reconcile(desired_manifest).await
}
async fn on_lease_lost(
&self,
manifest: &crate::manifests::ManifestState<Self::Specifications>,
) -> Result<(), Self::Error> {
self.inner.lock().await.on_lease_lost(manifest).await
}
async fn on_error(
&self,
desired_manifest: &mut crate::manifests::ManifestState<Self::Specifications>,

1
examples/postgres-backed/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/target

View File

@@ -0,0 +1,18 @@
[package]
name = "postgres-backed"
edition = "2024"
version.workspace = true
[dependencies]
nocontrol = { workspace = true, features = ["postgres"] }
nocontrol-tui.workspace = true
anyhow.workspace = true
tokio.workspace = true
serde.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
rand.workspace = true
uuid.workspace = true
noprocess.workspace = true
tokio-util = { version = "0.7", features = ["rt"] }

View File

@@ -0,0 +1,299 @@
use std::{collections::HashMap, future::Future, sync::Arc, time::Duration};
use anyhow::Context;
use nocontrol::{
Operator, OperatorState, Specification,
manifests::{Action, Manifest, ManifestMetadata, ManifestState, ManifestStatusState},
stores::BackingStore,
};
use noprocess::{HandleID, Process, ProcessHandler, ProcessManager, ProcessResult, ProcessState};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing_subscriber::EnvFilter;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cancellation_parent = CancellationToken::new();
let cancellation = cancellation_parent.child_token();
tokio::spawn({
async move {
let _ = tokio::signal::ctrl_c().await;
cancellation_parent.cancel();
}
});
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::from_default_env()
.add_directive("nocontrol=trace".parse().unwrap())
.add_directive("postgres_backend=trace".parse().unwrap())
.add_directive("sqlx=warn".parse().unwrap())
.add_directive("debug".parse().unwrap()),
)
.with_file(false)
.with_line_number(false)
.with_target(false)
.without_time()
.init();
let database_url = "postgres://devuser:devpassword@localhost:5432/dev";
let process_manager = ProcessManager::new();
let operator = OperatorState::new(ProcessOperator::new(process_manager));
let control_plane =
nocontrol::ControlPlane::new(operator, BackingStore::postgres(database_url).await?);
// Add initial process manifest - desired state is Running
control_plane
.add_manifest(Manifest {
name: "worker-process".into(),
metadata: ManifestMetadata {},
spec: Specifications::Process(ProcessManifest {
name: "data-worker".into(),
desired_state: DesiredProcessState::Running,
}),
})
.await?;
// Spawn control plane
tokio::spawn({
let control_plane = control_plane.clone();
async move {
let _ = control_plane.execute().await;
}
});
// Run TUI
nocontrol_tui::run(control_plane).await?;
Ok(())
}
// The operator that manages processes via noprocess
#[derive(Clone)]
pub struct ProcessOperator {
process_manager: ProcessManager,
// Maps manifest name -> process handle ID in the ProcessManager
process_ids: Arc<RwLock<HashMap<String, HandleID>>>,
}
impl ProcessOperator {
pub fn new(process_manager: ProcessManager) -> Self {
Self {
process_manager,
process_ids: Arc::new(RwLock::new(HashMap::new())),
}
}
}
impl Operator for ProcessOperator {
type Specifications = Specifications;
type Error = anyhow::Error;
async fn reconcile(
&self,
manifest_state: &mut ManifestState<Specifications>,
) -> anyhow::Result<Action> {
match &manifest_state.manifest.spec {
Specifications::Process(spec) => {
let manifest_name = &manifest_state.manifest.name;
// Get or create the process
let process_id = {
let ids = self.process_ids.read().await;
ids.get(manifest_name).cloned()
};
match spec.desired_state {
DesiredProcessState::Running => {
if let Some(id) = &process_id {
// Check if process is already running
let status = self.process_manager.get_process_state(id).await;
match status {
Some(ProcessState::Running) => {
// Process is running as desired
manifest_state.status.status = ManifestStatusState::Running;
tracing::info!("Process {} is running as desired", spec.name);
}
Some(ProcessState::Pending) | Some(ProcessState::Stopped) => {
// Process is pending or stopped, start it
tracing::info!(
"Process {} is pending/stopped, starting",
spec.name
);
self.process_manager.start_process(id).await?;
manifest_state.status.status = ManifestStatusState::Started;
}
Some(ProcessState::Errored) => {
// Process errored, update status and try to restart
tracing::warn!(
"Process {} errored, attempting restart",
spec.name
);
manifest_state.status.status = ManifestStatusState::Errored;
// Try to restart
let _ = self.process_manager.restart_process(id).await;
}
None => {
// Process doesn't exist in manager, recreate
tracing::info!("Process {} not found, creating new", spec.name);
let new_id = self
.process_manager
.add_process(Process::new(WorkerProcess {
name: spec.name.clone(),
}))
.await;
self.process_manager.start_process(&new_id).await?;
self.process_ids
.write()
.await
.insert(manifest_name.clone(), new_id);
manifest_state.status.status = ManifestStatusState::Started;
}
}
} else {
// No process exists yet, create and start one
tracing::info!("Creating and starting process {}", spec.name);
let id = self
.process_manager
.add_process(Process::new(WorkerProcess {
name: spec.name.clone(),
}))
.await;
self.process_manager.start_process(&id).await?;
self.process_ids
.write()
.await
.insert(manifest_name.clone(), id);
manifest_state.status.status = ManifestStatusState::Started;
}
}
DesiredProcessState::Stopped => {
if let Some(id) = &process_id {
let status = self.process_manager.get_process_state(id).await;
match status {
Some(ProcessState::Running) => {
// Process is running but should be stopped
tracing::info!("Stopping process {} as requested", spec.name);
manifest_state.status.status = ManifestStatusState::Stopping;
self.process_manager.stop_process(id).await?;
}
Some(ProcessState::Pending) | Some(ProcessState::Stopped) => {
// Already pending/stopped as desired
manifest_state.status.status = ManifestStatusState::Pending;
tracing::info!(
"Process {} is pending/stopped as desired",
spec.name
);
}
Some(ProcessState::Errored) => {
// Errored and should be stopped - that's fine
tracing::info!(
"Process {} errored, desired state is stopped",
spec.name
);
manifest_state.status.status = ManifestStatusState::Errored;
}
None => {
// Doesn't exist, which is fine for stopped state
manifest_state.status.status = ManifestStatusState::Pending;
}
}
} else {
// No process exists, which is fine for stopped state
manifest_state.status.status = ManifestStatusState::Pending;
}
}
}
}
}
Ok(Action::Requeue(Duration::from_secs(5)))
}
async fn on_lease_lost(
&self,
manifest: &ManifestState<Self::Specifications>,
) -> Result<(), Self::Error> {
let process_id = {
let ids = self.process_ids.write().await;
ids.get(&manifest.manifest.name).cloned()
};
let Some(process_id) = process_id else {
tracing::info!("found no process, skipping");
return Ok(());
};
tracing::info!("stopping process");
self.process_manager
.stop_process(&process_id)
.await
.context("stop process")?;
Ok(())
}
}
// A simple worker process that does periodic work
#[derive(Clone)]
struct WorkerProcess {
name: String,
}
impl ProcessHandler for WorkerProcess {
fn call(&self, cancel: CancellationToken) -> impl Future<Output = ProcessResult> + Send {
let name = self.name.clone();
async move {
tracing::info!("Worker {} started", name);
let mut iteration = 0u64;
loop {
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("Worker {} received cancellation, shutting down gracefully", name);
break;
}
_ = tokio::time::sleep(Duration::from_secs(2)) => {
iteration += 1;
tracing::info!("Worker {} iteration {}: doing work...", name, iteration);
}
}
}
tracing::info!("Worker {} stopped", name);
Ok(())
}
}
}
// Specifications enum for the operator
#[derive(Clone, Serialize, Deserialize)]
pub enum Specifications {
Process(ProcessManifest),
}
impl Specification for Specifications {
fn kind(&self) -> &'static str {
match self {
Specifications::Process(_) => "process",
}
}
}
// The manifest specification for a process
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessManifest {
pub name: String,
pub desired_state: DesiredProcessState,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum DesiredProcessState {
Running,
Stopped,
}

View File

@@ -1,4 +1,5 @@
[env]
_.file = ".env"
RUST_LOG = "nocontrol=debug,info"
[tasks.test]
@@ -8,3 +9,28 @@ run = "cargo nextest run"
[tasks.example]
alias = "e"
run = "cargo run --bin kubernetes-like"
[tasks."example:postgres"]
run = "cargo run --bin postgres-backed"
[tasks."local:up"]
run = "docker compose -f ./templates/docker/docker-compose.yml up -d --remove-orphans --wait"
[tasks."local:down"]
run = "docker compose -f ./templates/docker/docker-compose.yml down"
[tasks."local:logs"]
run = "docker compose -f ./templates/docker/docker-compose.yml logs -f --tail=500"
[tasks."db:migrate"]
run = """
mise run local:down
sleep 1
mise run local:up
cargo sqlx migrate run --source ./crates/nocontrol/migrations/postgres
"""
[tasks."db:shell"]
env = {PGPASSWORD = "devpassword"}
run = "psql -h localhost -p 5432 -d dev -U devuser"

View File

@@ -0,0 +1,20 @@
services:
postgres:
image: postgres:18
environment:
POSTGRES_USER: devuser
POSTGRES_PASSWORD: devpassword
POSTGRES_DB: dev
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U devuser -d dev"]
interval: 5s
timeout: 5s
retries: 5
volumes:
- ${PWD}/fs/volumes/postgres/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
command:
- "postgres"
- "-c"
- "wal_level=logical" #required for MaterializedPostgreSQL