12 Commits

Author SHA1 Message Date
1f739332a3 fix(deps): update rust crate uuid to v1.19.0
Some checks failed
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is failing
2025-12-02 04:44:24 +00:00
716649affd chore(deps): update tokio-tracing monorepo
Some checks failed
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is failing
2025-11-29 01:50:06 +00:00
5389bae737 fix(deps): update all dependencies
Some checks failed
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is failing
2025-11-16 01:43:10 +00:00
e855642ef5 chore(deps): update rust crate tracing-subscriber to v0.3.20
Some checks failed
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is failing
2025-11-13 02:23:15 +00:00
4527ee7829 feat: truly cancel if revision isn't as expected
Some checks failed
continuous-integration/drone/push Build is failing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-09-25 11:07:14 +02:00
271823e278 feat: reset after failure
Some checks failed
continuous-integration/drone/push Build is failing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-09-25 10:49:29 +02:00
b8bf6b6f43 feat: print as well
Some checks failed
continuous-integration/drone/push Build is failing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-09-25 10:21:39 +02:00
c4c0d82305 feat: add publish
Some checks failed
continuous-integration/drone/push Build is failing
2025-09-24 21:43:34 +02:00
a69387c577 chore(release): v0.1.3 (#3)
Some checks failed
continuous-integration/drone/push Build is failing
chore(release): 0.1.3

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #3
2025-09-24 21:43:06 +02:00
095d7d19d0 chore: Configure Renovate (#4)
Some checks failed
continuous-integration/drone/push Build is failing
Add renovate.json

Reviewed-on: #4
2025-09-24 21:42:52 +02:00
b1f43394d6 feat: add features for nats and postgres
All checks were successful
continuous-integration/drone/push Build is passing
2025-09-24 21:36:59 +02:00
fc190a12d4 feat: add postgres 2025-09-24 21:17:06 +02:00
19 changed files with 1438 additions and 187 deletions

3
.env Normal file
View File

@@ -0,0 +1,3 @@
DATABASE_URL=postgres://devuser:devpassword@localhost:5432/dev
#SQLX_OFFLINE=true

View File

@@ -0,0 +1,40 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT value, revision\n FROM noleader_leaders\n WHERE\n key = $1\n AND heartbeat >= now() - interval '60 seconds'\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "value",
"type_info": "Text",
"origin": {
"Table": {
"table": "noleader_leaders",
"name": "value"
}
}
},
{
"ordinal": 1,
"name": "revision",
"type_info": "Int8",
"origin": {
"Table": {
"table": "noleader_leaders",
"name": "revision"
}
}
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
false
]
},
"hash": "0461b6433be16583d2480de11d5b712de1229dff78624ecab5edcf9f05a2e0e4"
}

View File

@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "\n DELETE FROM noleader_leaders\n WHERE\n key = $1\n AND value = $2\n AND revision = $3\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Text",
"Int8"
]
},
"nullable": []
},
"hash": "1aa9d51fee3918db168e3704d1ac0e80e5038e2619e5029597fd28d4967538c2"
}

View File

@@ -0,0 +1,43 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO noleader_leaders (key, value, revision, heartbeat)\n VALUES ($1, $2, $3, now())\n ON CONFLICT (key)\n DO UPDATE SET\n value = EXCLUDED.value,\n revision = EXCLUDED.revision,\n heartbeat = now()\n WHERE \n (\n -- Normal case: revision matches (we're the current leader updating)\n noleader_leaders.revision = $4\n OR\n -- Override case: heartbeat is old (stale leader)\n noleader_leaders.heartbeat < now() - INTERVAL '60 seconds'\n )\n RETURNING value, revision\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "value",
"type_info": "Text",
"origin": {
"Table": {
"table": "noleader_leaders",
"name": "value"
}
}
},
{
"ordinal": 1,
"name": "revision",
"type_info": "Int8",
"origin": {
"Table": {
"table": "noleader_leaders",
"name": "revision"
}
}
}
],
"parameters": {
"Left": [
"Text",
"Text",
"Int8",
"Int8"
]
},
"nullable": [
false,
false
]
},
"hash": "976e720a4dee2911278524a199d6d5ece23f141e4b4c094efe5fc3123e376b04"
}

View File

@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [0.1.3] - 2025-09-24 ## [0.1.3] - 2025-09-24
### Added ### Added
- add features for nats and postgres
- add postgres
- extract backend - extract backend
- do publish - do publish
- allow readme - allow readme

962
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -6,19 +6,34 @@ version.workspace = true
license.workspace = true license.workspace = true
repository = "https://git.front.kjuulh.io/kjuulh/noleader" repository = "https://git.front.kjuulh.io/kjuulh/noleader"
authors = ["kjuulh <contact@kasperhermansen.com>"] authors = ["kjuulh <contact@kasperhermansen.com>"]
description = "A small leader election package using NATS keyvalue store as the distributed locking mechanism. Does not require a min / max set of nodes" description = "A small leader election package using NATS/Postgres keyvalue store as the distributed locking mechanism. Does not require a min / max set of nodes"
[dependencies] [dependencies]
anyhow.workspace = true anyhow.workspace = true
tracing.workspace = true tracing.workspace = true
tokio.workspace = true
async-nats = "0.42"
uuid = { version = "1", features = ["v4", "v7"] } uuid = { version = "1", features = ["v4", "v7"] }
bytes = "1" bytes = "1"
tokio.workspace = true
tokio-util = "0.7" tokio-util = "0.7"
rand = "0.9.1" rand = "0.9"
async-trait = "0.1.89" async-trait = "0.1"
async-nats = { version = "0.45", optional = true }
# fork until dangerous set migrate table name is stable. Should be any version after 8.6
sqlx = { git = "https://github.com/launchbadge/sqlx", features = [
"uuid",
"postgres",
"runtime-tokio",
"tls-rustls",
], rev = "064d649abdfd1742e5fdcc20176a6b415b9c25d3", optional = true }
[dev-dependencies] [dev-dependencies]
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
[features]
default = ["nats", "postgres"]
nats = ["dep:async-nats"]
postgres = ["dep:sqlx"]

View File

@@ -1,4 +1,3 @@
use tokio_util::sync::CancellationToken;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
#[tokio::main] #[tokio::main]
@@ -32,10 +31,7 @@ async fn main() -> anyhow::Result<()> {
async move { async move {
tracing::debug!(leader_id, "starting leader"); tracing::debug!(leader_id, "starting leader");
leader leader.start().await.expect("to succeed");
.start(CancellationToken::default())
.await
.expect("to succeed");
} }
}); });

View File

@@ -0,0 +1,96 @@
use anyhow::Context;
use tokio_util::sync::CancellationToken;
use tracing_subscriber::EnvFilter;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Set up logger
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::from_default_env()
.add_directive("noleader=debug".parse().unwrap())
.add_directive("lots_of_candidates=debug".parse().unwrap())
.add_directive("info".parse().unwrap()),
)
.init();
let mykey = "myleaderkey";
let mut handles = Vec::new();
let db_url = &std::env::var("DATABASE_URL").context("DATABASE_URL is missing")?;
let pool = sqlx::PgPool::connect_lazy(db_url)?;
let cancel = CancellationToken::new();
let mut cancelled_resp = Vec::new();
tokio::spawn({
let cancel = cancel.clone();
async move {
tokio::signal::ctrl_c().await.expect("to receive shutdown");
cancel.cancel();
}
});
for _ in 0..100 {
let pool = pool.clone();
let cancel = cancel.child_token();
let item_cancellation = CancellationToken::new();
cancelled_resp.push(item_cancellation.child_token());
let handle = tokio::spawn(async move {
let mut leader = noleader::Leader::new_postgres_pool(mykey, pool);
leader.with_cancellation(cancel);
let leader_id = leader.leader_id().await.to_string();
tokio::spawn({
let leader = leader.clone();
let leader_id = leader_id.clone();
async move {
tracing::debug!(leader_id, "starting leader");
let res = leader.start().await;
tracing::warn!("shutting down");
item_cancellation.cancel();
if let Err(e) = res {
tracing::error!("lots failed: {e:?}");
}
}
});
loop {
tokio::time::sleep(std::time::Duration::from_millis(10000)).await;
match leader.is_leader().await {
noleader::Status::Leader => {
tracing::info!(leader_id, "is leader");
}
noleader::Status::Candidate => {
//tracing::debug!("is candiate");
}
}
}
#[allow(unreachable_code)]
Ok::<(), anyhow::Error>(())
});
handles.push(handle);
}
for cancel in cancelled_resp {
cancel.cancelled().await;
}
for handle in handles {
handle.abort();
}
Ok(())
}

View File

@@ -0,0 +1,49 @@
use anyhow::Context;
use tokio::signal;
use tracing_subscriber::EnvFilter;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Set up logger
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::from_default_env()
.add_directive("noleader=debug".parse().unwrap())
.add_directive("lots_of_candidates=debug".parse().unwrap())
.add_directive("info".parse().unwrap()),
)
.init();
let mykey = "postgres";
let mut leader = noleader::Leader::new_postgres(
mykey,
&std::env::var("DATABASE_URL").context("DATABASE_URL is missing")?,
);
leader.with_cancel_task(async move {
signal::ctrl_c().await.unwrap();
});
let leader_id = leader.leader_id().await.to_string();
leader
.acquire_and_run({
move |token| {
let leader_id = leader_id.clone();
async move {
loop {
if token.is_cancelled() {
return Ok(());
}
tracing::info!(leader_id, "do work as leader");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
}
})
.await?;
Ok(())
}

View File

@@ -0,0 +1,8 @@
-- Add migration script here
CREATE TABLE IF NOT EXISTS noleader_leaders (
key TEXT PRIMARY KEY NOT NULL,
value TEXT NOT NULL,
revision BIGINT NOT NULL,
heartbeat TIMESTAMPTZ NOT NULL DEFAULT now()
);

View File

@@ -1,8 +1,10 @@
use std::{ops::Deref, sync::Arc}; use std::{ops::Deref, sync::Arc};
use crate::backend::nats::NatsBackend;
#[cfg(feature = "nats")]
mod nats; mod nats;
#[cfg(feature = "postgres")]
mod postgres;
pub struct Backend { pub struct Backend {
inner: Arc<dyn BackendEdge + Send + Sync + 'static>, inner: Arc<dyn BackendEdge + Send + Sync + 'static>,
@@ -15,9 +17,24 @@ impl Backend {
} }
} }
#[cfg(feature = "nats")]
pub fn nats(client: async_nats::Client, bucket: &str) -> Self { pub fn nats(client: async_nats::Client, bucket: &str) -> Self {
Self { Self {
inner: Arc::new(NatsBackend::new(client, bucket)), inner: Arc::new(nats::NatsBackend::new(client, bucket)),
}
}
#[cfg(feature = "postgres")]
pub fn postgres(database_url: &str) -> Self {
Self {
inner: Arc::new(postgres::PostgresBackend::new(database_url)),
}
}
#[cfg(feature = "postgres")]
pub fn postgres_with_pool(pool: sqlx::PgPool) -> Self {
Self {
inner: Arc::new(postgres::PostgresBackend::new_with_pool("bogus", pool)),
} }
} }
} }
@@ -35,6 +52,7 @@ pub trait BackendEdge {
async fn setup(&self) -> anyhow::Result<()>; async fn setup(&self) -> anyhow::Result<()>;
async fn get(&self, key: &Key) -> anyhow::Result<LeaderValue>; async fn get(&self, key: &Key) -> anyhow::Result<LeaderValue>;
async fn update(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()>; async fn update(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()>;
async fn release(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()>;
} }
pub enum LeaderValue { pub enum LeaderValue {

View File

@@ -122,4 +122,10 @@ impl BackendEdge for NatsBackend {
Ok(()) Ok(())
} }
async fn release(&self, _key: &Key, _val: &LeaderId) -> anyhow::Result<()> {
// TODO: implement release for nats
Ok(())
}
} }

View File

@@ -0,0 +1,237 @@
use std::{
sync::atomic::{AtomicU64, Ordering},
time::Duration,
};
use anyhow::Context;
use sqlx::{PgPool, postgres::PgPoolOptions};
use tokio::sync::OnceCell;
use crate::backend::{BackendEdge, Key, LeaderId, LeaderValue};
pub struct PostgresBackend {
database_url: String,
revision: AtomicU64,
pool: OnceCell<PgPool>,
migrated: OnceCell<()>,
}
impl PostgresBackend {
pub fn new(database_url: &str) -> Self {
Self {
database_url: database_url.into(),
revision: AtomicU64::new(0),
pool: OnceCell::new(),
migrated: OnceCell::new(),
}
}
pub fn new_with_pool(database_url: &str, pool: PgPool) -> Self {
Self {
database_url: database_url.into(),
revision: AtomicU64::new(0),
pool: OnceCell::new_with(Some(pool)),
migrated: OnceCell::new(),
}
}
async fn db(&self) -> anyhow::Result<PgPool> {
let pool = self
.pool
.get_or_try_init(|| async move {
PgPoolOptions::new()
.max_connections(1)
.min_connections(0)
.idle_timeout(Some(Duration::from_secs(5)))
.connect_lazy(&self.database_url)
.context("connect postgres noleader")
})
.await?;
Ok(pool.clone())
}
async fn migrate(&self) -> anyhow::Result<()> {
self.migrated
.get_or_try_init(|| async move {
let db = self.db().await?;
let mut migrate = sqlx::migrate!("./migrations/postgres/");
migrate
.set_locking(false)
.dangerous_set_table_name("_sqlx_noleader_migrations")
.run(&db)
.await
.context("migrate noleader")?;
Ok::<_, anyhow::Error>(())
})
.await?;
Ok(())
}
}
#[async_trait::async_trait]
impl BackendEdge for PostgresBackend {
async fn setup(&self) -> anyhow::Result<()> {
self.migrate().await?;
Ok(())
}
async fn get(&self, key: &Key) -> anyhow::Result<LeaderValue> {
let rec: Option<GetResult> = sqlx::query_as(
"
SELECT value, revision
FROM noleader_leaders
WHERE
key = $1
AND revision = $2
AND heartbeat >= now() - interval '60 seconds'
LIMIT 1;
",
)
.bind(&key.0)
.bind(self.revision.load(Ordering::Relaxed) as i64)
.fetch_optional(&self.db().await?)
.await
.context("get noleader key")?;
let Some(val) = rec else {
self.revision.store(0, Ordering::Relaxed);
anyhow::bail!("key doesn't exist, we've lost leadership status")
};
// Update our local revision to match what's in the database
self.revision.store(val.revision as u64, Ordering::Relaxed);
let Ok(id) = uuid::Uuid::parse_str(&val.value) else {
tracing::warn!("value is not a valid uuid: {}", val.value);
self.revision.store(0, Ordering::Relaxed);
return Ok(LeaderValue::Unknown);
};
Ok(LeaderValue::Found { id: id.into() })
}
async fn update(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()> {
let current_rev = self.revision.load(Ordering::Relaxed);
let new_rev = current_rev + 1;
let res: Result<Option<UpdateResult>, sqlx::Error> = sqlx::query_as(
r#"
INSERT INTO noleader_leaders (key, value, revision, heartbeat)
VALUES ($1, $2, $3, now())
ON CONFLICT (key)
DO UPDATE SET
value = EXCLUDED.value,
revision = EXCLUDED.revision,
heartbeat = now()
WHERE
(
-- Normal case: revision matches (we're the current leader updating)
noleader_leaders.revision = $4
OR
-- Override case: heartbeat is old (stale leader)
noleader_leaders.heartbeat < now() - INTERVAL '60 seconds'
)
RETURNING value, revision
"#,
)
.bind(&key.0)
.bind(val.0.to_string())
.bind(new_rev as i64) // new revision
.bind(current_rev as i64) // expected current revision
.fetch_optional(&self.db().await?)
.await;
let res = match res {
Ok(res) => res,
Err(e) => {
self.revision.store(0, Ordering::Relaxed);
match &e {
sqlx::Error::Database(database_error) => {
if database_error.is_unique_violation() {
anyhow::bail!("update conflict: another leader holds lock")
} else {
anyhow::bail!(e);
}
}
_ => {
anyhow::bail!(e);
}
}
}
};
match res {
Some(rec) => {
if rec.value == val.0.to_string() && rec.revision == new_rev as i64 {
tracing::debug!(
val = val.0.to_string(),
revision = rec.revision,
"successfully updated leader"
);
// Only update our local revision if the update succeeded with our expected value
self.revision.store(rec.revision as u64, Ordering::Relaxed);
} else {
self.revision.store(0, Ordering::Relaxed);
anyhow::bail!(
"update conflict: expected value={}, revision={}, got value={}, revision={}",
val.0.to_string(),
new_rev,
rec.value,
rec.revision
);
}
}
None => {
anyhow::bail!(
"update rejected: another leader is holding the lock or revision mismatch"
)
}
}
Ok(())
}
async fn release(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()> {
let rev = self.revision.load(Ordering::Relaxed);
sqlx::query(
"
DELETE FROM noleader_leaders
WHERE
key = $1
AND value = $2
AND revision = $3
",
)
.bind(&key.0)
.bind(val.0.to_string())
.bind(rev as i64) // new revision
.execute(&self.db().await?)
.await
.context("failed to release lock, it will expire naturally")?;
self.revision.store(0, Ordering::Relaxed);
Ok(())
}
}
#[derive(sqlx::FromRow)]
struct GetResult {
value: String,
revision: i64,
}
#[derive(sqlx::FromRow)]
struct UpdateResult {
value: String,
revision: i64,
}

View File

@@ -20,6 +20,8 @@ pub struct Leader {
shutting_down: Arc<AtomicBool>, shutting_down: Arc<AtomicBool>,
is_leader: Arc<AtomicBool>, is_leader: Arc<AtomicBool>,
inner: Arc<RwLock<InnerLeader>>, inner: Arc<RwLock<InnerLeader>>,
cancellation: CancellationToken,
} }
const DEFAULT_INTERVAL: Duration = std::time::Duration::from_secs(10); const DEFAULT_INTERVAL: Duration = std::time::Duration::from_secs(10);
@@ -31,28 +33,59 @@ impl Leader {
shutting_down: Arc::new(AtomicBool::new(false)), shutting_down: Arc::new(AtomicBool::new(false)),
is_leader: Arc::new(AtomicBool::new(false)), is_leader: Arc::new(AtomicBool::new(false)),
inner: Arc::new(RwLock::new(InnerLeader::new(backend, key))), inner: Arc::new(RwLock::new(InnerLeader::new(backend, key))),
cancellation: CancellationToken::new(),
} }
} }
#[cfg(feature = "nats")]
pub fn new_nats(key: &str, bucket: &str, client: async_nats::Client) -> Self { pub fn new_nats(key: &str, bucket: &str, client: async_nats::Client) -> Self {
Self::new(key, Backend::nats(client, bucket)) Self::new(key, Backend::nats(client, bucket))
} }
#[cfg(feature = "postgres")]
pub fn new_postgres(key: &str, database_url: &str) -> Self {
Self::new(key, Backend::postgres(database_url))
}
#[cfg(feature = "postgres")]
pub fn new_postgres_pool(key: &str, pool: sqlx::PgPool) -> Self {
Self::new(key, Backend::postgres_with_pool(pool))
}
pub fn with_cancellation(&mut self, cancellation: CancellationToken) -> &mut Self {
self.cancellation = cancellation;
self
}
pub fn with_cancel_task<T>(&mut self, f: T) -> &mut Self
where
T: Future<Output = ()> + Send + 'static,
{
let cancel = self.cancellation.clone();
tokio::spawn(async move {
f.await;
cancel.cancel();
});
self
}
pub async fn acquire_and_run<F, Fut>(&self, f: F) -> anyhow::Result<()> pub async fn acquire_and_run<F, Fut>(&self, f: F) -> anyhow::Result<()>
where where
F: Fn(CancellationToken) -> Fut, F: Fn(CancellationToken) -> Fut,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static, Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{ {
let parent_token = CancellationToken::default(); let parent_token = self.cancellation.clone();
let s = self.clone(); let s = self.clone();
let server_token = parent_token.child_token(); let server_token = parent_token.child_token();
// Start the server election process in another task, this is because start is blocking // Start the server election process in another task, this is because start is blocking
let handle = tokio::spawn({ let handle = tokio::spawn({
let server_token = server_token.child_token();
async move { async move {
match s.start(server_token).await { match s.start().await {
Ok(_) => {} Ok(_) => {}
Err(e) => tracing::error!("leader election process failed: {}", e), Err(e) => tracing::error!("leader election process failed: {}", e),
} }
@@ -72,6 +105,11 @@ impl Leader {
server_token.cancel(); server_token.cancel();
// Close down the task as well, it should already be stopped, but this forces the task to close // Close down the task as well, it should already be stopped, but this forces the task to close
handle.abort(); handle.abort();
{
self.inner.write().await.cleanup().await?;
}
res?; res?;
Ok(()) Ok(())
@@ -95,12 +133,24 @@ impl Leader {
F: Fn(CancellationToken) -> Fut, F: Fn(CancellationToken) -> Fut,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static, Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{ {
let cancellation_token = cancellation_token.child_token();
loop { loop {
if cancellation_token.is_cancelled() {
return Ok(());
}
let cancellation_token = cancellation_token.child_token(); let cancellation_token = cancellation_token.child_token();
let is_leader = self.is_leader.clone(); let is_leader = self.is_leader.clone();
if !is_leader.load(Ordering::Relaxed) { if !is_leader.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_secs(1)).await; tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => {}
_ = cancellation_token.cancelled() => {
return Ok(());
}
}
continue; continue;
} }
@@ -111,7 +161,7 @@ impl Leader {
tokio::select! { tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_millis(500)) => {} _ = tokio::time::sleep(std::time::Duration::from_millis(500)) => {}
_ = cancellation_token.cancelled() => { _ = cancellation_token.cancelled() => {
break; return;
} }
} }
@@ -122,7 +172,9 @@ impl Leader {
} }
}); });
tracing::info!("starting leader actions");
let res = f(child_token).await; let res = f(child_token).await;
guard.abort(); guard.abort();
res?; res?;
} }
@@ -133,7 +185,7 @@ impl Leader {
inner.leader_id.clone().into() inner.leader_id.clone().into()
} }
pub async fn start(&self, cancellation_token: CancellationToken) -> anyhow::Result<()> { pub async fn start(&self) -> anyhow::Result<()> {
let mut attempts = 1; let mut attempts = 1;
{ {
@@ -153,7 +205,7 @@ impl Leader {
tokio::select! { tokio::select! {
_ = sleep_fut => {}, _ = sleep_fut => {},
_ = cancellation_token.cancelled() => { _ = self.cancellation.cancelled() => {
self.shutting_down.store(true, std::sync::atomic::Ordering::Relaxed); // Ordering can be relaxed, because our operation is an atomic update self.shutting_down.store(true, std::sync::atomic::Ordering::Relaxed); // Ordering can be relaxed, because our operation is an atomic update
return Ok(()) return Ok(())
} }
@@ -214,7 +266,6 @@ struct InnerLeader {
key: Key, key: Key,
leader_id: LeaderId, leader_id: LeaderId,
revision: u64,
} }
#[derive(Default, Clone)] #[derive(Default, Clone)]
@@ -230,7 +281,6 @@ impl InnerLeader {
Self { Self {
backend, backend,
leader_id: LeaderId::new(), leader_id: LeaderId::new(),
revision: u64::MIN,
key: key.into(), key: key.into(),
@@ -275,6 +325,15 @@ impl InnerLeader {
Ok(()) Ok(())
} }
pub async fn cleanup(&self) -> anyhow::Result<()> {
self.backend
.release(&self.key, &self.leader_id)
.await
.context("cleanup")?;
Ok(())
}
async fn update_leadership(&mut self) -> anyhow::Result<()> { async fn update_leadership(&mut self) -> anyhow::Result<()> {
let val = self let val = self
.backend .backend

View File

@@ -5,6 +5,8 @@ base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
vars: vars:
service: "noleader" service: "noleader"
registry: kasperhermansen registry: kasperhermansen
rust:
publish: {}
please: please:
project: project:

View File

@@ -1,3 +1,7 @@
[env]
_.file = ".env"
[tasks.test] [tasks.test]
alias = ["t"] alias = ["t"]
run = "cargo nextest run" run = "cargo nextest run"
@@ -5,3 +9,12 @@ run = "cargo nextest run"
[tasks.example] [tasks.example]
alias = ["e"] alias = ["e"]
run = "cargo run --example" run = "cargo run --example"
[tasks."local:up"]
run = "docker compose -f ./templates/docker/docker-compose.yml up -d"
[tasks."local:down"]
run = "docker compose -f ./templates/docker/docker-compose.yml down -v"
[tasks."db:prepare"]
run = "cargo sqlx prepare --workspace"

3
renovate.json Normal file
View File

@@ -0,0 +1,3 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
}

View File

@@ -7,3 +7,18 @@ services:
- "4222:4222" # Client connections - "4222:4222" # Client connections
- "8222:8222" # HTTP monitoring - "8222:8222" # HTTP monitoring
- "6222:6222" # Clustering - "6222:6222" # Clustering
postgres:
image: postgres:17-alpine
environment:
POSTGRES_USER: devuser
POSTGRES_PASSWORD: devpassword
POSTGRES_DB: dev
shm_size: 128mb
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U devuser -d dev"]
interval: 5s
timeout: 5s
retries: 5