diff --git a/.env b/.env index d92ed64..919bd63 100644 --- a/.env +++ b/.env @@ -1 +1,3 @@ DATABASE_URL=postgres://devuser:devpassword@localhost:5432/dev + +#SQLX_OFFLINE=true diff --git a/.sqlx/query-0461b6433be16583d2480de11d5b712de1229dff78624ecab5edcf9f05a2e0e4.json b/.sqlx/query-0461b6433be16583d2480de11d5b712de1229dff78624ecab5edcf9f05a2e0e4.json new file mode 100644 index 0000000..6325c99 --- /dev/null +++ b/.sqlx/query-0461b6433be16583d2480de11d5b712de1229dff78624ecab5edcf9f05a2e0e4.json @@ -0,0 +1,40 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT value, revision\n FROM noleader_leaders\n WHERE\n key = $1\n AND heartbeat >= now() - interval '60 seconds'\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "value", + "type_info": "Text", + "origin": { + "Table": { + "table": "noleader_leaders", + "name": "value" + } + } + }, + { + "ordinal": 1, + "name": "revision", + "type_info": "Int8", + "origin": { + "Table": { + "table": "noleader_leaders", + "name": "revision" + } + } + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "0461b6433be16583d2480de11d5b712de1229dff78624ecab5edcf9f05a2e0e4" +} diff --git a/.sqlx/query-1aa9d51fee3918db168e3704d1ac0e80e5038e2619e5029597fd28d4967538c2.json b/.sqlx/query-1aa9d51fee3918db168e3704d1ac0e80e5038e2619e5029597fd28d4967538c2.json new file mode 100644 index 0000000..e9fa77e --- /dev/null +++ b/.sqlx/query-1aa9d51fee3918db168e3704d1ac0e80e5038e2619e5029597fd28d4967538c2.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM noleader_leaders\n WHERE\n key = $1\n AND value = $2\n AND revision = $3\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "1aa9d51fee3918db168e3704d1ac0e80e5038e2619e5029597fd28d4967538c2" +} diff --git a/.sqlx/query-976e720a4dee2911278524a199d6d5ece23f141e4b4c094efe5fc3123e376b04.json b/.sqlx/query-976e720a4dee2911278524a199d6d5ece23f141e4b4c094efe5fc3123e376b04.json new file mode 100644 index 0000000..b51e2d9 --- /dev/null +++ b/.sqlx/query-976e720a4dee2911278524a199d6d5ece23f141e4b4c094efe5fc3123e376b04.json @@ -0,0 +1,43 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO noleader_leaders (key, value, revision, heartbeat)\n VALUES ($1, $2, $3, now())\n ON CONFLICT (key)\n DO UPDATE SET\n value = EXCLUDED.value,\n revision = EXCLUDED.revision,\n heartbeat = now()\n WHERE \n (\n -- Normal case: revision matches (we're the current leader updating)\n noleader_leaders.revision = $4\n OR\n -- Override case: heartbeat is old (stale leader)\n noleader_leaders.heartbeat < now() - INTERVAL '60 seconds'\n )\n RETURNING value, revision\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "value", + "type_info": "Text", + "origin": { + "Table": { + "table": "noleader_leaders", + "name": "value" + } + } + }, + { + "ordinal": 1, + "name": "revision", + "type_info": "Int8", + "origin": { + "Table": { + "table": "noleader_leaders", + "name": "revision" + } + } + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Int8", + "Int8" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "976e720a4dee2911278524a199d6d5ece23f141e4b4c094efe5fc3123e376b04" +} diff --git a/crates/noleader/Cargo.toml b/crates/noleader/Cargo.toml index 705a688..3f24d4c 100644 --- a/crates/noleader/Cargo.toml +++ b/crates/noleader/Cargo.toml @@ -6,26 +6,34 @@ version.workspace = true license.workspace = true repository = "https://git.front.kjuulh.io/kjuulh/noleader" authors = ["kjuulh "] -description = "A small leader election package using NATS keyvalue store as the distributed locking mechanism. Does not require a min / max set of nodes" +description = "A small leader election package using NATS/Postgres keyvalue store as the distributed locking mechanism. Does not require a min / max set of nodes" [dependencies] anyhow.workspace = true tracing.workspace = true +tokio.workspace = true -async-nats = "0.42" uuid = { version = "1", features = ["v4", "v7"] } bytes = "1" -tokio.workspace = true tokio-util = "0.7" -rand = "0.9.1" -async-trait = "0.1.89" +rand = "0.9" +async-trait = "0.1" + +async-nats = { version = "0.42", optional = true } + # fork until dangerous set migrate table name is stable. Should be any version after 8.6 sqlx = { git = "https://github.com/launchbadge/sqlx", features = [ "uuid", "postgres", "runtime-tokio", "tls-rustls", -], rev = "064d649abdfd1742e5fdcc20176a6b415b9c25d3" } +], rev = "064d649abdfd1742e5fdcc20176a6b415b9c25d3", optional = true } [dev-dependencies] tracing-subscriber = { version = "0.3", features = ["env-filter"] } + + +[features] +default = ["nats", "postgres"] +nats = ["dep:async-nats"] +postgres = ["dep:sqlx"] diff --git a/crates/noleader/src/backend.rs b/crates/noleader/src/backend.rs index ca7ebac..e635353 100644 --- a/crates/noleader/src/backend.rs +++ b/crates/noleader/src/backend.rs @@ -1,8 +1,9 @@ use std::{ops::Deref, sync::Arc}; -use crate::backend::{nats::NatsBackend, postgres::PostgresBackend}; + #[cfg(feature = "nats")] mod nats; + #[cfg(feature = "postgres")] mod postgres; pub struct Backend { @@ -16,21 +17,24 @@ impl Backend { } } + #[cfg(feature = "nats")] pub fn nats(client: async_nats::Client, bucket: &str) -> Self { Self { - inner: Arc::new(NatsBackend::new(client, bucket)), + inner: Arc::new(nats::NatsBackend::new(client, bucket)), } } + #[cfg(feature = "postgres")] pub fn postgres(database_url: &str) -> Self { Self { - inner: Arc::new(PostgresBackend::new(database_url)), + inner: Arc::new(postgres::PostgresBackend::new(database_url)), } } + #[cfg(feature = "postgres")] pub fn postgres_with_pool(pool: sqlx::PgPool) -> Self { Self { - inner: Arc::new(PostgresBackend::new_with_pool("bogus", pool)), + inner: Arc::new(postgres::PostgresBackend::new_with_pool("bogus", pool)), } } } diff --git a/crates/noleader/src/backend/postgres.rs b/crates/noleader/src/backend/postgres.rs index 60399f7..a3c5277 100644 --- a/crates/noleader/src/backend/postgres.rs +++ b/crates/noleader/src/backend/postgres.rs @@ -81,16 +81,17 @@ impl BackendEdge for PostgresBackend { } async fn get(&self, key: &Key) -> anyhow::Result { - let rec = sqlx::query!( + let rec: Option = sqlx::query_as( " SELECT value, revision FROM noleader_leaders WHERE key = $1 AND heartbeat >= now() - interval '60 seconds' + LIMIT 1; ", - key.0 ) + .bind(&key.0) .fetch_optional(&self.db().await?) .await .context("get noleader key")?; @@ -114,7 +115,7 @@ impl BackendEdge for PostgresBackend { let current_rev = self.revision.load(Ordering::Relaxed); let new_rev = current_rev + 1; - let res = sqlx::query!( + let res: Result, sqlx::Error> = sqlx::query_as( r#" INSERT INTO noleader_leaders (key, value, revision, heartbeat) VALUES ($1, $2, $3, now()) @@ -133,11 +134,11 @@ impl BackendEdge for PostgresBackend { ) RETURNING value, revision "#, - key.0, - val.0.to_string(), - new_rev as i64, // new revision - current_rev as i64, // expected current revision ) + .bind(&key.0) + .bind(val.0.to_string()) + .bind(new_rev as i64) // new revision + .bind(current_rev as i64) // expected current revision .fetch_optional(&self.db().await?) .await; @@ -190,7 +191,7 @@ impl BackendEdge for PostgresBackend { async fn release(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()> { let rev = self.revision.load(Ordering::Relaxed); - sqlx::query!( + sqlx::query( " DELETE FROM noleader_leaders WHERE @@ -198,10 +199,10 @@ impl BackendEdge for PostgresBackend { AND value = $2 AND revision = $3 ", - key.0, - val.0.to_string(), - rev as i64, // new revision ) + .bind(&key.0) + .bind(val.0.to_string()) + .bind(rev as i64) // new revision .execute(&self.db().await?) .await .context("failed to release lock, it will expire naturally")?; @@ -209,3 +210,15 @@ impl BackendEdge for PostgresBackend { Ok(()) } } + +#[derive(sqlx::FromRow)] +struct GetResult { + value: String, + revision: i64, +} + +#[derive(sqlx::FromRow)] +struct UpdateResult { + value: String, + revision: i64, +} diff --git a/crates/noleader/src/lib.rs b/crates/noleader/src/lib.rs index b1b906e..cc60627 100644 --- a/crates/noleader/src/lib.rs +++ b/crates/noleader/src/lib.rs @@ -37,14 +37,17 @@ impl Leader { } } + #[cfg(feature = "nats")] pub fn new_nats(key: &str, bucket: &str, client: async_nats::Client) -> Self { Self::new(key, Backend::nats(client, bucket)) } + #[cfg(feature = "postgres")] pub fn new_postgres(key: &str, database_url: &str) -> Self { Self::new(key, Backend::postgres(database_url)) } + #[cfg(feature = "postgres")] pub fn new_postgres_pool(key: &str, pool: sqlx::PgPool) -> Self { Self::new(key, Backend::postgres_with_pool(pool)) } diff --git a/mise.toml b/mise.toml index 7f77107..1a99571 100644 --- a/mise.toml +++ b/mise.toml @@ -9,3 +9,12 @@ run = "cargo nextest run" [tasks.example] alias = ["e"] run = "cargo run --example" + +[tasks."local:up"] +run = "docker compose -f ./templates/docker/docker-compose.yml up -d" + +[tasks."local:down"] +run = "docker compose -f ./templates/docker/docker-compose.yml down -v" + +[tasks."db:prepare"] +run = "cargo sqlx prepare --workspace"