feat: add postgresql example

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2026-01-18 22:54:36 +01:00
parent 2b7014e038
commit 2729d674f8
3 changed files with 380 additions and 0 deletions

1
examples/postgres-backed/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/target

View File

@@ -0,0 +1,18 @@
[package]
name = "postgres-backed"
edition = "2024"
version.workspace = true
[dependencies]
nocontrol = { workspace = true, features = ["postgres"] }
nocontrol-tui.workspace = true
anyhow.workspace = true
tokio.workspace = true
serde.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
rand.workspace = true
uuid.workspace = true
noprocess.workspace = true
tokio-util = { version = "0.7", features = ["rt"] }

View File

@@ -0,0 +1,361 @@
use std::{collections::HashMap, future::Future, sync::Arc, time::Duration};
use anyhow::Context;
use nocontrol::{
Operator, OperatorState, Specification,
manifests::{Action, Manifest, ManifestMetadata, ManifestState, ManifestStatusState},
stores::BackingStore,
};
use noprocess::{HandleID, Process, ProcessHandler, ProcessManager, ProcessResult, ProcessState};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing_subscriber::EnvFilter;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// let output_file = std::fs::File::create("target/nocontrol.log")?;
let cancellation_parent = CancellationToken::new();
let cancellation = cancellation_parent.child_token();
tokio::spawn({
async move {
let _ = tokio::signal::ctrl_c().await;
cancellation_parent.cancel();
}
});
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::from_default_env()
.add_directive("nocontrol=trace".parse().unwrap())
.add_directive("postgres_backend=trace".parse().unwrap())
.add_directive("sqlx=warn".parse().unwrap())
.add_directive("debug".parse().unwrap()),
)
// .with_writer(output_file)
.with_file(false)
.with_line_number(false)
.with_target(false)
.without_time()
.init();
let database_url = "postgres://devuser:devpassword@localhost:5432/dev";
// 1. Start up worker planes
// start_worker(database_url).await?;
// start_worker(database_url).await?;
// start_worker(database_url).await?;
let process_manager = ProcessManager::new();
let operator = OperatorState::new(ProcessOperator::new(process_manager));
let control_plane =
nocontrol::ControlPlane::new(operator, BackingStore::postgres(database_url).await?);
// Add initial process manifest - desired state is Running
control_plane
.add_manifest(Manifest {
name: "worker-process".into(),
metadata: ManifestMetadata {},
spec: Specifications::Process(ProcessManifest {
name: "data-worker".into(),
desired_state: DesiredProcessState::Running,
}),
})
.await?;
// Spawn a task that toggles the desired state periodically
// tokio::spawn({
// let control_plane = control_plane.clone();
// async move {
// let mut running = true;
// loop {
// tokio::time::sleep(Duration::from_secs(10)).await;
// running = !running;
// let desired_state = if running {
// DesiredProcessState::Running
// } else {
// DesiredProcessState::Stopped
// };
// tracing::info!("Toggling worker-process to {:?}", desired_state);
// let _ = control_plane
// .add_manifest(Manifest {
// name: "worker-process".into(),
// metadata: ManifestMetadata {},
// spec: Specifications::Process(ProcessManifest {
// name: "data-worker".into(),
// desired_state,
// }),
// })
// .await;
// }
// }
// });
// // Spawn control plane
// tokio::spawn({
// let control_plane = control_plane.clone();
// async move {
// let _ = control_plane.execute().await;
// }
// });
control_plane
.execute_with_cancellation(cancellation)
.await?;
// // Run TUI
// nocontrol_tui::run(control_plane).await?;
Ok(())
}
// async fn start_worker(database_url: &str) -> anyhow::Result<()> {
// let database_url = database_url.to_string();
// tokio::spawn(async move {
// let process_manager = ProcessManager::new();
// let operator = OperatorState::new(ProcessOperator::new(process_manager));
// let control_plane = nocontrol::ControlPlane::new(
// operator,
// BackingStore::postgres(&database_url)
// .await
// .expect("to be able to connect to database"),
// );
// control_plane
// .execute()
// .await
// .expect("control plane crashed");
// });
// Ok(())
// }
// The operator that manages processes via noprocess
#[derive(Clone)]
pub struct ProcessOperator {
process_manager: ProcessManager,
// Maps manifest name -> process handle ID in the ProcessManager
process_ids: Arc<RwLock<HashMap<String, HandleID>>>,
}
impl ProcessOperator {
pub fn new(process_manager: ProcessManager) -> Self {
Self {
process_manager,
process_ids: Arc::new(RwLock::new(HashMap::new())),
}
}
}
impl Operator for ProcessOperator {
type Specifications = Specifications;
type Error = anyhow::Error;
async fn reconcile(
&self,
manifest_state: &mut ManifestState<Specifications>,
) -> anyhow::Result<Action> {
match &manifest_state.manifest.spec {
Specifications::Process(spec) => {
let manifest_name = &manifest_state.manifest.name;
// Get or create the process
let process_id = {
let ids = self.process_ids.read().await;
ids.get(manifest_name).cloned()
};
match spec.desired_state {
DesiredProcessState::Running => {
if let Some(id) = &process_id {
// Check if process is already running
let status = self.process_manager.get_process_state(id).await;
match status {
Some(ProcessState::Running) => {
// Process is running as desired
manifest_state.status.status = ManifestStatusState::Running;
tracing::info!("Process {} is running as desired", spec.name);
}
Some(ProcessState::Pending) | Some(ProcessState::Stopped) => {
// Process is pending or stopped, start it
tracing::info!(
"Process {} is pending/stopped, starting",
spec.name
);
self.process_manager.start_process(id).await?;
manifest_state.status.status = ManifestStatusState::Started;
}
Some(ProcessState::Errored) => {
// Process errored, update status and try to restart
tracing::warn!(
"Process {} errored, attempting restart",
spec.name
);
manifest_state.status.status = ManifestStatusState::Errored;
// Try to restart
let _ = self.process_manager.restart_process(id).await;
}
None => {
// Process doesn't exist in manager, recreate
tracing::info!("Process {} not found, creating new", spec.name);
let new_id = self
.process_manager
.add_process(Process::new(WorkerProcess {
name: spec.name.clone(),
}))
.await;
self.process_manager.start_process(&new_id).await?;
self.process_ids
.write()
.await
.insert(manifest_name.clone(), new_id);
manifest_state.status.status = ManifestStatusState::Started;
}
}
} else {
// No process exists yet, create and start one
tracing::info!("Creating and starting process {}", spec.name);
let id = self
.process_manager
.add_process(Process::new(WorkerProcess {
name: spec.name.clone(),
}))
.await;
self.process_manager.start_process(&id).await?;
self.process_ids
.write()
.await
.insert(manifest_name.clone(), id);
manifest_state.status.status = ManifestStatusState::Started;
}
}
DesiredProcessState::Stopped => {
if let Some(id) = &process_id {
let status = self.process_manager.get_process_state(id).await;
match status {
Some(ProcessState::Running) => {
// Process is running but should be stopped
tracing::info!("Stopping process {} as requested", spec.name);
manifest_state.status.status = ManifestStatusState::Stopping;
self.process_manager.stop_process(id).await?;
}
Some(ProcessState::Pending) | Some(ProcessState::Stopped) => {
// Already pending/stopped as desired
manifest_state.status.status = ManifestStatusState::Pending;
tracing::info!(
"Process {} is pending/stopped as desired",
spec.name
);
}
Some(ProcessState::Errored) => {
// Errored and should be stopped - that's fine
tracing::info!(
"Process {} errored, desired state is stopped",
spec.name
);
manifest_state.status.status = ManifestStatusState::Errored;
}
None => {
// Doesn't exist, which is fine for stopped state
manifest_state.status.status = ManifestStatusState::Pending;
}
}
} else {
// No process exists, which is fine for stopped state
manifest_state.status.status = ManifestStatusState::Pending;
}
}
}
}
}
Ok(Action::Requeue(Duration::from_secs(5)))
}
async fn on_lease_lost(
&self,
manifest: &ManifestState<Self::Specifications>,
) -> Result<(), Self::Error> {
let process_id = {
let ids = self.process_ids.write().await;
ids.get(&manifest.manifest.name).cloned()
};
let Some(process_id) = process_id else {
tracing::info!("found no process, skipping");
return Ok(());
};
tracing::info!("stopping process");
self.process_manager
.stop_process(&process_id)
.await
.context("stop process")?;
Ok(())
}
}
// A simple worker process that does periodic work
#[derive(Clone)]
struct WorkerProcess {
name: String,
}
impl ProcessHandler for WorkerProcess {
fn call(&self, cancel: CancellationToken) -> impl Future<Output = ProcessResult> + Send {
let name = self.name.clone();
async move {
tracing::info!("Worker {} started", name);
let mut iteration = 0u64;
loop {
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("Worker {} received cancellation, shutting down gracefully", name);
break;
}
_ = tokio::time::sleep(Duration::from_secs(2)) => {
iteration += 1;
tracing::info!("Worker {} iteration {}: doing work...", name, iteration);
}
}
}
tracing::info!("Worker {} stopped", name);
Ok(())
}
}
}
// Specifications enum for the operator
#[derive(Clone, Serialize, Deserialize)]
pub enum Specifications {
Process(ProcessManifest),
}
impl Specification for Specifications {
fn kind(&self) -> &'static str {
match self {
Specifications::Process(_) => "process",
}
}
}
// The manifest specification for a process
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessManifest {
pub name: String,
pub desired_state: DesiredProcessState,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum DesiredProcessState {
Running,
Stopped,
}