@@ -24,6 +24,19 @@ serde_json = "1.0.148"
|
||||
sha2 = "0.10.9"
|
||||
tokio-util = "0.7.18"
|
||||
|
||||
sqlx = { version = "0.8.6", optional = true, features = [
|
||||
"chrono",
|
||||
"json",
|
||||
"postgres",
|
||||
"runtime-tokio",
|
||||
"uuid",
|
||||
] }
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
insta = "1.46.0"
|
||||
tracing-test = { version = "0.2.5", features = ["no-env-filter"] }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
postgres = ["dep:sqlx"]
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
-- Add migration script here
|
||||
@@ -10,6 +10,9 @@ use crate::{
|
||||
|
||||
pub mod in_process;
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
pub mod postgres;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct BackingStore<T: Specification, TStore: BackingStoreEdge<T>> {
|
||||
inner: TStore,
|
||||
@@ -42,6 +45,16 @@ impl<T: Specification> BackingStore<T, BackingStoreInProcess<T>> {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
impl<T: Specification> BackingStore<T, postgres::BackingStorePostgres<T>> {
|
||||
pub fn postgres(database_url: &str) -> Self {
|
||||
Self {
|
||||
inner: postgres::BackingStorePostgres::new(database_url),
|
||||
_marker: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait BackingStoreEdge<T: Specification>: Send + Sync + Clone {
|
||||
fn get_owned_and_potential_leases(
|
||||
&self,
|
||||
|
||||
72
crates/nocontrol/src/control_plane/backing_store/postgres.rs
Normal file
72
crates/nocontrol/src/control_plane/backing_store/postgres.rs
Normal file
@@ -0,0 +1,72 @@
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use anyhow::Context;
|
||||
use sqlx::PgPool;
|
||||
|
||||
use crate::{Specification, stores::BackingStoreEdge};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct BackingStorePostgres<T: Specification> {
|
||||
pool: PgPool,
|
||||
_marker: PhantomData<T>,
|
||||
}
|
||||
impl<T: Specification> BackingStorePostgres<T> {
|
||||
pub(crate) async fn new(database_url: &str) -> anyhow::Result<Self> {
|
||||
let pool = sqlx::PgPool::connect(database_url)
|
||||
.await
|
||||
.context("failed to connect to database")?;
|
||||
|
||||
sqlx::migrate!("migrations/postgres/")
|
||||
.run(&pool)
|
||||
.await
|
||||
.context("failed to migrate")?;
|
||||
|
||||
Ok(Self {
|
||||
_marker: PhantomData,
|
||||
pool,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Specification> BackingStoreEdge<T> for BackingStorePostgres<T> {
|
||||
async fn get_owned_and_potential_leases(
|
||||
&self,
|
||||
worker_id: &uuid::Uuid,
|
||||
) -> anyhow::Result<Vec<crate::manifests::ManifestState<T>>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn get_manifests(&self) -> anyhow::Result<Vec<crate::manifests::ManifestState<T>>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn get(&self, name: &str) -> anyhow::Result<Option<crate::manifests::ManifestState<T>>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn update_lease(
|
||||
&self,
|
||||
manifest_state: &crate::manifests::ManifestState<T>,
|
||||
) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn acquire_lease(
|
||||
&self,
|
||||
manifest_state: &crate::manifests::ManifestState<T>,
|
||||
worker_id: &crate::manifests::WorkerId,
|
||||
) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn upsert_manifest(&self, manifest: crate::manifests::Manifest<T>) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn update_state(
|
||||
&self,
|
||||
manifest: &crate::manifests::ManifestState<T>,
|
||||
) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user