Compare commits
2 Commits
1904654a06
...
2b7014e038
| Author | SHA1 | Date | |
|---|---|---|---|
|
2b7014e038
|
|||
|
e0d6172e21
|
1
.env
Normal file
1
.env
Normal file
@@ -0,0 +1 @@
|
|||||||
|
DATABASE_URL=postgres://devuser:devpassword@localhost:5432/dev
|
||||||
1351
Cargo.lock
generated
1351
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -24,6 +24,19 @@ serde_json = "1.0.148"
|
|||||||
sha2 = "0.10.9"
|
sha2 = "0.10.9"
|
||||||
tokio-util = "0.7.18"
|
tokio-util = "0.7.18"
|
||||||
|
|
||||||
|
sqlx = { version = "0.8.6", optional = true, features = [
|
||||||
|
"chrono",
|
||||||
|
"json",
|
||||||
|
"postgres",
|
||||||
|
"runtime-tokio",
|
||||||
|
"uuid",
|
||||||
|
] }
|
||||||
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
insta = "1.46.0"
|
insta = "1.46.0"
|
||||||
tracing-test = { version = "0.2.5", features = ["no-env-filter"] }
|
tracing-test = { version = "0.2.5", features = ["no-env-filter"] }
|
||||||
|
|
||||||
|
[features]
|
||||||
|
default = []
|
||||||
|
postgres = ["dep:sqlx"]
|
||||||
|
|||||||
@@ -0,0 +1,21 @@
|
|||||||
|
-- Add migration script here
|
||||||
|
|
||||||
|
create table manifests (
|
||||||
|
id UUID PRIMARY KEY NOT NULL,
|
||||||
|
generation BIGINT NOT NULL,
|
||||||
|
|
||||||
|
name TEXT NOT NULL,
|
||||||
|
kind TEXT NOT NULL,
|
||||||
|
|
||||||
|
status JSONB NOT NULL,
|
||||||
|
|
||||||
|
manifest_content JSONB NOT NULL,
|
||||||
|
manifest_hash BYTEA NOT NULL,
|
||||||
|
|
||||||
|
created TIMESTAMPTZ NOT NULL,
|
||||||
|
updated TIMESTAMPTZ NOT NULL,
|
||||||
|
|
||||||
|
lease_owner_id UUID,
|
||||||
|
lease_last_updated TIMESTAMPTZ
|
||||||
|
);
|
||||||
|
CREATE UNIQUE INDEX idx_manifest_name ON manifests(name, kind);
|
||||||
@@ -17,6 +17,7 @@ pub struct ControlPlane<TOperator: Operator, TStore: BackingStoreEdge<TOperator:
|
|||||||
reconciler: Reconciler<TOperator, TStore>,
|
reconciler: Reconciler<TOperator, TStore>,
|
||||||
worker_id: uuid::Uuid,
|
worker_id: uuid::Uuid,
|
||||||
store: BackingStore<TOperator::Specifications, TStore>,
|
store: BackingStore<TOperator::Specifications, TStore>,
|
||||||
|
cancellation: CancellationToken,
|
||||||
|
|
||||||
deadline: Option<std::time::Duration>,
|
deadline: Option<std::time::Duration>,
|
||||||
}
|
}
|
||||||
@@ -33,6 +34,7 @@ impl<TOperator: Operator, TStore: BackingStoreEdge<TOperator::Specifications>>
|
|||||||
let reconciler = Reconciler::new(worker_id, &store, operator);
|
let reconciler = Reconciler::new(worker_id, &store, operator);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
|
cancellation: CancellationToken::new(),
|
||||||
reconciler,
|
reconciler,
|
||||||
worker_id,
|
worker_id,
|
||||||
deadline: None,
|
deadline: None,
|
||||||
@@ -59,12 +61,24 @@ impl<TOperator: Operator, TStore: BackingStoreEdge<TOperator::Specifications>>
|
|||||||
let cancellation_token = cancellation;
|
let cancellation_token = cancellation;
|
||||||
let child_token = cancellation_token.child_token();
|
let child_token = cancellation_token.child_token();
|
||||||
if let Some(deadline) = self.deadline {
|
if let Some(deadline) = self.deadline {
|
||||||
tokio::spawn(async move {
|
tokio::spawn({
|
||||||
tokio::time::sleep(deadline).await;
|
let cancellation_token = cancellation_token.clone();
|
||||||
cancellation_token.cancel();
|
async move {
|
||||||
|
tokio::time::sleep(deadline).await;
|
||||||
|
cancellation_token.cancel();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tokio::spawn({
|
||||||
|
let self_cancel = self.cancellation.child_token();
|
||||||
|
async move {
|
||||||
|
self_cancel.cancelled().await;
|
||||||
|
cancellation_token.cancel();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tracing::debug_span!("reconcile", worker = self.worker_id.to_string());
|
||||||
self.reconciler.reconcile(&child_token).await?;
|
self.reconciler.reconcile(&child_token).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -86,4 +100,8 @@ impl<TOperator: Operator, TStore: BackingStoreEdge<TOperator::Specifications>>
|
|||||||
) -> anyhow::Result<Vec<ManifestState<TOperator::Specifications>>> {
|
) -> anyhow::Result<Vec<ManifestState<TOperator::Specifications>>> {
|
||||||
self.store.get_manifests().await
|
self.store.get_manifests().await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Requests a shutdown of this control plane by cancelling its own cancellation token.
///
/// Tasks that wait on this token (or on child tokens derived from it) observe the
/// cancellation; this method itself does not wait for them to finish.
///
/// NOTE(review): confirm that the reconcile loop is wired to this token on every code path.
///
/// # Errors
/// Currently never fails; the `Result` is kept so callers do not need to change later.
pub async fn shutdown(&self) -> anyhow::Result<()> {
    // Cancelling is idempotent, so calling `shutdown` more than once is harmless.
    self.cancellation.cancel();

    Ok(())
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,6 +10,9 @@ use crate::{
|
|||||||
|
|
||||||
pub mod in_process;
|
pub mod in_process;
|
||||||
|
|
||||||
|
#[cfg(feature = "postgres")]
|
||||||
|
pub mod postgres;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct BackingStore<T: Specification, TStore: BackingStoreEdge<T>> {
|
pub struct BackingStore<T: Specification, TStore: BackingStoreEdge<T>> {
|
||||||
inner: TStore,
|
inner: TStore,
|
||||||
@@ -42,6 +45,16 @@ impl<T: Specification> BackingStore<T, BackingStoreInProcess<T>> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "postgres")]
impl<T: Specification> BackingStore<T, postgres::BackingStorePostgres<T>> {
    /// Builds a backing store on top of the Postgres edge created for `database_url`.
    ///
    /// # Errors
    /// Propagates any error returned while constructing the Postgres edge.
    pub async fn postgres(database_url: &str) -> anyhow::Result<Self> {
        let inner = postgres::BackingStorePostgres::new(database_url).await?;

        Ok(Self {
            inner,
            _marker: PhantomData,
        })
    }
}
|
||||||
|
|
||||||
pub trait BackingStoreEdge<T: Specification>: Send + Sync + Clone {
|
pub trait BackingStoreEdge<T: Specification>: Send + Sync + Clone {
|
||||||
fn get_owned_and_potential_leases(
|
fn get_owned_and_potential_leases(
|
||||||
&self,
|
&self,
|
||||||
@@ -68,6 +81,12 @@ pub trait BackingStoreEdge<T: Specification>: Send + Sync + Clone {
|
|||||||
worker_id: &WorkerId,
|
worker_id: &WorkerId,
|
||||||
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
|
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
|
||||||
|
|
||||||
|
/// Releases the lease recorded for `manifest` on behalf of `worker_id`.
///
/// Implementations clear the stored lease of the matching manifest so that it no longer
/// has an owner.
fn delete_lease(
|
||||||
|
&self,
|
||||||
|
manifest: &ManifestState<T>,
|
||||||
|
worker_id: &WorkerId,
|
||||||
|
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
|
||||||
|
|
||||||
fn upsert_manifest(
|
fn upsert_manifest(
|
||||||
&self,
|
&self,
|
||||||
manifest: Manifest<T>,
|
manifest: Manifest<T>,
|
||||||
|
|||||||
@@ -166,6 +166,23 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStoreInProcess<T> {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn delete_lease(
|
||||||
|
&self,
|
||||||
|
manifest: &ManifestState<T>,
|
||||||
|
_worker_id: &WorkerId,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let mut manifests = self.manifests.write().await;
|
||||||
|
|
||||||
|
if let Some(manifest) = manifests
|
||||||
|
.iter_mut()
|
||||||
|
.find(|m| m.manifest.name == manifest.manifest.name)
|
||||||
|
{
|
||||||
|
manifest.lease = None;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Specification> BackingStoreInProcess<T> {
|
impl<T: Specification> BackingStoreInProcess<T> {
|
||||||
|
|||||||
400
crates/nocontrol/src/control_plane/backing_store/postgres.rs
Normal file
400
crates/nocontrol/src/control_plane/backing_store/postgres.rs
Normal file
@@ -0,0 +1,400 @@
|
|||||||
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
|
use anyhow::Context;
|
||||||
|
use jiff::Timestamp;
|
||||||
|
use sha2::Digest;
|
||||||
|
use sqlx::PgPool;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
Specification,
|
||||||
|
manifests::{
|
||||||
|
Manifest, ManifestLease, ManifestState, ManifestStatus, ManifestStatusState, WorkerId,
|
||||||
|
},
|
||||||
|
stores::BackingStoreEdge,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct BackingStorePostgres<T: Specification> {
|
||||||
|
pool: PgPool,
|
||||||
|
_marker: PhantomData<T>,
|
||||||
|
}
|
||||||
|
impl<T: Specification> BackingStorePostgres<T> {
|
||||||
|
pub(crate) async fn new(database_url: &str) -> anyhow::Result<Self> {
|
||||||
|
tracing::debug!("connecting to postgres database");
|
||||||
|
|
||||||
|
let pool = sqlx::PgPool::connect(database_url)
|
||||||
|
.await
|
||||||
|
.context("failed to connect to database")?;
|
||||||
|
|
||||||
|
tracing::debug!("migrating database");
|
||||||
|
sqlx::migrate!("migrations/postgres/")
|
||||||
|
.run(&pool)
|
||||||
|
.await
|
||||||
|
.context("failed to migrate")?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
_marker: PhantomData,
|
||||||
|
pool,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
||||||
|
async fn get_owned_and_potential_leases(
|
||||||
|
&self,
|
||||||
|
worker_id: &uuid::Uuid,
|
||||||
|
) -> anyhow::Result<Vec<crate::manifests::ManifestState<T>>> {
|
||||||
|
let recs = sqlx::query!(
|
||||||
|
r#"
|
||||||
|
SELECT
|
||||||
|
id,
|
||||||
|
generation,
|
||||||
|
name,
|
||||||
|
kind,
|
||||||
|
status,
|
||||||
|
manifest_content,
|
||||||
|
manifest_hash,
|
||||||
|
created,
|
||||||
|
updated,
|
||||||
|
lease_owner_id,
|
||||||
|
lease_last_updated
|
||||||
|
FROM
|
||||||
|
manifests
|
||||||
|
WHERE
|
||||||
|
lease_last_updated < now() - INTERVAL '30 seconds'
|
||||||
|
OR (lease_owner_id = $1 AND lease_last_updated > now() - INTERVAL '15 seconds')
|
||||||
|
OR lease_owner_id IS NULL
|
||||||
|
|
||||||
|
"#,
|
||||||
|
worker_id
|
||||||
|
)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
recs.into_iter()
|
||||||
|
.map(|r| {
|
||||||
|
let content: Manifest<T> = serde_json::from_value(r.manifest_content)?;
|
||||||
|
|
||||||
|
Ok(ManifestState {
|
||||||
|
manifest: content,
|
||||||
|
manifest_hash: r.manifest_hash,
|
||||||
|
generation: r.generation as u64,
|
||||||
|
status: serde_json::from_value(r.status)?,
|
||||||
|
created: Timestamp::from_millisecond(r.created.timestamp_millis())?,
|
||||||
|
updated: Timestamp::from_millisecond(r.updated.timestamp_millis())?,
|
||||||
|
lease: {
|
||||||
|
match (r.lease_owner_id, r.lease_last_updated) {
|
||||||
|
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
|
||||||
|
owner: owner_id,
|
||||||
|
last_seen: Timestamp::from_millisecond(
|
||||||
|
last_updated.timestamp_millis(),
|
||||||
|
)?,
|
||||||
|
}),
|
||||||
|
(_, _) => None,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect::<anyhow::Result<Vec<_>>>()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_manifests(&self) -> anyhow::Result<Vec<crate::manifests::ManifestState<T>>> {
|
||||||
|
let recs = sqlx::query!(
|
||||||
|
r#"
|
||||||
|
SELECT
|
||||||
|
id,
|
||||||
|
generation,
|
||||||
|
name,
|
||||||
|
kind,
|
||||||
|
status,
|
||||||
|
manifest_content,
|
||||||
|
manifest_hash,
|
||||||
|
created,
|
||||||
|
updated,
|
||||||
|
lease_owner_id,
|
||||||
|
lease_last_updated
|
||||||
|
FROM
|
||||||
|
manifests
|
||||||
|
"#
|
||||||
|
)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.context("failed to get manifests from database")?;
|
||||||
|
|
||||||
|
recs.into_iter()
|
||||||
|
.map(|r| {
|
||||||
|
let content: Manifest<T> = serde_json::from_value(r.manifest_content)?;
|
||||||
|
|
||||||
|
Ok(ManifestState {
|
||||||
|
manifest: content,
|
||||||
|
manifest_hash: r.manifest_hash,
|
||||||
|
generation: r.generation as u64,
|
||||||
|
status: serde_json::from_value(r.status)?,
|
||||||
|
created: Timestamp::from_millisecond(r.created.timestamp_millis())?,
|
||||||
|
updated: Timestamp::from_millisecond(r.updated.timestamp_millis())?,
|
||||||
|
lease: {
|
||||||
|
match (r.lease_owner_id, r.lease_last_updated) {
|
||||||
|
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
|
||||||
|
owner: owner_id,
|
||||||
|
last_seen: Timestamp::from_millisecond(
|
||||||
|
last_updated.timestamp_millis(),
|
||||||
|
)?,
|
||||||
|
}),
|
||||||
|
(_, _) => None,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect::<anyhow::Result<Vec<_>>>()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get(&self, name: &str) -> anyhow::Result<Option<ManifestState<T>>> {
|
||||||
|
let rec = sqlx::query!(
|
||||||
|
r#"
|
||||||
|
SELECT
|
||||||
|
id,
|
||||||
|
generation,
|
||||||
|
name,
|
||||||
|
kind,
|
||||||
|
status,
|
||||||
|
manifest_content,
|
||||||
|
manifest_hash,
|
||||||
|
created,
|
||||||
|
updated,
|
||||||
|
lease_owner_id,
|
||||||
|
lease_last_updated
|
||||||
|
FROM
|
||||||
|
manifests
|
||||||
|
WHERE
|
||||||
|
name = $1
|
||||||
|
"#,
|
||||||
|
name
|
||||||
|
)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.context("failed to get")?;
|
||||||
|
|
||||||
|
let Some(rec) = rec else { return Ok(None) };
|
||||||
|
|
||||||
|
let content: Manifest<T> = serde_json::from_value(rec.manifest_content)?;
|
||||||
|
|
||||||
|
Ok(Some(ManifestState {
|
||||||
|
manifest: content,
|
||||||
|
manifest_hash: rec.manifest_hash,
|
||||||
|
generation: rec.generation as u64,
|
||||||
|
status: serde_json::from_value(rec.status)?,
|
||||||
|
created: Timestamp::from_millisecond(rec.created.timestamp_millis())?,
|
||||||
|
updated: Timestamp::from_millisecond(rec.updated.timestamp_millis())?,
|
||||||
|
lease: {
|
||||||
|
match (rec.lease_owner_id, rec.lease_last_updated) {
|
||||||
|
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
|
||||||
|
owner: owner_id,
|
||||||
|
last_seen: Timestamp::from_millisecond(last_updated.timestamp_millis())?,
|
||||||
|
}),
|
||||||
|
(_, _) => None,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_lease(
|
||||||
|
&self,
|
||||||
|
manifest_state: &crate::manifests::ManifestState<T>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let resp = sqlx::query!(
|
||||||
|
r#"
|
||||||
|
UPDATE manifests
|
||||||
|
SET
|
||||||
|
lease_last_updated = now()
|
||||||
|
WHERE
|
||||||
|
name = $1
|
||||||
|
AND kind = $2
|
||||||
|
AND generation = $3
|
||||||
|
-- AND owner_id = $4
|
||||||
|
"#,
|
||||||
|
manifest_state.manifest.name,
|
||||||
|
manifest_state.manifest.spec.kind(),
|
||||||
|
manifest_state.generation as i64,
|
||||||
|
// worker_id,
|
||||||
|
)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.context("failed to update lease")?;
|
||||||
|
|
||||||
|
if resp.rows_affected() == 0 {
|
||||||
|
anyhow::bail!("failed to update lease, the host is no longer the owner")
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn acquire_lease(
|
||||||
|
&self,
|
||||||
|
manifest_state: &ManifestState<T>,
|
||||||
|
worker_id: &WorkerId,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let name = &manifest_state.manifest.name;
|
||||||
|
let kind = manifest_state.manifest.spec.kind();
|
||||||
|
let generation = manifest_state.generation;
|
||||||
|
|
||||||
|
let resp = sqlx::query!(
|
||||||
|
r#"
|
||||||
|
UPDATE manifests
|
||||||
|
SET
|
||||||
|
lease_owner_id = $4,
|
||||||
|
lease_last_updated = now(),
|
||||||
|
generation = $3 + 1
|
||||||
|
WHERE
|
||||||
|
name = $1
|
||||||
|
AND kind = $2
|
||||||
|
AND generation = $3
|
||||||
|
"#,
|
||||||
|
name,
|
||||||
|
kind,
|
||||||
|
generation as i64,
|
||||||
|
worker_id
|
||||||
|
)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.context("failed to acquire lease")?;
|
||||||
|
|
||||||
|
if resp.rows_affected() == 0 {
|
||||||
|
anyhow::bail!("failed to acquire lease: {}/{}@{}", kind, name, generation);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: maybe we should update fence as well
|
||||||
|
// manifest_state.generation = generation + 1;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn upsert_manifest(&self, manifest: crate::manifests::Manifest<T>) -> anyhow::Result<()> {
|
||||||
|
let id = uuid::Uuid::now_v7();
|
||||||
|
let name = &manifest.name;
|
||||||
|
let kind = manifest.spec.kind();
|
||||||
|
let content = serde_json::to_value(&manifest)?;
|
||||||
|
let hash = &sha2::Sha256::digest(serde_json::to_vec(&content)?)[..];
|
||||||
|
let status = serde_json::to_value(ManifestStatus {
|
||||||
|
status: ManifestStatusState::Pending,
|
||||||
|
events: vec![],
|
||||||
|
changes: vec![],
|
||||||
|
})?;
|
||||||
|
|
||||||
|
sqlx::query!(
|
||||||
|
r#"
|
||||||
|
INSERT INTO manifests (
|
||||||
|
id,
|
||||||
|
generation,
|
||||||
|
name,
|
||||||
|
kind,
|
||||||
|
status,
|
||||||
|
manifest_content,
|
||||||
|
manifest_hash,
|
||||||
|
lease_owner_id,
|
||||||
|
lease_last_updated,
|
||||||
|
created,
|
||||||
|
updated
|
||||||
|
) VALUES (
|
||||||
|
$1,
|
||||||
|
0,
|
||||||
|
$2,
|
||||||
|
$3,
|
||||||
|
$4,
|
||||||
|
$5,
|
||||||
|
$6,
|
||||||
|
NULL,
|
||||||
|
NULL,
|
||||||
|
now(),
|
||||||
|
now()
|
||||||
|
)
|
||||||
|
ON CONFLICT (name, kind) DO UPDATE
|
||||||
|
SET
|
||||||
|
manifest_content = $5,
|
||||||
|
updated = now()
|
||||||
|
"#,
|
||||||
|
id,
|
||||||
|
name,
|
||||||
|
kind,
|
||||||
|
status,
|
||||||
|
content,
|
||||||
|
hash
|
||||||
|
)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.context("failed to upsert manifest")?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_state(
|
||||||
|
&self,
|
||||||
|
manifest: &crate::manifests::ManifestState<T>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let generation = manifest.generation;
|
||||||
|
let status = serde_json::to_value(&manifest.status)?;
|
||||||
|
|
||||||
|
let resp = sqlx::query!(
|
||||||
|
r#"
|
||||||
|
UPDATE manifests
|
||||||
|
SET
|
||||||
|
generation = $3 + 1,
|
||||||
|
status = $4,
|
||||||
|
updated = now()
|
||||||
|
WHERE
|
||||||
|
name = $1
|
||||||
|
AND kind = $2
|
||||||
|
AND generation = $3
|
||||||
|
"#,
|
||||||
|
manifest.manifest.name,
|
||||||
|
manifest.manifest.spec.kind(),
|
||||||
|
generation as i32,
|
||||||
|
status
|
||||||
|
)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.context("failed to update state")?;
|
||||||
|
|
||||||
|
if resp.rows_affected() == 0 {
|
||||||
|
anyhow::bail!("failed to update state")
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn delete_lease(
|
||||||
|
&self,
|
||||||
|
manifest: &ManifestState<T>,
|
||||||
|
worker_id: &WorkerId,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let name = &manifest.manifest.name;
|
||||||
|
let kind = manifest.manifest.spec.kind();
|
||||||
|
let generation = manifest.generation;
|
||||||
|
|
||||||
|
let resp = sqlx::query!(
|
||||||
|
r#"
|
||||||
|
UPDATE manifests
|
||||||
|
SET
|
||||||
|
lease_owner_id = NULL,
|
||||||
|
lease_last_updated = NULL
|
||||||
|
WHERE
|
||||||
|
name = $1
|
||||||
|
AND kind = $2
|
||||||
|
AND generation = $3
|
||||||
|
AND lease_owner_id = $4
|
||||||
|
"#,
|
||||||
|
name,
|
||||||
|
kind,
|
||||||
|
generation as i64,
|
||||||
|
worker_id,
|
||||||
|
)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.context("failed to update lease")?;
|
||||||
|
|
||||||
|
if resp.rows_affected() == 0 {
|
||||||
|
anyhow::bail!("failed to delete lease, the host is no longer the owner")
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -55,6 +55,8 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
|
|||||||
}
|
}
|
||||||
|
|
||||||
tracing::debug!("reconciler shutting down");
|
tracing::debug!("reconciler shutting down");
|
||||||
|
self.relinquish_manifests().await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -73,7 +75,27 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
|
|||||||
tracing::warn!(error = %e, "failed to sync manifests");
|
tracing::warn!(error = %e, "failed to sync manifests");
|
||||||
}
|
}
|
||||||
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
|
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn relinquish_manifests(&self) -> anyhow::Result<()> {
|
||||||
|
tracing::info!("relinquishing all known manifests");
|
||||||
|
|
||||||
|
let manifests = self
|
||||||
|
.store
|
||||||
|
.get_owned_and_potential_leases(&self.worker_id)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
for manifest in manifests {
|
||||||
|
if let Some(lease) = &manifest.lease
|
||||||
|
&& lease.owner == self.worker_id
|
||||||
|
&& let Err(e) = self.store.delete_lease(&manifest, &self.worker_id).await
|
||||||
|
{
|
||||||
|
tracing::warn!("failed to relinquish manifest: {e:#}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -81,15 +103,20 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
|
|||||||
|
|
||||||
/// Single sync iteration - check for manifests, acquire leases, enqueue work.
|
/// Single sync iteration - check for manifests, acquire leases, enqueue work.
|
||||||
async fn sync_once(&self) -> anyhow::Result<()> {
|
async fn sync_once(&self) -> anyhow::Result<()> {
|
||||||
for manifest_state in self
|
let manifests = self
|
||||||
.store
|
.store
|
||||||
.get_owned_and_potential_leases(&self.worker_id)
|
.get_owned_and_potential_leases(&self.worker_id)
|
||||||
.await?
|
.await?;
|
||||||
{
|
|
||||||
|
tracing::trace!(manifests = manifests.len(), "sync once manifests");
|
||||||
|
|
||||||
|
for manifest_state in manifests {
|
||||||
let manifest_name = manifest_state.manifest.name.clone();
|
let manifest_name = manifest_state.manifest.name.clone();
|
||||||
|
|
||||||
match &manifest_state.lease {
|
match &manifest_state.lease {
|
||||||
Some(lease) if lease.owner == self.worker_id => {
|
Some(lease) if lease.owner == self.worker_id => {
|
||||||
|
tracing::trace!("updating lease");
|
||||||
|
|
||||||
// We own the lease, update it
|
// We own the lease, update it
|
||||||
self.store
|
self.store
|
||||||
.update_lease(&manifest_state)
|
.update_lease(&manifest_state)
|
||||||
@@ -101,7 +128,9 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
|
|||||||
self.reconcile_queue.enqueue(manifest_name).await;
|
self.reconcile_queue.enqueue(manifest_name).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => {
|
_ => {
|
||||||
|
tracing::trace!("acquiring lease");
|
||||||
|
|
||||||
// No lease, try to acquire
|
// No lease, try to acquire
|
||||||
self.store
|
self.store
|
||||||
.acquire_lease(&manifest_state, &self.worker_id)
|
.acquire_lease(&manifest_state, &self.worker_id)
|
||||||
@@ -111,10 +140,6 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
|
|||||||
// Enqueue for reconciliation
|
// Enqueue for reconciliation
|
||||||
self.reconcile_queue.enqueue(manifest_name).await;
|
self.reconcile_queue.enqueue(manifest_name).await;
|
||||||
}
|
}
|
||||||
_ => {
|
|
||||||
// Someone else owns the lease, skip
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -218,7 +243,13 @@ impl<T: Operator, TStore: BackingStoreEdge<T::Specifications>> Reconciler<T, TSt
|
|||||||
match &manifest.lease {
|
match &manifest.lease {
|
||||||
Some(lease) if lease.owner == self.worker_id => {}
|
Some(lease) if lease.owner == self.worker_id => {}
|
||||||
_ => {
|
_ => {
|
||||||
tracing::debug!(%manifest_name, "we don't own the lease, skipping");
|
tracing::debug!(%manifest_name, "we don't own the lease, shutting down owned resources");
|
||||||
|
|
||||||
|
self.operator
|
||||||
|
.on_lease_lost(&manifest)
|
||||||
|
.await
|
||||||
|
.map_err(|_e| anyhow::anyhow!("failed handle lease lost event"))?;
|
||||||
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,6 +16,13 @@ pub trait Operator: Send + Sync + 'static {
|
|||||||
desired_manifest: &mut ManifestState<Self::Specifications>,
|
desired_manifest: &mut ManifestState<Self::Specifications>,
|
||||||
) -> impl Future<Output = Result<Action, Self::Error>>;
|
) -> impl Future<Output = Result<Action, Self::Error>>;
|
||||||
|
|
||||||
|
fn on_lease_lost(
|
||||||
|
&self,
|
||||||
|
manifest: &ManifestState<Self::Specifications>,
|
||||||
|
) -> impl Future<Output = Result<(), Self::Error>> {
|
||||||
|
async { Ok(()) }
|
||||||
|
}
|
||||||
|
|
||||||
fn on_error(
|
fn on_error(
|
||||||
&self,
|
&self,
|
||||||
desired_manifest: &mut ManifestState<Self::Specifications>,
|
desired_manifest: &mut ManifestState<Self::Specifications>,
|
||||||
|
|||||||
@@ -41,6 +41,13 @@ impl<T: Operator> Operator for OperatorState<T> {
|
|||||||
self.inner.lock().await.reconcile(desired_manifest).await
|
self.inner.lock().await.reconcile(desired_manifest).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn on_lease_lost(
|
||||||
|
&self,
|
||||||
|
manifest: &crate::manifests::ManifestState<Self::Specifications>,
|
||||||
|
) -> Result<(), Self::Error> {
|
||||||
|
self.inner.lock().await.on_lease_lost(manifest).await
|
||||||
|
}
|
||||||
|
|
||||||
async fn on_error(
|
async fn on_error(
|
||||||
&self,
|
&self,
|
||||||
desired_manifest: &mut crate::manifests::ManifestState<Self::Specifications>,
|
desired_manifest: &mut crate::manifests::ManifestState<Self::Specifications>,
|
||||||
|
|||||||
26
mise.toml
26
mise.toml
@@ -1,4 +1,5 @@
|
|||||||
[env]
|
[env]
|
||||||
|
_.file = ".env"
|
||||||
RUST_LOG = "nocontrol=debug,info"
|
RUST_LOG = "nocontrol=debug,info"
|
||||||
|
|
||||||
[tasks.test]
|
[tasks.test]
|
||||||
@@ -8,3 +9,28 @@ run = "cargo nextest run"
|
|||||||
[tasks.example]
|
[tasks.example]
|
||||||
alias = "e"
|
alias = "e"
|
||||||
run = "cargo run --bin kubernetes-like"
|
run = "cargo run --bin kubernetes-like"
|
||||||
|
|
||||||
|
[tasks."example:postgres"]
|
||||||
|
run = "cargo run --bin postgres-backed"
|
||||||
|
|
||||||
|
[tasks."local:up"]
|
||||||
|
run = "docker compose -f ./templates/docker/docker-compose.yml up -d --remove-orphans --wait"
|
||||||
|
|
||||||
|
[tasks."local:down"]
|
||||||
|
run = "docker compose -f ./templates/docker/docker-compose.yml down"
|
||||||
|
|
||||||
|
[tasks."local:logs"]
|
||||||
|
run = "docker compose -f ./templates/docker/docker-compose.yml logs -f --tail=500"
|
||||||
|
|
||||||
|
[tasks."db:migrate"]
|
||||||
|
run = """
|
||||||
|
mise run local:down
|
||||||
|
sleep 1
|
||||||
|
mise run local:up
|
||||||
|
cargo sqlx migrate run --source ./crates/nocontrol/migrations/postgres
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
[tasks."db:shell"]
|
||||||
|
env = {PGPASSWORD = "devpassword"}
|
||||||
|
run = "psql -h localhost -p 5432 -d dev -U devuser"
|
||||||
|
|||||||
20
templates/docker/docker-compose.yml
Normal file
20
templates/docker/docker-compose.yml
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
services:
|
||||||
|
postgres:
|
||||||
|
image: postgres:18
|
||||||
|
environment:
|
||||||
|
POSTGRES_USER: devuser
|
||||||
|
POSTGRES_PASSWORD: devpassword
|
||||||
|
POSTGRES_DB: dev
|
||||||
|
ports:
|
||||||
|
- "5432:5432"
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD-SHELL", "pg_isready -U devuser -d dev"]
|
||||||
|
interval: 5s
|
||||||
|
timeout: 5s
|
||||||
|
retries: 5
|
||||||
|
volumes:
|
||||||
|
- ${PWD}/fs/volumes/postgres/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
|
||||||
|
command:
|
||||||
|
- "postgres"
|
||||||
|
- "-c"
|
||||||
|
- "wal_level=logical" #required for MaterializedPostgreSQL
|
||||||
Reference in New Issue
Block a user