feat: replace macros
This commit is contained in:
366
Cargo.lock
generated
366
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -3,7 +3,7 @@ use std::marker::PhantomData;
|
|||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use jiff::Timestamp;
|
use jiff::Timestamp;
|
||||||
use sha2::Digest;
|
use sha2::Digest;
|
||||||
use sqlx::PgPool;
|
use sqlx::{PgPool, Row};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
Specification,
|
Specification,
|
||||||
@@ -13,6 +13,8 @@ use crate::{
|
|||||||
stores::BackingStoreEdge,
|
stores::BackingStoreEdge,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
type PgTimestamp = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct BackingStorePostgres<T: Specification> {
|
pub struct BackingStorePostgres<T: Specification> {
|
||||||
pool: PgPool,
|
pool: PgPool,
|
||||||
@@ -44,7 +46,7 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
&self,
|
&self,
|
||||||
worker_id: &uuid::Uuid,
|
worker_id: &uuid::Uuid,
|
||||||
) -> anyhow::Result<Vec<crate::manifests::ManifestState<T>>> {
|
) -> anyhow::Result<Vec<crate::manifests::ManifestState<T>>> {
|
||||||
let recs = sqlx::query!(
|
let recs = sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
SELECT
|
SELECT
|
||||||
id,
|
id,
|
||||||
@@ -66,24 +68,32 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
OR lease_owner_id IS NULL
|
OR lease_owner_id IS NULL
|
||||||
|
|
||||||
"#,
|
"#,
|
||||||
worker_id
|
|
||||||
)
|
)
|
||||||
|
.bind(worker_id)
|
||||||
.fetch_all(&self.pool)
|
.fetch_all(&self.pool)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
recs.into_iter()
|
recs.into_iter()
|
||||||
.map(|r| {
|
.map(|r| {
|
||||||
let content: Manifest<T> = serde_json::from_value(r.manifest_content)?;
|
let content: Manifest<T> =
|
||||||
|
serde_json::from_value(r.get::<serde_json::Value, _>("manifest_content"))?;
|
||||||
|
|
||||||
Ok(ManifestState {
|
Ok(ManifestState {
|
||||||
manifest: content,
|
manifest: content,
|
||||||
manifest_hash: r.manifest_hash,
|
manifest_hash: r.get::<Vec<u8>, _>("manifest_hash"),
|
||||||
generation: r.generation as u64,
|
generation: r.get::<i64, _>("generation") as u64,
|
||||||
status: serde_json::from_value(r.status)?,
|
status: serde_json::from_value(r.get::<serde_json::Value, _>("status"))?,
|
||||||
created: Timestamp::from_millisecond(r.created.timestamp_millis())?,
|
created: Timestamp::from_millisecond(
|
||||||
updated: Timestamp::from_millisecond(r.updated.timestamp_millis())?,
|
r.get::<PgTimestamp, _>("created").timestamp_millis(),
|
||||||
|
)?,
|
||||||
|
updated: Timestamp::from_millisecond(
|
||||||
|
r.get::<PgTimestamp, _>("updated").timestamp_millis(),
|
||||||
|
)?,
|
||||||
lease: {
|
lease: {
|
||||||
match (r.lease_owner_id, r.lease_last_updated) {
|
match (
|
||||||
|
r.get::<Option<uuid::Uuid>, _>("lease_owner_id"),
|
||||||
|
r.get::<Option<PgTimestamp>, _>("lease_last_updated"),
|
||||||
|
) {
|
||||||
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
|
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
|
||||||
owner: owner_id,
|
owner: owner_id,
|
||||||
last_seen: Timestamp::from_millisecond(
|
last_seen: Timestamp::from_millisecond(
|
||||||
@@ -99,7 +109,7 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn get_manifests(&self) -> anyhow::Result<Vec<crate::manifests::ManifestState<T>>> {
|
async fn get_manifests(&self) -> anyhow::Result<Vec<crate::manifests::ManifestState<T>>> {
|
||||||
let recs = sqlx::query!(
|
let recs = sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
SELECT
|
SELECT
|
||||||
id,
|
id,
|
||||||
@@ -115,7 +125,7 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
lease_last_updated
|
lease_last_updated
|
||||||
FROM
|
FROM
|
||||||
manifests
|
manifests
|
||||||
"#
|
"#,
|
||||||
)
|
)
|
||||||
.fetch_all(&self.pool)
|
.fetch_all(&self.pool)
|
||||||
.await
|
.await
|
||||||
@@ -123,17 +133,25 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
|
|
||||||
recs.into_iter()
|
recs.into_iter()
|
||||||
.map(|r| {
|
.map(|r| {
|
||||||
let content: Manifest<T> = serde_json::from_value(r.manifest_content)?;
|
let content: Manifest<T> =
|
||||||
|
serde_json::from_value(r.get::<serde_json::Value, _>("manifest_content"))?;
|
||||||
|
|
||||||
Ok(ManifestState {
|
Ok(ManifestState {
|
||||||
manifest: content,
|
manifest: content,
|
||||||
manifest_hash: r.manifest_hash,
|
manifest_hash: r.get::<Vec<u8>, _>("manifest_hash"),
|
||||||
generation: r.generation as u64,
|
generation: r.get::<i64, _>("generation") as u64,
|
||||||
status: serde_json::from_value(r.status)?,
|
status: serde_json::from_value(r.get::<serde_json::Value, _>("status"))?,
|
||||||
created: Timestamp::from_millisecond(r.created.timestamp_millis())?,
|
created: Timestamp::from_millisecond(
|
||||||
updated: Timestamp::from_millisecond(r.updated.timestamp_millis())?,
|
r.get::<PgTimestamp, _>("created").timestamp_millis(),
|
||||||
|
)?,
|
||||||
|
updated: Timestamp::from_millisecond(
|
||||||
|
r.get::<PgTimestamp, _>("updated").timestamp_millis(),
|
||||||
|
)?,
|
||||||
lease: {
|
lease: {
|
||||||
match (r.lease_owner_id, r.lease_last_updated) {
|
match (
|
||||||
|
r.get::<Option<uuid::Uuid>, _>("lease_owner_id"),
|
||||||
|
r.get::<Option<PgTimestamp>, _>("lease_last_updated"),
|
||||||
|
) {
|
||||||
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
|
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
|
||||||
owner: owner_id,
|
owner: owner_id,
|
||||||
last_seen: Timestamp::from_millisecond(
|
last_seen: Timestamp::from_millisecond(
|
||||||
@@ -149,7 +167,7 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn get(&self, name: &str) -> anyhow::Result<Option<ManifestState<T>>> {
|
async fn get(&self, name: &str) -> anyhow::Result<Option<ManifestState<T>>> {
|
||||||
let rec = sqlx::query!(
|
let rec = sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
SELECT
|
SELECT
|
||||||
id,
|
id,
|
||||||
@@ -168,25 +186,33 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
WHERE
|
WHERE
|
||||||
name = $1
|
name = $1
|
||||||
"#,
|
"#,
|
||||||
name
|
|
||||||
)
|
)
|
||||||
|
.bind(name)
|
||||||
.fetch_optional(&self.pool)
|
.fetch_optional(&self.pool)
|
||||||
.await
|
.await
|
||||||
.context("failed to get")?;
|
.context("failed to get")?;
|
||||||
|
|
||||||
let Some(rec) = rec else { return Ok(None) };
|
let Some(rec) = rec else { return Ok(None) };
|
||||||
|
|
||||||
let content: Manifest<T> = serde_json::from_value(rec.manifest_content)?;
|
let content: Manifest<T> =
|
||||||
|
serde_json::from_value(rec.get::<serde_json::Value, _>("manifest_content"))?;
|
||||||
|
|
||||||
Ok(Some(ManifestState {
|
Ok(Some(ManifestState {
|
||||||
manifest: content,
|
manifest: content,
|
||||||
manifest_hash: rec.manifest_hash,
|
manifest_hash: rec.get::<Vec<u8>, _>("manifest_hash"),
|
||||||
generation: rec.generation as u64,
|
generation: rec.get::<i64, _>("generation") as u64,
|
||||||
status: serde_json::from_value(rec.status)?,
|
status: serde_json::from_value(rec.get::<serde_json::Value, _>("status"))?,
|
||||||
created: Timestamp::from_millisecond(rec.created.timestamp_millis())?,
|
created: Timestamp::from_millisecond(
|
||||||
updated: Timestamp::from_millisecond(rec.updated.timestamp_millis())?,
|
rec.get::<PgTimestamp, _>("created").timestamp_millis(),
|
||||||
|
)?,
|
||||||
|
updated: Timestamp::from_millisecond(
|
||||||
|
rec.get::<PgTimestamp, _>("updated").timestamp_millis(),
|
||||||
|
)?,
|
||||||
lease: {
|
lease: {
|
||||||
match (rec.lease_owner_id, rec.lease_last_updated) {
|
match (
|
||||||
|
rec.get::<Option<uuid::Uuid>, _>("lease_owner_id"),
|
||||||
|
rec.get::<Option<PgTimestamp>, _>("lease_last_updated"),
|
||||||
|
) {
|
||||||
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
|
(Some(owner_id), Some(last_updated)) => Some(ManifestLease {
|
||||||
owner: owner_id,
|
owner: owner_id,
|
||||||
last_seen: Timestamp::from_millisecond(last_updated.timestamp_millis())?,
|
last_seen: Timestamp::from_millisecond(last_updated.timestamp_millis())?,
|
||||||
@@ -201,7 +227,7 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
&self,
|
&self,
|
||||||
manifest_state: &crate::manifests::ManifestState<T>,
|
manifest_state: &crate::manifests::ManifestState<T>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let resp = sqlx::query!(
|
let resp = sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
UPDATE manifests
|
UPDATE manifests
|
||||||
SET
|
SET
|
||||||
@@ -212,11 +238,10 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
AND generation = $3
|
AND generation = $3
|
||||||
-- AND owner_id = $4
|
-- AND owner_id = $4
|
||||||
"#,
|
"#,
|
||||||
manifest_state.manifest.name,
|
|
||||||
manifest_state.manifest.spec.kind(),
|
|
||||||
manifest_state.generation as i64,
|
|
||||||
// worker_id,
|
|
||||||
)
|
)
|
||||||
|
.bind(&manifest_state.manifest.name)
|
||||||
|
.bind(manifest_state.manifest.spec.kind())
|
||||||
|
.bind(manifest_state.generation as i64)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.context("failed to update lease")?;
|
.context("failed to update lease")?;
|
||||||
@@ -237,7 +262,7 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
let kind = manifest_state.manifest.spec.kind();
|
let kind = manifest_state.manifest.spec.kind();
|
||||||
let generation = manifest_state.generation;
|
let generation = manifest_state.generation;
|
||||||
|
|
||||||
let resp = sqlx::query!(
|
let resp = sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
UPDATE manifests
|
UPDATE manifests
|
||||||
SET
|
SET
|
||||||
@@ -249,11 +274,11 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
AND kind = $2
|
AND kind = $2
|
||||||
AND generation = $3
|
AND generation = $3
|
||||||
"#,
|
"#,
|
||||||
name,
|
|
||||||
kind,
|
|
||||||
generation as i64,
|
|
||||||
worker_id
|
|
||||||
)
|
)
|
||||||
|
.bind(name)
|
||||||
|
.bind(kind)
|
||||||
|
.bind(generation as i64)
|
||||||
|
.bind(worker_id)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.context("failed to acquire lease")?;
|
.context("failed to acquire lease")?;
|
||||||
@@ -280,7 +305,7 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
changes: vec![],
|
changes: vec![],
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
sqlx::query!(
|
sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
INSERT INTO manifests (
|
INSERT INTO manifests (
|
||||||
id,
|
id,
|
||||||
@@ -312,13 +337,13 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
manifest_content = $5,
|
manifest_content = $5,
|
||||||
updated = now()
|
updated = now()
|
||||||
"#,
|
"#,
|
||||||
id,
|
|
||||||
name,
|
|
||||||
kind,
|
|
||||||
status,
|
|
||||||
content,
|
|
||||||
hash
|
|
||||||
)
|
)
|
||||||
|
.bind(id)
|
||||||
|
.bind(name)
|
||||||
|
.bind(kind)
|
||||||
|
.bind(status)
|
||||||
|
.bind(content)
|
||||||
|
.bind(hash)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.context("failed to upsert manifest")?;
|
.context("failed to upsert manifest")?;
|
||||||
@@ -333,7 +358,7 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
let generation = manifest.generation;
|
let generation = manifest.generation;
|
||||||
let status = serde_json::to_value(&manifest.status)?;
|
let status = serde_json::to_value(&manifest.status)?;
|
||||||
|
|
||||||
let resp = sqlx::query!(
|
let resp = sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
UPDATE manifests
|
UPDATE manifests
|
||||||
SET
|
SET
|
||||||
@@ -345,11 +370,11 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
AND kind = $2
|
AND kind = $2
|
||||||
AND generation = $3
|
AND generation = $3
|
||||||
"#,
|
"#,
|
||||||
manifest.manifest.name,
|
|
||||||
manifest.manifest.spec.kind(),
|
|
||||||
generation as i32,
|
|
||||||
status
|
|
||||||
)
|
)
|
||||||
|
.bind(&manifest.manifest.name)
|
||||||
|
.bind(manifest.manifest.spec.kind())
|
||||||
|
.bind(generation as i32)
|
||||||
|
.bind(status)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.context("failed to update state")?;
|
.context("failed to update state")?;
|
||||||
@@ -370,7 +395,7 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
let kind = manifest.manifest.spec.kind();
|
let kind = manifest.manifest.spec.kind();
|
||||||
let generation = manifest.generation;
|
let generation = manifest.generation;
|
||||||
|
|
||||||
let resp = sqlx::query!(
|
let resp = sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
UPDATE manifests
|
UPDATE manifests
|
||||||
SET
|
SET
|
||||||
@@ -382,11 +407,11 @@ impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
|||||||
AND generation = $3
|
AND generation = $3
|
||||||
AND lease_owner_id = $4
|
AND lease_owner_id = $4
|
||||||
"#,
|
"#,
|
||||||
name,
|
|
||||||
kind,
|
|
||||||
generation as i64,
|
|
||||||
worker_id,
|
|
||||||
)
|
)
|
||||||
|
.bind(name)
|
||||||
|
.bind(kind)
|
||||||
|
.bind(generation as i64)
|
||||||
|
.bind(worker_id)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.context("failed to update lease")?;
|
.context("failed to update lease")?;
|
||||||
|
|||||||
Reference in New Issue
Block a user