feat: add features for nats and postgres
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2025-09-24 21:36:46 +02:00
parent fc190a12d4
commit b1f43394d6
9 changed files with 159 additions and 21 deletions

2
.env
View File

@@ -1 +1,3 @@
DATABASE_URL=postgres://devuser:devpassword@localhost:5432/dev DATABASE_URL=postgres://devuser:devpassword@localhost:5432/dev
#SQLX_OFFLINE=true

View File

@@ -0,0 +1,40 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT value, revision\n FROM noleader_leaders\n WHERE\n key = $1\n AND heartbeat >= now() - interval '60 seconds'\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "value",
"type_info": "Text",
"origin": {
"Table": {
"table": "noleader_leaders",
"name": "value"
}
}
},
{
"ordinal": 1,
"name": "revision",
"type_info": "Int8",
"origin": {
"Table": {
"table": "noleader_leaders",
"name": "revision"
}
}
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
false
]
},
"hash": "0461b6433be16583d2480de11d5b712de1229dff78624ecab5edcf9f05a2e0e4"
}

View File

@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "\n DELETE FROM noleader_leaders\n WHERE\n key = $1\n AND value = $2\n AND revision = $3\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Text",
"Int8"
]
},
"nullable": []
},
"hash": "1aa9d51fee3918db168e3704d1ac0e80e5038e2619e5029597fd28d4967538c2"
}

View File

@@ -0,0 +1,43 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO noleader_leaders (key, value, revision, heartbeat)\n VALUES ($1, $2, $3, now())\n ON CONFLICT (key)\n DO UPDATE SET\n value = EXCLUDED.value,\n revision = EXCLUDED.revision,\n heartbeat = now()\n WHERE \n (\n -- Normal case: revision matches (we're the current leader updating)\n noleader_leaders.revision = $4\n OR\n -- Override case: heartbeat is old (stale leader)\n noleader_leaders.heartbeat < now() - INTERVAL '60 seconds'\n )\n RETURNING value, revision\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "value",
"type_info": "Text",
"origin": {
"Table": {
"table": "noleader_leaders",
"name": "value"
}
}
},
{
"ordinal": 1,
"name": "revision",
"type_info": "Int8",
"origin": {
"Table": {
"table": "noleader_leaders",
"name": "revision"
}
}
}
],
"parameters": {
"Left": [
"Text",
"Text",
"Int8",
"Int8"
]
},
"nullable": [
false,
false
]
},
"hash": "976e720a4dee2911278524a199d6d5ece23f141e4b4c094efe5fc3123e376b04"
}

View File

@@ -6,26 +6,34 @@ version.workspace = true
license.workspace = true license.workspace = true
repository = "https://git.front.kjuulh.io/kjuulh/noleader" repository = "https://git.front.kjuulh.io/kjuulh/noleader"
authors = ["kjuulh <contact@kasperhermansen.com>"] authors = ["kjuulh <contact@kasperhermansen.com>"]
description = "A small leader election package using NATS keyvalue store as the distributed locking mechanism. Does not require a min / max set of nodes" description = "A small leader election package using NATS/Postgres keyvalue store as the distributed locking mechanism. Does not require a min / max set of nodes"
[dependencies] [dependencies]
anyhow.workspace = true anyhow.workspace = true
tracing.workspace = true tracing.workspace = true
tokio.workspace = true
async-nats = "0.42"
uuid = { version = "1", features = ["v4", "v7"] } uuid = { version = "1", features = ["v4", "v7"] }
bytes = "1" bytes = "1"
tokio.workspace = true
tokio-util = "0.7" tokio-util = "0.7"
rand = "0.9.1" rand = "0.9"
async-trait = "0.1.89" async-trait = "0.1"
async-nats = { version = "0.42", optional = true }
# fork until dangerous set migrate table name is stable. Should be any version after 8.6 # fork until dangerous set migrate table name is stable. Should be any version after 8.6
sqlx = { git = "https://github.com/launchbadge/sqlx", features = [ sqlx = { git = "https://github.com/launchbadge/sqlx", features = [
"uuid", "uuid",
"postgres", "postgres",
"runtime-tokio", "runtime-tokio",
"tls-rustls", "tls-rustls",
], rev = "064d649abdfd1742e5fdcc20176a6b415b9c25d3" } ], rev = "064d649abdfd1742e5fdcc20176a6b415b9c25d3", optional = true }
[dev-dependencies] [dev-dependencies]
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
[features]
default = ["nats", "postgres"]
nats = ["dep:async-nats"]
postgres = ["dep:sqlx"]

View File

@@ -1,8 +1,9 @@
use std::{ops::Deref, sync::Arc}; use std::{ops::Deref, sync::Arc};
use crate::backend::{nats::NatsBackend, postgres::PostgresBackend};
#[cfg(feature = "nats")]
mod nats; mod nats;
#[cfg(feature = "postgres")]
mod postgres; mod postgres;
pub struct Backend { pub struct Backend {
@@ -16,21 +17,24 @@ impl Backend {
} }
} }
#[cfg(feature = "nats")]
pub fn nats(client: async_nats::Client, bucket: &str) -> Self { pub fn nats(client: async_nats::Client, bucket: &str) -> Self {
Self { Self {
inner: Arc::new(NatsBackend::new(client, bucket)), inner: Arc::new(nats::NatsBackend::new(client, bucket)),
} }
} }
#[cfg(feature = "postgres")]
pub fn postgres(database_url: &str) -> Self { pub fn postgres(database_url: &str) -> Self {
Self { Self {
inner: Arc::new(PostgresBackend::new(database_url)), inner: Arc::new(postgres::PostgresBackend::new(database_url)),
} }
} }
#[cfg(feature = "postgres")]
pub fn postgres_with_pool(pool: sqlx::PgPool) -> Self { pub fn postgres_with_pool(pool: sqlx::PgPool) -> Self {
Self { Self {
inner: Arc::new(PostgresBackend::new_with_pool("bogus", pool)), inner: Arc::new(postgres::PostgresBackend::new_with_pool("bogus", pool)),
} }
} }
} }

View File

@@ -81,16 +81,17 @@ impl BackendEdge for PostgresBackend {
} }
async fn get(&self, key: &Key) -> anyhow::Result<LeaderValue> { async fn get(&self, key: &Key) -> anyhow::Result<LeaderValue> {
let rec = sqlx::query!( let rec: Option<GetResult> = sqlx::query_as(
" "
SELECT value, revision SELECT value, revision
FROM noleader_leaders FROM noleader_leaders
WHERE WHERE
key = $1 key = $1
AND heartbeat >= now() - interval '60 seconds' AND heartbeat >= now() - interval '60 seconds'
LIMIT 1;
", ",
key.0
) )
.bind(&key.0)
.fetch_optional(&self.db().await?) .fetch_optional(&self.db().await?)
.await .await
.context("get noleader key")?; .context("get noleader key")?;
@@ -114,7 +115,7 @@ impl BackendEdge for PostgresBackend {
let current_rev = self.revision.load(Ordering::Relaxed); let current_rev = self.revision.load(Ordering::Relaxed);
let new_rev = current_rev + 1; let new_rev = current_rev + 1;
let res = sqlx::query!( let res: Result<Option<UpdateResult>, sqlx::Error> = sqlx::query_as(
r#" r#"
INSERT INTO noleader_leaders (key, value, revision, heartbeat) INSERT INTO noleader_leaders (key, value, revision, heartbeat)
VALUES ($1, $2, $3, now()) VALUES ($1, $2, $3, now())
@@ -133,11 +134,11 @@ impl BackendEdge for PostgresBackend {
) )
RETURNING value, revision RETURNING value, revision
"#, "#,
key.0,
val.0.to_string(),
new_rev as i64, // new revision
current_rev as i64, // expected current revision
) )
.bind(&key.0)
.bind(val.0.to_string())
.bind(new_rev as i64) // new revision
.bind(current_rev as i64) // expected current revision
.fetch_optional(&self.db().await?) .fetch_optional(&self.db().await?)
.await; .await;
@@ -190,7 +191,7 @@ impl BackendEdge for PostgresBackend {
async fn release(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()> { async fn release(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()> {
let rev = self.revision.load(Ordering::Relaxed); let rev = self.revision.load(Ordering::Relaxed);
sqlx::query!( sqlx::query(
" "
DELETE FROM noleader_leaders DELETE FROM noleader_leaders
WHERE WHERE
@@ -198,10 +199,10 @@ impl BackendEdge for PostgresBackend {
AND value = $2 AND value = $2
AND revision = $3 AND revision = $3
", ",
key.0,
val.0.to_string(),
rev as i64, // new revision
) )
.bind(&key.0)
.bind(val.0.to_string())
.bind(rev as i64) // new revision
.execute(&self.db().await?) .execute(&self.db().await?)
.await .await
.context("failed to release lock, it will expire naturally")?; .context("failed to release lock, it will expire naturally")?;
@@ -209,3 +210,15 @@ impl BackendEdge for PostgresBackend {
Ok(()) Ok(())
} }
} }
#[derive(sqlx::FromRow)]
struct GetResult {
value: String,
revision: i64,
}
#[derive(sqlx::FromRow)]
struct UpdateResult {
value: String,
revision: i64,
}

View File

@@ -37,14 +37,17 @@ impl Leader {
} }
} }
#[cfg(feature = "nats")]
pub fn new_nats(key: &str, bucket: &str, client: async_nats::Client) -> Self { pub fn new_nats(key: &str, bucket: &str, client: async_nats::Client) -> Self {
Self::new(key, Backend::nats(client, bucket)) Self::new(key, Backend::nats(client, bucket))
} }
#[cfg(feature = "postgres")]
pub fn new_postgres(key: &str, database_url: &str) -> Self { pub fn new_postgres(key: &str, database_url: &str) -> Self {
Self::new(key, Backend::postgres(database_url)) Self::new(key, Backend::postgres(database_url))
} }
#[cfg(feature = "postgres")]
pub fn new_postgres_pool(key: &str, pool: sqlx::PgPool) -> Self { pub fn new_postgres_pool(key: &str, pool: sqlx::PgPool) -> Self {
Self::new(key, Backend::postgres_with_pool(pool)) Self::new(key, Backend::postgres_with_pool(pool))
} }

View File

@@ -9,3 +9,12 @@ run = "cargo nextest run"
[tasks.example] [tasks.example]
alias = ["e"] alias = ["e"]
run = "cargo run --example" run = "cargo run --example"
[tasks."local:up"]
run = "docker compose -f ./templates/docker/docker-compose.yml up -d"
[tasks."local:down"]
run = "docker compose -f ./templates/docker/docker-compose.yml down -v"
[tasks."db:prepare"]
run = "cargo sqlx prepare --workspace"