diff --git a/examples/postgres-backed/.gitignore b/examples/postgres-backed/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/examples/postgres-backed/.gitignore @@ -0,0 +1 @@ +/target diff --git a/examples/postgres-backed/Cargo.toml b/examples/postgres-backed/Cargo.toml new file mode 100644 index 0000000..7eb1507 --- /dev/null +++ b/examples/postgres-backed/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "postgres-backed" +edition = "2024" +version.workspace = true + +[dependencies] +nocontrol = { workspace = true, features = ["postgres"] } +nocontrol-tui.workspace = true + +anyhow.workspace = true +tokio.workspace = true +serde.workspace = true +tracing-subscriber.workspace = true +tracing.workspace = true +rand.workspace = true +uuid.workspace = true +noprocess.workspace = true +tokio-util = { version = "0.7", features = ["rt"] } diff --git a/examples/postgres-backed/src/main.rs b/examples/postgres-backed/src/main.rs new file mode 100644 index 0000000..c439e21 --- /dev/null +++ b/examples/postgres-backed/src/main.rs @@ -0,0 +1,361 @@ +use std::{collections::HashMap, future::Future, sync::Arc, time::Duration}; + +use anyhow::Context; +use nocontrol::{ + Operator, OperatorState, Specification, + manifests::{Action, Manifest, ManifestMetadata, ManifestState, ManifestStatusState}, + stores::BackingStore, +}; +use noprocess::{HandleID, Process, ProcessHandler, ProcessManager, ProcessResult, ProcessState}; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; +use tokio_util::sync::CancellationToken; +use tracing_subscriber::EnvFilter; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // let output_file = std::fs::File::create("target/nocontrol.log")?; + + let cancellation_parent = CancellationToken::new(); + let cancellation = cancellation_parent.child_token(); + + tokio::spawn({ + async move { + let _ = tokio::signal::ctrl_c().await; + cancellation_parent.cancel(); + } + }); + + tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::from_default_env() + .add_directive("nocontrol=trace".parse().unwrap()) + .add_directive("postgres_backend=trace".parse().unwrap()) + .add_directive("sqlx=warn".parse().unwrap()) + .add_directive("debug".parse().unwrap()), + ) + // .with_writer(output_file) + .with_file(false) + .with_line_number(false) + .with_target(false) + .without_time() + .init(); + + let database_url = "postgres://devuser:devpassword@localhost:5432/dev"; + + // 1. Start up worker planes + // start_worker(database_url).await?; + // start_worker(database_url).await?; + // start_worker(database_url).await?; + + let process_manager = ProcessManager::new(); + let operator = OperatorState::new(ProcessOperator::new(process_manager)); + let control_plane = + nocontrol::ControlPlane::new(operator, BackingStore::postgres(database_url).await?); + + // Add initial process manifest - desired state is Running + control_plane + .add_manifest(Manifest { + name: "worker-process".into(), + metadata: ManifestMetadata {}, + spec: Specifications::Process(ProcessManifest { + name: "data-worker".into(), + desired_state: DesiredProcessState::Running, + }), + }) + .await?; + + // Spawn a task that toggles the desired state periodically + // tokio::spawn({ + // let control_plane = control_plane.clone(); + // async move { + // let mut running = true; + // loop { + // tokio::time::sleep(Duration::from_secs(10)).await; + + // running = !running; + // let desired_state = if running { + // DesiredProcessState::Running + // } else { + // DesiredProcessState::Stopped + // }; + + // tracing::info!("Toggling worker-process to {:?}", desired_state); + + // let _ = control_plane + // .add_manifest(Manifest { + // name: "worker-process".into(), + // metadata: ManifestMetadata {}, + // spec: Specifications::Process(ProcessManifest { + // name: "data-worker".into(), + // desired_state, + // }), + // }) + // .await; + // } + // } + // }); + + // // Spawn control plane + // tokio::spawn({ + // let control_plane = control_plane.clone(); + // async move { + // let _ = control_plane.execute().await; + // } + // }); + + control_plane + .execute_with_cancellation(cancellation) + .await?; + // // Run TUI + // nocontrol_tui::run(control_plane).await?; + + Ok(()) +} + +// async fn start_worker(database_url: &str) -> anyhow::Result<()> { +// let database_url = database_url.to_string(); +// tokio::spawn(async move { +// let process_manager = ProcessManager::new(); +// let operator = OperatorState::new(ProcessOperator::new(process_manager)); +// let control_plane = nocontrol::ControlPlane::new( +// operator, +// BackingStore::postgres(&database_url) +// .await +// .expect("to be able to connect to database"), +// ); + +// control_plane +// .execute() +// .await +// .expect("control plane crashed"); +// }); + +// Ok(()) +// } + +// The operator that manages processes via noprocess +#[derive(Clone)] +pub struct ProcessOperator { + process_manager: ProcessManager, + // Maps manifest name -> process handle ID in the ProcessManager + process_ids: Arc>>, +} + +impl ProcessOperator { + pub fn new(process_manager: ProcessManager) -> Self { + Self { + process_manager, + process_ids: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +impl Operator for ProcessOperator { + type Specifications = Specifications; + type Error = anyhow::Error; + + async fn reconcile( + &self, + manifest_state: &mut ManifestState, + ) -> anyhow::Result { + match &manifest_state.manifest.spec { + Specifications::Process(spec) => { + let manifest_name = &manifest_state.manifest.name; + + // Get or create the process + let process_id = { + let ids = self.process_ids.read().await; + ids.get(manifest_name).cloned() + }; + + match spec.desired_state { + DesiredProcessState::Running => { + if let Some(id) = &process_id { + // Check if process is already running + let status = self.process_manager.get_process_state(id).await; + match status { + Some(ProcessState::Running) => { + // Process is running as desired + manifest_state.status.status = ManifestStatusState::Running; + tracing::info!("Process {} is running as desired", spec.name); + } + Some(ProcessState::Pending) | Some(ProcessState::Stopped) => { + // Process is pending or stopped, start it + tracing::info!( + "Process {} is pending/stopped, starting", + spec.name + ); + self.process_manager.start_process(id).await?; + manifest_state.status.status = ManifestStatusState::Started; + } + Some(ProcessState::Errored) => { + // Process errored, update status and try to restart + tracing::warn!( + "Process {} errored, attempting restart", + spec.name + ); + manifest_state.status.status = ManifestStatusState::Errored; + // Try to restart + let _ = self.process_manager.restart_process(id).await; + } + None => { + // Process doesn't exist in manager, recreate + tracing::info!("Process {} not found, creating new", spec.name); + let new_id = self + .process_manager + .add_process(Process::new(WorkerProcess { + name: spec.name.clone(), + })) + .await; + self.process_manager.start_process(&new_id).await?; + self.process_ids + .write() + .await + .insert(manifest_name.clone(), new_id); + manifest_state.status.status = ManifestStatusState::Started; + } + } + } else { + // No process exists yet, create and start one + tracing::info!("Creating and starting process {}", spec.name); + let id = self + .process_manager + .add_process(Process::new(WorkerProcess { + name: spec.name.clone(), + })) + .await; + self.process_manager.start_process(&id).await?; + self.process_ids + .write() + .await + .insert(manifest_name.clone(), id); + manifest_state.status.status = ManifestStatusState::Started; + } + } + DesiredProcessState::Stopped => { + if let Some(id) = &process_id { + let status = self.process_manager.get_process_state(id).await; + match status { + Some(ProcessState::Running) => { + // Process is running but should be stopped + tracing::info!("Stopping process {} as requested", spec.name); + manifest_state.status.status = ManifestStatusState::Stopping; + self.process_manager.stop_process(id).await?; + } + Some(ProcessState::Pending) | Some(ProcessState::Stopped) => { + // Already pending/stopped as desired + manifest_state.status.status = ManifestStatusState::Pending; + tracing::info!( + "Process {} is pending/stopped as desired", + spec.name + ); + } + Some(ProcessState::Errored) => { + // Errored and should be stopped - that's fine + tracing::info!( + "Process {} errored, desired state is stopped", + spec.name + ); + manifest_state.status.status = ManifestStatusState::Errored; + } + None => { + // Doesn't exist, which is fine for stopped state + manifest_state.status.status = ManifestStatusState::Pending; + } + } + } else { + // No process exists, which is fine for stopped state + manifest_state.status.status = ManifestStatusState::Pending; + } + } + } + } + } + + Ok(Action::Requeue(Duration::from_secs(5))) + } + + async fn on_lease_lost( + &self, + manifest: &ManifestState, + ) -> Result<(), Self::Error> { + let process_id = { + let ids = self.process_ids.write().await; + ids.get(&manifest.manifest.name).cloned() + }; + + let Some(process_id) = process_id else { + tracing::info!("found no process, skipping"); + + return Ok(()); + }; + + tracing::info!("stopping process"); + self.process_manager + .stop_process(&process_id) + .await + .context("stop process")?; + + Ok(()) + } +} + +// A simple worker process that does periodic work +#[derive(Clone)] +struct WorkerProcess { + name: String, +} + +impl ProcessHandler for WorkerProcess { + fn call(&self, cancel: CancellationToken) -> impl Future + Send { + let name = self.name.clone(); + async move { + tracing::info!("Worker {} started", name); + let mut iteration = 0u64; + + loop { + tokio::select! { + _ = cancel.cancelled() => { + tracing::info!("Worker {} received cancellation, shutting down gracefully", name); + break; + } + _ = tokio::time::sleep(Duration::from_secs(2)) => { + iteration += 1; + tracing::info!("Worker {} iteration {}: doing work...", name, iteration); + } + } + } + + tracing::info!("Worker {} stopped", name); + Ok(()) + } + } +} + +// Specifications enum for the operator +#[derive(Clone, Serialize, Deserialize)] +pub enum Specifications { + Process(ProcessManifest), +} + +impl Specification for Specifications { + fn kind(&self) -> &'static str { + match self { + Specifications::Process(_) => "process", + } + } +} + +// The manifest specification for a process +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProcessManifest { + pub name: String, + pub desired_state: DesiredProcessState, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +pub enum DesiredProcessState { + Running, + Stopped, +}