feat: ship without sqlx

This commit is contained in:
2026-03-05 18:56:49 +01:00
parent 0f24a41435
commit 4977cb0485
5 changed files with 353 additions and 1220 deletions

1078
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -24,13 +24,12 @@ serde_json = "1.0.148"
sha2 = "0.10.9" sha2 = "0.10.9"
tokio-util = "0.7.18" tokio-util = "0.7.18"
sqlx = { version = "0.8.6", optional = true, features = [ tokio-postgres = { version = "0.7", optional = true, features = [
"chrono", "with-uuid-1",
"json", "with-serde_json-1",
"postgres", "with-chrono-0_4",
"runtime-tokio",
"uuid",
] } ] }
chrono = { version = "0.4", optional = true }
[dev-dependencies] [dev-dependencies]
@@ -39,4 +38,4 @@ tracing-test = { version = "0.2.5", features = ["no-env-filter"] }
[features] [features]
default = [] default = []
postgres = ["dep:sqlx"] postgres = ["dep:tokio-postgres", "dep:chrono"]

View File

@@ -1,21 +0,0 @@
-- Add migration script here
create table manifests (
id UUID PRIMARY KEY NOT NULL,
generation BIGINT NOT NULL,
name TEXT NOT NULL,
kind TEXT NOT NULL,
status JSONB NOT NULL,
manifest_content JSONB NOT NULL,
manifest_hash BYTEA NOT NULL,
created TIMESTAMPTZ NOT NULL,
updated TIMESTAMPTZ NOT NULL,
lease_owner_id UUID,
lease_last_updated TIMESTAMPTZ
);
CREATE UNIQUE INDEX idx_manifest_name ON manifests(name, kind);

View File

@@ -1,9 +1,10 @@
use std::marker::PhantomData; use std::{marker::PhantomData, sync::Arc};
use anyhow::Context; use anyhow::Context;
use chrono::{DateTime, Utc};
use jiff::Timestamp; use jiff::Timestamp;
use sha2::Digest; use sha2::Digest;
use sqlx::{PgPool, Row}; use tokio_postgres::{Client, Row};
use crate::{ use crate::{
Specification, Specification,
@@ -13,30 +14,81 @@ use crate::{
stores::BackingStoreEdge, stores::BackingStoreEdge,
}; };
type PgTimestamp = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>; fn row_to_manifest_state<T: Specification>(row: &Row) -> anyhow::Result<ManifestState<T>> {
let manifest_content: serde_json::Value = row.get("manifest_content");
let status: serde_json::Value = row.get("status");
let manifest_hash: Vec<u8> = row.get("manifest_hash");
let generation: i64 = row.get("generation");
let created: DateTime<Utc> = row.get("created");
let updated: DateTime<Utc> = row.get("updated");
let lease_owner_id: Option<uuid::Uuid> = row.get("lease_owner_id");
let lease_last_updated: Option<DateTime<Utc>> = row.get("lease_last_updated");
let content: Manifest<T> = serde_json::from_value(manifest_content)?;
Ok(ManifestState {
manifest: content,
manifest_hash,
generation: generation as u64,
status: serde_json::from_value(status)?,
created: Timestamp::from_millisecond(created.timestamp_millis())?,
updated: Timestamp::from_millisecond(updated.timestamp_millis())?,
lease: match (lease_owner_id, lease_last_updated) {
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
owner: owner_id,
last_seen: Timestamp::from_millisecond(last_updated.timestamp_millis())?,
}),
_ => None,
},
})
}
#[derive(Clone)] #[derive(Clone)]
pub struct BackingStorePostgres<T: Specification> { pub struct BackingStorePostgres<T: Specification> {
pool: PgPool, client: Arc<Client>,
_marker: PhantomData<T>, _marker: PhantomData<T>,
} }
impl<T: Specification> BackingStorePostgres<T> { impl<T: Specification> BackingStorePostgres<T> {
pub(crate) async fn new(database_url: &str) -> anyhow::Result<Self> { pub(crate) async fn new(database_url: &str) -> anyhow::Result<Self> {
tracing::debug!("connecting to postgres database"); tracing::debug!("connecting to postgres database");
let pool = sqlx::PgPool::connect(database_url) let (client, connection) = tokio_postgres::connect(database_url, tokio_postgres::NoTls)
.await .await
.context("failed to connect to database")?; .context("failed to connect to database")?;
tokio::spawn(async move {
if let Err(e) = connection.await {
tracing::error!("postgres connection error: {e}");
}
});
tracing::debug!("migrating database"); tracing::debug!("migrating database");
sqlx::migrate!("migrations/postgres/") client
.run(&pool) .batch_execute(
r#"
CREATE TABLE IF NOT EXISTS manifests (
id UUID PRIMARY KEY NOT NULL,
generation BIGINT NOT NULL,
name TEXT NOT NULL,
kind TEXT NOT NULL,
status JSONB NOT NULL,
manifest_content JSONB NOT NULL,
manifest_hash BYTEA NOT NULL,
created TIMESTAMPTZ NOT NULL,
updated TIMESTAMPTZ NOT NULL,
lease_owner_id UUID,
lease_last_updated TIMESTAMPTZ
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_manifest_name ON manifests(name, kind);
"#,
)
.await .await
.context("failed to migrate")?; .context("failed to migrate")?;
Ok(Self { Ok(Self {
_marker: PhantomData, _marker: PhantomData,
pool, client: Arc::new(client),
}) })
} }
} }
@@ -45,208 +97,98 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
async fn get_owned_and_potential_leases( async fn get_owned_and_potential_leases(
&self, &self,
worker_id: &uuid::Uuid, worker_id: &uuid::Uuid,
) -> anyhow::Result<Vec<crate::manifests::ManifestState<T>>> { ) -> anyhow::Result<Vec<ManifestState<T>>> {
let recs = sqlx::query( let rows = self
r#" .client
.query(
r#"
SELECT SELECT
id, id, generation, name, kind, status,
generation, manifest_content, manifest_hash,
name, created, updated,
kind, lease_owner_id, lease_last_updated
status, FROM manifests
manifest_content,
manifest_hash,
created,
updated,
lease_owner_id,
lease_last_updated
FROM
manifests
WHERE WHERE
lease_last_updated < now() - INTERVAL '30 seconds' lease_last_updated < now() - INTERVAL '30 seconds'
OR (lease_owner_id = $1 AND lease_last_updated > now() - INTERVAL '15 seconds') OR (lease_owner_id = $1 AND lease_last_updated > now() - INTERVAL '15 seconds')
OR lease_owner_id IS NULL OR lease_owner_id IS NULL
"#,
&[worker_id],
)
.await?;
"#, rows.iter()
) .map(row_to_manifest_state)
.bind(worker_id)
.fetch_all(&self.pool)
.await?;
recs.into_iter()
.map(|r| {
let content: Manifest<T> =
serde_json::from_value(r.get::<serde_json::Value, _>("manifest_content"))?;
Ok(ManifestState {
manifest: content,
manifest_hash: r.get::<Vec<u8>, _>("manifest_hash"),
generation: r.get::<i64, _>("generation") as u64,
status: serde_json::from_value(r.get::<serde_json::Value, _>("status"))?,
created: Timestamp::from_millisecond(
r.get::<PgTimestamp, _>("created").timestamp_millis(),
)?,
updated: Timestamp::from_millisecond(
r.get::<PgTimestamp, _>("updated").timestamp_millis(),
)?,
lease: {
match (
r.get::<Option<uuid::Uuid>, _>("lease_owner_id"),
r.get::<Option<PgTimestamp>, _>("lease_last_updated"),
) {
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
owner: owner_id,
last_seen: Timestamp::from_millisecond(
last_updated.timestamp_millis(),
)?,
}),
(_, _) => None,
}
},
})
})
.collect::<anyhow::Result<Vec<_>>>() .collect::<anyhow::Result<Vec<_>>>()
} }
async fn get_manifests(&self) -> anyhow::Result<Vec<crate::manifests::ManifestState<T>>> { async fn get_manifests(&self) -> anyhow::Result<Vec<ManifestState<T>>> {
let recs = sqlx::query( let rows = self
r#" .client
.query(
r#"
SELECT SELECT
id, id, generation, name, kind, status,
generation, manifest_content, manifest_hash,
name, created, updated,
kind, lease_owner_id, lease_last_updated
status, FROM manifests
manifest_content, "#,
manifest_hash, &[],
created, )
updated, .await
lease_owner_id, .context("failed to get manifests from database")?;
lease_last_updated
FROM
manifests
"#,
)
.fetch_all(&self.pool)
.await
.context("failed to get manifests from database")?;
recs.into_iter() rows.iter()
.map(|r| { .map(row_to_manifest_state)
let content: Manifest<T> =
serde_json::from_value(r.get::<serde_json::Value, _>("manifest_content"))?;
Ok(ManifestState {
manifest: content,
manifest_hash: r.get::<Vec<u8>, _>("manifest_hash"),
generation: r.get::<i64, _>("generation") as u64,
status: serde_json::from_value(r.get::<serde_json::Value, _>("status"))?,
created: Timestamp::from_millisecond(
r.get::<PgTimestamp, _>("created").timestamp_millis(),
)?,
updated: Timestamp::from_millisecond(
r.get::<PgTimestamp, _>("updated").timestamp_millis(),
)?,
lease: {
match (
r.get::<Option<uuid::Uuid>, _>("lease_owner_id"),
r.get::<Option<PgTimestamp>, _>("lease_last_updated"),
) {
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
owner: owner_id,
last_seen: Timestamp::from_millisecond(
last_updated.timestamp_millis(),
)?,
}),
(_, _) => None,
}
},
})
})
.collect::<anyhow::Result<Vec<_>>>() .collect::<anyhow::Result<Vec<_>>>()
} }
async fn get(&self, name: &str) -> anyhow::Result<Option<ManifestState<T>>> { async fn get(&self, name: &str) -> anyhow::Result<Option<ManifestState<T>>> {
let rec = sqlx::query( let row = self
r#" .client
.query_opt(
r#"
SELECT SELECT
id, id, generation, name, kind, status,
generation, manifest_content, manifest_hash,
name, created, updated,
kind, lease_owner_id, lease_last_updated
status, FROM manifests
manifest_content, WHERE name = $1
manifest_hash, "#,
created, &[&name],
updated, )
lease_owner_id, .await
lease_last_updated .context("failed to get")?;
FROM
manifests
WHERE
name = $1
"#,
)
.bind(name)
.fetch_optional(&self.pool)
.await
.context("failed to get")?;
let Some(rec) = rec else { return Ok(None) }; let Some(row) = row else { return Ok(None) };
let content: Manifest<T> = Ok(Some(row_to_manifest_state(&row)?))
serde_json::from_value(rec.get::<serde_json::Value, _>("manifest_content"))?;
Ok(Some(ManifestState {
manifest: content,
manifest_hash: rec.get::<Vec<u8>, _>("manifest_hash"),
generation: rec.get::<i64, _>("generation") as u64,
status: serde_json::from_value(rec.get::<serde_json::Value, _>("status"))?,
created: Timestamp::from_millisecond(
rec.get::<PgTimestamp, _>("created").timestamp_millis(),
)?,
updated: Timestamp::from_millisecond(
rec.get::<PgTimestamp, _>("updated").timestamp_millis(),
)?,
lease: {
match (
rec.get::<Option<uuid::Uuid>, _>("lease_owner_id"),
rec.get::<Option<PgTimestamp>, _>("lease_last_updated"),
) {
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
owner: owner_id,
last_seen: Timestamp::from_millisecond(last_updated.timestamp_millis())?,
}),
(_, _) => None,
}
},
}))
} }
async fn update_lease( async fn update_lease(&self, manifest_state: &ManifestState<T>) -> anyhow::Result<()> {
&self, let rows = self
manifest_state: &crate::manifests::ManifestState<T>, .client
) -> anyhow::Result<()> { .execute(
let resp = sqlx::query( r#"
r#"
UPDATE manifests UPDATE manifests
SET SET lease_last_updated = now()
lease_last_updated = now()
WHERE WHERE
name = $1 name = $1
AND kind = $2 AND kind = $2
AND generation = $3 AND generation = $3
-- AND owner_id = $4 "#,
"#, &[
) &manifest_state.manifest.name,
.bind(&manifest_state.manifest.name) &manifest_state.manifest.spec.kind(),
.bind(manifest_state.manifest.spec.kind()) &(manifest_state.generation as i64),
.bind(manifest_state.generation as i64) ],
.execute(&self.pool) )
.await .await
.context("failed to update lease")?; .context("failed to update lease")?;
if resp.rows_affected() == 0 { if rows == 0 {
anyhow::bail!("failed to update lease, the host is no longer the owner") anyhow::bail!("failed to update lease, the host is no longer the owner")
} }
@@ -260,10 +202,12 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let name = &manifest_state.manifest.name; let name = &manifest_state.manifest.name;
let kind = manifest_state.manifest.spec.kind(); let kind = manifest_state.manifest.spec.kind();
let generation = manifest_state.generation; let generation = manifest_state.generation as i64;
let resp = sqlx::query( let rows = self
r#" .client
.execute(
r#"
UPDATE manifests UPDATE manifests
SET SET
lease_owner_id = $4, lease_owner_id = $4,
@@ -273,27 +217,20 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
name = $1 name = $1
AND kind = $2 AND kind = $2
AND generation = $3 AND generation = $3
"#, "#,
) &[&name, &kind, &generation, worker_id],
.bind(name) )
.bind(kind) .await
.bind(generation as i64) .context("failed to acquire lease")?;
.bind(worker_id)
.execute(&self.pool)
.await
.context("failed to acquire lease")?;
if resp.rows_affected() == 0 { if rows == 0 {
anyhow::bail!("failed to acquire lease: {}/{}@{}", kind, name, generation); anyhow::bail!("failed to acquire lease: {}/{}@{}", kind, name, generation);
} }
// TODO: maybe we should update fence as well
// manifest_state.generation = generation + 1;
Ok(()) Ok(())
} }
async fn upsert_manifest(&self, manifest: crate::manifests::Manifest<T>) -> anyhow::Result<()> { async fn upsert_manifest(&self, manifest: Manifest<T>) -> anyhow::Result<()> {
let id = uuid::Uuid::now_v7(); let id = uuid::Uuid::now_v7();
let name = &manifest.name; let name = &manifest.name;
let kind = manifest.spec.kind(); let kind = manifest.spec.kind();
@@ -305,61 +242,38 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
changes: vec![], changes: vec![],
})?; })?;
sqlx::query( self.client
r#" .execute(
INSERT INTO manifests ( r#"
id, INSERT INTO manifests (
generation, id, generation, name, kind, status,
name, manifest_content, manifest_hash,
kind, lease_owner_id, lease_last_updated,
status, created, updated
manifest_content, ) VALUES (
manifest_hash, $1, 0, $2, $3, $4, $5, $6, NULL, NULL, now(), now()
lease_owner_id, )
lease_last_updated, ON CONFLICT (name, kind) DO UPDATE
created, SET
updated manifest_content = $5,
) VALUES ( updated = now()
$1, "#,
0, &[&id, &name, &kind, &status, &content, &hash],
$2, )
$3, .await
$4, .context("failed to upsert manifest")?;
$5,
$6,
NULL,
NULL,
now(),
now()
)
ON CONFLICT (name, kind) DO UPDATE
SET
manifest_content = $5,
updated = now()
"#,
)
.bind(id)
.bind(name)
.bind(kind)
.bind(status)
.bind(content)
.bind(hash)
.execute(&self.pool)
.await
.context("failed to upsert manifest")?;
Ok(()) Ok(())
} }
async fn update_state( async fn update_state(&self, manifest: &ManifestState<T>) -> anyhow::Result<()> {
&self, let generation = manifest.generation as i64;
manifest: &crate::manifests::ManifestState<T>,
) -> anyhow::Result<()> {
let generation = manifest.generation;
let status = serde_json::to_value(&manifest.status)?; let status = serde_json::to_value(&manifest.status)?;
let resp = sqlx::query( let rows = self
r#" .client
.execute(
r#"
UPDATE manifests UPDATE manifests
SET SET
generation = $3 + 1, generation = $3 + 1,
@@ -369,17 +283,18 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
name = $1 name = $1
AND kind = $2 AND kind = $2
AND generation = $3 AND generation = $3
"#, "#,
) &[
.bind(&manifest.manifest.name) &manifest.manifest.name,
.bind(manifest.manifest.spec.kind()) &manifest.manifest.spec.kind(),
.bind(generation as i32) &generation,
.bind(status) &status,
.execute(&self.pool) ],
.await )
.context("failed to update state")?; .await
.context("failed to update state")?;
if resp.rows_affected() == 0 { if rows == 0 {
anyhow::bail!("failed to update state") anyhow::bail!("failed to update state")
} }
@@ -393,10 +308,12 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let name = &manifest.manifest.name; let name = &manifest.manifest.name;
let kind = manifest.manifest.spec.kind(); let kind = manifest.manifest.spec.kind();
let generation = manifest.generation; let generation = manifest.generation as i64;
let resp = sqlx::query( let rows = self
r#" .client
.execute(
r#"
UPDATE manifests UPDATE manifests
SET SET
lease_owner_id = NULL, lease_owner_id = NULL,
@@ -406,17 +323,13 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
AND kind = $2 AND kind = $2
AND generation = $3 AND generation = $3
AND lease_owner_id = $4 AND lease_owner_id = $4
"#, "#,
) &[&name, &kind, &generation, worker_id],
.bind(name) )
.bind(kind) .await
.bind(generation as i64) .context("failed to update lease")?;
.bind(worker_id)
.execute(&self.pool)
.await
.context("failed to update lease")?;
if resp.rows_affected() == 0 { if rows == 0 {
anyhow::bail!("failed to delete lease, the host is no longer the owner") anyhow::bail!("failed to delete lease, the host is no longer the owner")
} }

View File

@@ -29,7 +29,7 @@ async fn main() -> anyhow::Result<()> {
EnvFilter::from_default_env() EnvFilter::from_default_env()
.add_directive("nocontrol=trace".parse().unwrap()) .add_directive("nocontrol=trace".parse().unwrap())
.add_directive("postgres_backend=trace".parse().unwrap()) .add_directive("postgres_backend=trace".parse().unwrap())
.add_directive("sqlx=warn".parse().unwrap())
.add_directive("debug".parse().unwrap()), .add_directive("debug".parse().unwrap()),
) )
.with_file(false) .with_file(false)