diff --git a/.env b/.env new file mode 100644 index 0000000..d92ed64 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +DATABASE_URL=postgres://devuser:devpassword@localhost:5432/dev diff --git a/crates/nocontrol/migrations/postgres/20260118161914_initial.sql b/crates/nocontrol/migrations/postgres/20260118161914_initial.sql index 8ddc1d3..79b9fa3 100644 --- a/crates/nocontrol/migrations/postgres/20260118161914_initial.sql +++ b/crates/nocontrol/migrations/postgres/20260118161914_initial.sql @@ -1 +1,21 @@ -- Add migration script here + +create table manifests ( + id UUID PRIMARY KEY NOT NULL, + generation BIGINT NOT NULL, + + name TEXT NOT NULL, + kind TEXT NOT NULL, + + status JSONB NOT NULL, + + manifest_content JSONB NOT NULL, + manifest_hash BYTEA NOT NULL, + + created TIMESTAMPTZ NOT NULL, + updated TIMESTAMPTZ NOT NULL, + + lease_owner_id UUID, + lease_last_updated TIMESTAMPTZ +); +CREATE UNIQUE INDEX idx_manifest_name ON manifests(name, kind); diff --git a/crates/nocontrol/src/control_plane.rs b/crates/nocontrol/src/control_plane.rs index 6ac4c59..8bb06a3 100644 --- a/crates/nocontrol/src/control_plane.rs +++ b/crates/nocontrol/src/control_plane.rs @@ -17,6 +17,7 @@ pub struct ControlPlane, worker_id: uuid::Uuid, store: BackingStore, + cancellation: CancellationToken, deadline: Option, } @@ -33,6 +34,7 @@ impl> let reconciler = Reconciler::new(worker_id, &store, operator); Self { + cancellation: CancellationToken::new(), reconciler, worker_id, deadline: None, @@ -59,12 +61,24 @@ impl> let cancellation_token = cancellation; let child_token = cancellation_token.child_token(); if let Some(deadline) = self.deadline { - tokio::spawn(async move { - tokio::time::sleep(deadline).await; - cancellation_token.cancel(); + tokio::spawn({ + let cancellation_token = cancellation_token.clone(); + async move { + tokio::time::sleep(deadline).await; + cancellation_token.cancel(); + } + }); + + tokio::spawn({ + let self_cancel = self.cancellation.child_token(); + async move { + self_cancel.cancelled().await; + cancellation_token.cancel(); + } }); } + tracing::debug_span!("reconcile", worker = self.worker_id.to_string()); self.reconciler.reconcile(&child_token).await?; Ok(()) @@ -86,4 +100,8 @@ impl> ) -> anyhow::Result>> { self.store.get_manifests().await } + + pub async fn shutdown(&self) -> anyhow::Result<()> { + Ok(()) + } } diff --git a/crates/nocontrol/src/control_plane/backing_store.rs b/crates/nocontrol/src/control_plane/backing_store.rs index fbc98cd..2caa008 100644 --- a/crates/nocontrol/src/control_plane/backing_store.rs +++ b/crates/nocontrol/src/control_plane/backing_store.rs @@ -47,11 +47,11 @@ impl BackingStore> { #[cfg(feature = "postgres")] impl BackingStore> { - pub fn postgres(database_url: &str) -> Self { - Self { - inner: postgres::BackingStorePostgres::new(database_url), + pub async fn postgres(database_url: &str) -> anyhow::Result { + Ok(Self { + inner: postgres::BackingStorePostgres::new(database_url).await?, _marker: PhantomData, - } + }) } } @@ -81,6 +81,12 @@ pub trait BackingStoreEdge: Send + Sync + Clone { worker_id: &WorkerId, ) -> impl std::future::Future> + Send; + fn delete_lease( + &self, + manifest: &ManifestState, + worker_id: &WorkerId, + ) -> impl std::future::Future> + Send; + fn upsert_manifest( &self, manifest: Manifest, diff --git a/crates/nocontrol/src/control_plane/backing_store/in_process.rs b/crates/nocontrol/src/control_plane/backing_store/in_process.rs index 90c646d..49d426d 100644 --- a/crates/nocontrol/src/control_plane/backing_store/in_process.rs +++ b/crates/nocontrol/src/control_plane/backing_store/in_process.rs @@ -166,6 +166,23 @@ impl BackingStoreEdge for BackingStoreInProcess { Ok(()) } + + async fn delete_lease( + &self, + manifest: &ManifestState, + _worker_id: &WorkerId, + ) -> anyhow::Result<()> { + let mut manifests = self.manifests.write().await; + + if let Some(manifest) = manifests + .iter_mut() + .find(|m| m.manifest.name == manifest.manifest.name) + { + manifest.lease = None; + } + + Ok(()) + } } impl BackingStoreInProcess { diff --git a/crates/nocontrol/src/control_plane/backing_store/postgres.rs b/crates/nocontrol/src/control_plane/backing_store/postgres.rs index 8e366ad..d693878 100644 --- a/crates/nocontrol/src/control_plane/backing_store/postgres.rs +++ b/crates/nocontrol/src/control_plane/backing_store/postgres.rs @@ -1,9 +1,17 @@ use std::marker::PhantomData; use anyhow::Context; +use jiff::Timestamp; +use sha2::Digest; use sqlx::PgPool; -use crate::{Specification, stores::BackingStoreEdge}; +use crate::{ + Specification, + manifests::{ + Manifest, ManifestLease, ManifestState, ManifestStatus, ManifestStatusState, WorkerId, + }, + stores::BackingStoreEdge, +}; #[derive(Clone)] pub struct BackingStorePostgres { @@ -12,10 +20,13 @@ pub struct BackingStorePostgres { } impl BackingStorePostgres { pub(crate) async fn new(database_url: &str) -> anyhow::Result { + tracing::debug!("connecting to postgres database"); + let pool = sqlx::PgPool::connect(database_url) .await .context("failed to connect to database")?; + tracing::debug!("migrating database"); sqlx::migrate!("migrations/postgres/") .run(&pool) .await @@ -33,40 +44,357 @@ impl BackingStoreEdge for BackingStorePostgres { &self, worker_id: &uuid::Uuid, ) -> anyhow::Result>> { - todo!() + let recs = sqlx::query!( + r#" + SELECT + id, + generation, + name, + kind, + status, + manifest_content, + manifest_hash, + created, + updated, + lease_owner_id, + lease_last_updated + FROM + manifests + WHERE + lease_last_updated < now() - INTERVAL '30 seconds' + OR (lease_owner_id = $1 AND lease_last_updated > now() - INTERVAL '15 seconds') + OR lease_owner_id IS NULL + + "#, + worker_id + ) + .fetch_all(&self.pool) + .await?; + + recs.into_iter() + .map(|r| { + let content: Manifest = serde_json::from_value(r.manifest_content)?; + + Ok(ManifestState { + manifest: content, + manifest_hash: r.manifest_hash, + generation: r.generation as u64, + status: serde_json::from_value(r.status)?, + created: Timestamp::from_millisecond(r.created.timestamp_millis())?, + updated: Timestamp::from_millisecond(r.updated.timestamp_millis())?, + lease: { + match (r.lease_owner_id, r.lease_last_updated) { + (Some(owner_id), Some(last_updated)) => Some(ManifestLease { + owner: owner_id, + last_seen: Timestamp::from_millisecond( + last_updated.timestamp_millis(), + )?, + }), + (_, _) => None, + } + }, + }) + }) + .collect::>>() } async fn get_manifests(&self) -> anyhow::Result>> { - todo!() + let recs = sqlx::query!( + r#" + SELECT + id, + generation, + name, + kind, + status, + manifest_content, + manifest_hash, + created, + updated, + lease_owner_id, + lease_last_updated + FROM + manifests + "# + ) + .fetch_all(&self.pool) + .await + .context("failed to get manifests from database")?; + + recs.into_iter() + .map(|r| { + let content: Manifest = serde_json::from_value(r.manifest_content)?; + + Ok(ManifestState { + manifest: content, + manifest_hash: r.manifest_hash, + generation: r.generation as u64, + status: serde_json::from_value(r.status)?, + created: Timestamp::from_millisecond(r.created.timestamp_millis())?, + updated: Timestamp::from_millisecond(r.updated.timestamp_millis())?, + lease: { + match (r.lease_owner_id, r.lease_last_updated) { + (Some(owner_id), Some(last_updated)) => Some(ManifestLease { + owner: owner_id, + last_seen: Timestamp::from_millisecond( + last_updated.timestamp_millis(), + )?, + }), + (_, _) => None, + } + }, + }) + }) + .collect::>>() } - async fn get(&self, name: &str) -> anyhow::Result>> { - todo!() + async fn get(&self, name: &str) -> anyhow::Result>> { + let rec = sqlx::query!( + r#" + SELECT + id, + generation, + name, + kind, + status, + manifest_content, + manifest_hash, + created, + updated, + lease_owner_id, + lease_last_updated + FROM + manifests + WHERE + name = $1 + "#, + name + ) + .fetch_optional(&self.pool) + .await + .context("failed to get")?; + + let Some(rec) = rec else { return Ok(None) }; + + let content: Manifest = serde_json::from_value(rec.manifest_content)?; + + Ok(Some(ManifestState { + manifest: content, + manifest_hash: rec.manifest_hash, + generation: rec.generation as u64, + status: serde_json::from_value(rec.status)?, + created: Timestamp::from_millisecond(rec.created.timestamp_millis())?, + updated: Timestamp::from_millisecond(rec.updated.timestamp_millis())?, + lease: { + match (rec.lease_owner_id, rec.lease_last_updated) { + (Some(owner_id), Some(last_updated)) => Some(ManifestLease { + owner: owner_id, + last_seen: Timestamp::from_millisecond(last_updated.timestamp_millis())?, + }), + (_, _) => None, + } + }, + })) } async fn update_lease( &self, manifest_state: &crate::manifests::ManifestState, ) -> anyhow::Result<()> { - todo!() + let resp = sqlx::query!( + r#" + UPDATE manifests + SET + lease_last_updated = now() + WHERE + name = $1 + AND kind = $2 + AND generation = $3 + -- AND owner_id = $4 + "#, + manifest_state.manifest.name, + manifest_state.manifest.spec.kind(), + manifest_state.generation as i64, + // worker_id, + ) + .execute(&self.pool) + .await + .context("failed to update lease")?; + + if resp.rows_affected() == 0 { + anyhow::bail!("failed to update lease, the host is no longer the owner") + } + + Ok(()) } async fn acquire_lease( &self, - manifest_state: &crate::manifests::ManifestState, - worker_id: &crate::manifests::WorkerId, + manifest_state: &ManifestState, + worker_id: &WorkerId, ) -> anyhow::Result<()> { - todo!() + let name = &manifest_state.manifest.name; + let kind = manifest_state.manifest.spec.kind(); + let generation = manifest_state.generation; + + let resp = sqlx::query!( + r#" + UPDATE manifests + SET + lease_owner_id = $4, + lease_last_updated = now(), + generation = $3 + 1 + WHERE + name = $1 + AND kind = $2 + AND generation = $3 + "#, + name, + kind, + generation as i64, + worker_id + ) + .execute(&self.pool) + .await + .context("failed to acquire lease")?; + + if resp.rows_affected() == 0 { + anyhow::bail!("failed to acquire lease: {}/{}@{}", kind, name, generation); + } + + // TODO: maybe we should update fence as well + // manifest_state.generation = generation + 1; + + Ok(()) } async fn upsert_manifest(&self, manifest: crate::manifests::Manifest) -> anyhow::Result<()> { - todo!() + let id = uuid::Uuid::now_v7(); + let name = &manifest.name; + let kind = manifest.spec.kind(); + let content = serde_json::to_value(&manifest)?; + let hash = &sha2::Sha256::digest(serde_json::to_vec(&content)?)[..]; + let status = serde_json::to_value(ManifestStatus { + status: ManifestStatusState::Pending, + events: vec![], + changes: vec![], + })?; + + sqlx::query!( + r#" + INSERT INTO manifests ( + id, + generation, + name, + kind, + status, + manifest_content, + manifest_hash, + lease_owner_id, + lease_last_updated, + created, + updated + ) VALUES ( + $1, + 0, + $2, + $3, + $4, + $5, + $6, + NULL, + NULL, + now(), + now() + ) + ON CONFLICT (name, kind) DO UPDATE + SET + manifest_content = $5, + updated = now() + "#, + id, + name, + kind, + status, + content, + hash + ) + .execute(&self.pool) + .await + .context("failed to upsert manifest")?; + + Ok(()) } async fn update_state( &self, manifest: &crate::manifests::ManifestState, ) -> anyhow::Result<()> { - todo!() + let generation = manifest.generation; + let status = serde_json::to_value(&manifest.status)?; + + let resp = sqlx::query!( + r#" + UPDATE manifests + SET + generation = $3 + 1, + status = $4, + updated = now() + WHERE + name = $1 + AND kind = $2 + AND generation = $3 + "#, + manifest.manifest.name, + manifest.manifest.spec.kind(), + generation as i32, + status + ) + .execute(&self.pool) + .await + .context("failed to update state")?; + + if resp.rows_affected() == 0 { + anyhow::bail!("failed to update state") + } + + Ok(()) + } + + async fn delete_lease( + &self, + manifest: &ManifestState, + worker_id: &WorkerId, + ) -> anyhow::Result<()> { + let name = &manifest.manifest.name; + let kind = manifest.manifest.spec.kind(); + let generation = manifest.generation; + + let resp = sqlx::query!( + r#" + UPDATE manifests + SET + lease_owner_id = NULL, + lease_last_updated = NULL + WHERE + name = $1 + AND kind = $2 + AND generation = $3 + AND lease_owner_id = $4 + "#, + name, + kind, + generation as i64, + worker_id, + ) + .execute(&self.pool) + .await + .context("failed to update lease")?; + + if resp.rows_affected() == 0 { + anyhow::bail!("failed to delete lease, the host is no longer the owner") + } + + Ok(()) } } diff --git a/crates/nocontrol/src/control_plane/reconciler.rs b/crates/nocontrol/src/control_plane/reconciler.rs index 039090b..19baebd 100644 --- a/crates/nocontrol/src/control_plane/reconciler.rs +++ b/crates/nocontrol/src/control_plane/reconciler.rs @@ -55,6 +55,8 @@ impl> Reconciler> Reconciler anyhow::Result<()> { + tracing::info!("relinquishing all known manifests"); + + let manifests = self + .store + .get_owned_and_potential_leases(&self.worker_id) + .await?; + + for manifest in manifests { + if let Some(lease) = &manifest.lease + && lease.owner == self.worker_id + && let Err(e) = self.store.delete_lease(&manifest, &self.worker_id).await + { + tracing::warn!("failed to relinquish manifest: {e:#}"); + } } Ok(()) @@ -81,15 +103,20 @@ impl> Reconciler anyhow::Result<()> { - for manifest_state in self + let manifests = self .store .get_owned_and_potential_leases(&self.worker_id) - .await? - { + .await?; + + tracing::trace!(manifests = manifests.len(), "sync once manifests"); + + for manifest_state in manifests { let manifest_name = manifest_state.manifest.name.clone(); match &manifest_state.lease { Some(lease) if lease.owner == self.worker_id => { + tracing::trace!("updating lease"); + // We own the lease, update it self.store .update_lease(&manifest_state) @@ -101,7 +128,9 @@ impl> Reconciler { + _ => { + tracing::trace!("acquiring lease"); + // No lease, try to acquire self.store .acquire_lease(&manifest_state, &self.worker_id) @@ -111,10 +140,6 @@ impl> Reconciler { - // Someone else owns the lease, skip - continue; - } } } @@ -218,7 +243,13 @@ impl> Reconciler {} _ => { - tracing::debug!(%manifest_name, "we don't own the lease, skipping"); + tracing::debug!(%manifest_name, "we don't own the lease, shutting down owned resources"); + + self.operator + .on_lease_lost(&manifest) + .await + .map_err(|_e| anyhow::anyhow!("failed handle lease lost event"))?; + return Ok(()); } } diff --git a/crates/nocontrol/src/operator.rs b/crates/nocontrol/src/operator.rs index 094f9c8..595f63d 100644 --- a/crates/nocontrol/src/operator.rs +++ b/crates/nocontrol/src/operator.rs @@ -16,6 +16,13 @@ pub trait Operator: Send + Sync + 'static { desired_manifest: &mut ManifestState, ) -> impl Future>; + fn on_lease_lost( + &self, + manifest: &ManifestState, + ) -> impl Future> { + async { Ok(()) } + } + fn on_error( &self, desired_manifest: &mut ManifestState, diff --git a/crates/nocontrol/src/operator_state.rs b/crates/nocontrol/src/operator_state.rs index c970543..03da63e 100644 --- a/crates/nocontrol/src/operator_state.rs +++ b/crates/nocontrol/src/operator_state.rs @@ -41,6 +41,13 @@ impl Operator for OperatorState { self.inner.lock().await.reconcile(desired_manifest).await } + async fn on_lease_lost( + &self, + manifest: &crate::manifests::ManifestState, + ) -> Result<(), Self::Error> { + self.inner.lock().await.on_lease_lost(manifest).await + } + async fn on_error( &self, desired_manifest: &mut crate::manifests::ManifestState, diff --git a/mise.toml b/mise.toml index ca8e1a3..c040565 100644 --- a/mise.toml +++ b/mise.toml @@ -1,4 +1,5 @@ [env] +_.file = ".env" RUST_LOG = "nocontrol=debug,info" [tasks.test] @@ -20,3 +21,16 @@ run = "docker compose -f ./templates/docker/docker-compose.yml down" [tasks."local:logs"] run = "docker compose -f ./templates/docker/docker-compose.yml logs -f --tail=500" + +[tasks."db:migrate"] +run = """ + mise run local:down + sleep 1 + mise run local:up + cargo sqlx migrate run --source ./crates/nocontrol/migrations/postgres +""" + + +[tasks."db:shell"] +env = {PGPASSWORD = "devpassword"} +run = "psql -h localhost -p 5432 -d dev -U devuser"