Compare commits
16 Commits
v0.1.2
...
cuddle-ple
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
76eba6a284 | ||
| 1f739332a3 | |||
| 716649affd | |||
| 5389bae737 | |||
| e855642ef5 | |||
|
4527ee7829
|
|||
|
271823e278
|
|||
|
b8bf6b6f43
|
|||
| c4c0d82305 | |||
| a69387c577 | |||
| 095d7d19d0 | |||
| b1f43394d6 | |||
| fc190a12d4 | |||
| 8b27219af5 | |||
| c334dba445 | |||
| aaf3a72d3b |
3
.env
Normal file
3
.env
Normal file
@@ -0,0 +1,3 @@
|
||||
DATABASE_URL=postgres://devuser:devpassword@localhost:5432/dev
|
||||
|
||||
#SQLX_OFFLINE=true
|
||||
40
.sqlx/query-0461b6433be16583d2480de11d5b712de1229dff78624ecab5edcf9f05a2e0e4.json
generated
Normal file
40
.sqlx/query-0461b6433be16583d2480de11d5b712de1229dff78624ecab5edcf9f05a2e0e4.json
generated
Normal file
@@ -0,0 +1,40 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT value, revision\n FROM noleader_leaders\n WHERE\n key = $1\n AND heartbeat >= now() - interval '60 seconds'\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "value",
|
||||
"type_info": "Text",
|
||||
"origin": {
|
||||
"Table": {
|
||||
"table": "noleader_leaders",
|
||||
"name": "value"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "revision",
|
||||
"type_info": "Int8",
|
||||
"origin": {
|
||||
"Table": {
|
||||
"table": "noleader_leaders",
|
||||
"name": "revision"
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "0461b6433be16583d2480de11d5b712de1229dff78624ecab5edcf9f05a2e0e4"
|
||||
}
|
||||
16
.sqlx/query-1aa9d51fee3918db168e3704d1ac0e80e5038e2619e5029597fd28d4967538c2.json
generated
Normal file
16
.sqlx/query-1aa9d51fee3918db168e3704d1ac0e80e5038e2619e5029597fd28d4967538c2.json
generated
Normal file
@@ -0,0 +1,16 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n DELETE FROM noleader_leaders\n WHERE\n key = $1\n AND value = $2\n AND revision = $3\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text",
|
||||
"Text",
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "1aa9d51fee3918db168e3704d1ac0e80e5038e2619e5029597fd28d4967538c2"
|
||||
}
|
||||
43
.sqlx/query-976e720a4dee2911278524a199d6d5ece23f141e4b4c094efe5fc3123e376b04.json
generated
Normal file
43
.sqlx/query-976e720a4dee2911278524a199d6d5ece23f141e4b4c094efe5fc3123e376b04.json
generated
Normal file
@@ -0,0 +1,43 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n INSERT INTO noleader_leaders (key, value, revision, heartbeat)\n VALUES ($1, $2, $3, now())\n ON CONFLICT (key)\n DO UPDATE SET\n value = EXCLUDED.value,\n revision = EXCLUDED.revision,\n heartbeat = now()\n WHERE \n (\n -- Normal case: revision matches (we're the current leader updating)\n noleader_leaders.revision = $4\n OR\n -- Override case: heartbeat is old (stale leader)\n noleader_leaders.heartbeat < now() - INTERVAL '60 seconds'\n )\n RETURNING value, revision\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "value",
|
||||
"type_info": "Text",
|
||||
"origin": {
|
||||
"Table": {
|
||||
"table": "noleader_leaders",
|
||||
"name": "value"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "revision",
|
||||
"type_info": "Int8",
|
||||
"origin": {
|
||||
"Table": {
|
||||
"table": "noleader_leaders",
|
||||
"name": "revision"
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text",
|
||||
"Text",
|
||||
"Int8",
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "976e720a4dee2911278524a199d6d5ece23f141e4b4c094efe5fc3123e376b04"
|
||||
}
|
||||
34
CHANGELOG.md
34
CHANGELOG.md
@@ -6,6 +6,40 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.2.0] - 2025-12-02
|
||||
|
||||
### Added
|
||||
- truly cancel if revision isn't as expected
|
||||
- reset after failure
|
||||
- print as well
|
||||
- add publish
|
||||
- add features for nats and postgres
|
||||
- add postgres
|
||||
- extract backend
|
||||
- do publish
|
||||
- allow readme
|
||||
|
||||
### Fixed
|
||||
- *(deps)* update rust crate uuid to v1.19.0
|
||||
- *(deps)* update all dependencies
|
||||
|
||||
### Other
|
||||
- *(deps)* update tokio-tracing monorepo
|
||||
- *(deps)* update rust crate tracing-subscriber to v0.3.20
|
||||
- *(release)* v0.1.3 (#3)
|
||||
chore(release): 0.1.3
|
||||
- Configure Renovate (#4)
|
||||
Add renovate.json
|
||||
|
||||
## [0.1.3] - 2025-09-24
|
||||
|
||||
### Added
|
||||
- add features for nats and postgres
|
||||
- add postgres
|
||||
- extract backend
|
||||
- do publish
|
||||
- allow readme
|
||||
|
||||
## [0.1.2] - 2025-07-04
|
||||
|
||||
### Added
|
||||
|
||||
974
Cargo.lock
generated
974
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -3,7 +3,7 @@ members = ["crates/*"]
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
version = "0.1.2"
|
||||
version = "0.2.0"
|
||||
license = "MIT"
|
||||
|
||||
[workspace.dependencies]
|
||||
|
||||
29
README.md
29
README.md
@@ -22,9 +22,9 @@ This library is still young and the API is subject to change.
|
||||
|
||||
## Intended use-case
|
||||
|
||||
Noleader is not built for distributed consensus, or fast re-election produces. It take upwards to a minute to get reelected, state is the users responsibility to handle.
|
||||
Noleader is not built for distributed consensus, or fast re-election procedures. It take upwards to a minute to get re-elected, state is the users responsibility to handle.
|
||||
|
||||
Noleader is pretty much just a distributed lock, intended for use-cases where the use wants to only have a single node scheduling work etc.
|
||||
Noleader is pretty much just a distributed lock, intended for use-cases where the user wants to only have a single node scheduling work etc.
|
||||
|
||||
Good alternatives are:
|
||||
|
||||
@@ -97,29 +97,14 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
```
|
||||
|
||||
## API Overview
|
||||
## Examples
|
||||
|
||||
* **`Leader::new(bucket: &str, key: &str, client: async_nats::Client) -> Leader`**
|
||||
Create a new election participant.
|
||||
* **`create_bucket(&self) -> anyhow::Result<()>`**
|
||||
Ensures the KV bucket exists (no-op if already created).
|
||||
* **`start(&self, token: CancellationToken) -> anyhow::Result<()>`**
|
||||
Begins the background leader-election loop; renews TTL on success or retries on failure.
|
||||
* **`do_while_leader<F, Fut>(&self, f: F) -> anyhow::Result<()>`**
|
||||
Runs your closure as long as you hold leadership; cancels immediately on loss.
|
||||
* **`leader_id(&self) -> Uuid`**
|
||||
Returns your unique candidate ID.
|
||||
* **`is_leader(&self) -> Status`**
|
||||
Returns `Status::Leader` or `Status::Candidate`, taking shutdown into account.
|
||||
See the examples folder in ./crates/noleader/examples
|
||||
|
||||
### Types
|
||||
## Architecture
|
||||
|
||||
Noleader uses a simple election stealing
|
||||
|
||||
```rust
|
||||
pub enum Status {
|
||||
Leader,
|
||||
Candidate,
|
||||
}
|
||||
```
|
||||
|
||||
## License
|
||||
|
||||
|
||||
@@ -1,18 +1,39 @@
|
||||
[package]
|
||||
name = "noleader"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
edition = "2024"
|
||||
readme = "../../README.md"
|
||||
version.workspace = true
|
||||
license.workspace = true
|
||||
repository = "https://git.front.kjuulh.io/kjuulh/noleader"
|
||||
authors = ["kjuulh <contact@kasperhermansen.com>"]
|
||||
description = "A small leader election package using NATS/Postgres keyvalue store as the distributed locking mechanism. Does not require a min / max set of nodes"
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
async-nats = "0.42"
|
||||
uuid = { version = "1", features = ["v4"] }
|
||||
bytes = "1"
|
||||
tokio.workspace = true
|
||||
|
||||
uuid = { version = "1", features = ["v4", "v7"] }
|
||||
bytes = "1"
|
||||
tokio-util = "0.7"
|
||||
rand = "0.9.1"
|
||||
rand = "0.9"
|
||||
async-trait = "0.1"
|
||||
|
||||
async-nats = { version = "0.45", optional = true }
|
||||
|
||||
# fork until dangerous set migrate table name is stable. Should be any version after 8.6
|
||||
sqlx = { git = "https://github.com/launchbadge/sqlx", features = [
|
||||
"uuid",
|
||||
"postgres",
|
||||
"runtime-tokio",
|
||||
"tls-rustls",
|
||||
], rev = "064d649abdfd1742e5fdcc20176a6b415b9c25d3", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
|
||||
|
||||
[features]
|
||||
default = ["nats", "postgres"]
|
||||
nats = ["dep:async-nats"]
|
||||
postgres = ["dep:sqlx"]
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
#[tokio::main]
|
||||
@@ -14,15 +13,12 @@ async fn main() -> anyhow::Result<()> {
|
||||
.init();
|
||||
|
||||
let mybucket = "mytestbucket";
|
||||
let mykey = "myleaderkey";
|
||||
let mykey = "basic";
|
||||
let client = async_nats::connect("localhost:4222").await?;
|
||||
|
||||
let leader = noleader::Leader::new(mybucket, mykey, client);
|
||||
let leader = noleader::Leader::new_nats(mykey, mybucket, client);
|
||||
let leader_id = leader.leader_id().await.to_string();
|
||||
|
||||
tracing::info!("creating bucket");
|
||||
leader.create_bucket().await?;
|
||||
|
||||
leader
|
||||
.acquire_and_run({
|
||||
move |token| {
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
#[tokio::main]
|
||||
@@ -23,22 +22,16 @@ async fn main() -> anyhow::Result<()> {
|
||||
let client = client.clone();
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let leader = noleader::Leader::new(mybucket, mykey, client);
|
||||
let leader = noleader::Leader::new_nats(mykey, mybucket, client);
|
||||
let leader_id = leader.leader_id().await.to_string();
|
||||
|
||||
tracing::info!("creating bucket");
|
||||
leader.create_bucket().await?;
|
||||
|
||||
tokio::spawn({
|
||||
let leader = leader.clone();
|
||||
let leader_id = leader_id.clone();
|
||||
|
||||
async move {
|
||||
tracing::debug!(leader_id, "starting leader");
|
||||
leader
|
||||
.start(CancellationToken::default())
|
||||
.await
|
||||
.expect("to succeed");
|
||||
leader.start().await.expect("to succeed");
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
96
crates/noleader/examples/lots_of_postgres/main.rs
Normal file
96
crates/noleader/examples/lots_of_postgres/main.rs
Normal file
@@ -0,0 +1,96 @@
|
||||
use anyhow::Context;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
// Set up logger
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
EnvFilter::from_default_env()
|
||||
.add_directive("noleader=debug".parse().unwrap())
|
||||
.add_directive("lots_of_candidates=debug".parse().unwrap())
|
||||
.add_directive("info".parse().unwrap()),
|
||||
)
|
||||
.init();
|
||||
|
||||
let mykey = "myleaderkey";
|
||||
|
||||
let mut handles = Vec::new();
|
||||
|
||||
let db_url = &std::env::var("DATABASE_URL").context("DATABASE_URL is missing")?;
|
||||
let pool = sqlx::PgPool::connect_lazy(db_url)?;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let mut cancelled_resp = Vec::new();
|
||||
|
||||
tokio::spawn({
|
||||
let cancel = cancel.clone();
|
||||
|
||||
async move {
|
||||
tokio::signal::ctrl_c().await.expect("to receive shutdown");
|
||||
|
||||
cancel.cancel();
|
||||
}
|
||||
});
|
||||
|
||||
for _ in 0..100 {
|
||||
let pool = pool.clone();
|
||||
let cancel = cancel.child_token();
|
||||
|
||||
let item_cancellation = CancellationToken::new();
|
||||
cancelled_resp.push(item_cancellation.child_token());
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let mut leader = noleader::Leader::new_postgres_pool(mykey, pool);
|
||||
|
||||
leader.with_cancellation(cancel);
|
||||
let leader_id = leader.leader_id().await.to_string();
|
||||
|
||||
tokio::spawn({
|
||||
let leader = leader.clone();
|
||||
let leader_id = leader_id.clone();
|
||||
|
||||
async move {
|
||||
tracing::debug!(leader_id, "starting leader");
|
||||
let res = leader.start().await;
|
||||
|
||||
tracing::warn!("shutting down");
|
||||
|
||||
item_cancellation.cancel();
|
||||
|
||||
if let Err(e) = res {
|
||||
tracing::error!("lots failed: {e:?}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
loop {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10000)).await;
|
||||
match leader.is_leader().await {
|
||||
noleader::Status::Leader => {
|
||||
tracing::info!(leader_id, "is leader");
|
||||
}
|
||||
noleader::Status::Candidate => {
|
||||
//tracing::debug!("is candiate");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unreachable_code)]
|
||||
Ok::<(), anyhow::Error>(())
|
||||
});
|
||||
|
||||
handles.push(handle);
|
||||
}
|
||||
|
||||
for cancel in cancelled_resp {
|
||||
cancel.cancelled().await;
|
||||
}
|
||||
|
||||
for handle in handles {
|
||||
handle.abort();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
49
crates/noleader/examples/postgres/main.rs
Normal file
49
crates/noleader/examples/postgres/main.rs
Normal file
@@ -0,0 +1,49 @@
|
||||
use anyhow::Context;
|
||||
use tokio::signal;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
// Set up logger
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
EnvFilter::from_default_env()
|
||||
.add_directive("noleader=debug".parse().unwrap())
|
||||
.add_directive("lots_of_candidates=debug".parse().unwrap())
|
||||
.add_directive("info".parse().unwrap()),
|
||||
)
|
||||
.init();
|
||||
|
||||
let mykey = "postgres";
|
||||
|
||||
let mut leader = noleader::Leader::new_postgres(
|
||||
mykey,
|
||||
&std::env::var("DATABASE_URL").context("DATABASE_URL is missing")?,
|
||||
);
|
||||
leader.with_cancel_task(async move {
|
||||
signal::ctrl_c().await.unwrap();
|
||||
});
|
||||
|
||||
let leader_id = leader.leader_id().await.to_string();
|
||||
|
||||
leader
|
||||
.acquire_and_run({
|
||||
move |token| {
|
||||
let leader_id = leader_id.clone();
|
||||
|
||||
async move {
|
||||
loop {
|
||||
if token.is_cancelled() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
tracing::info!(leader_id, "do work as leader");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
-- Add migration script here
|
||||
|
||||
CREATE TABLE IF NOT EXISTS noleader_leaders (
|
||||
key TEXT PRIMARY KEY NOT NULL,
|
||||
value TEXT NOT NULL,
|
||||
revision BIGINT NOT NULL,
|
||||
heartbeat TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
116
crates/noleader/src/backend.rs
Normal file
116
crates/noleader/src/backend.rs
Normal file
@@ -0,0 +1,116 @@
|
||||
use std::{ops::Deref, sync::Arc};
|
||||
|
||||
|
||||
#[cfg(feature = "nats")]
|
||||
mod nats;
|
||||
#[cfg(feature = "postgres")]
|
||||
mod postgres;
|
||||
|
||||
pub struct Backend {
|
||||
inner: Arc<dyn BackendEdge + Send + Sync + 'static>,
|
||||
}
|
||||
|
||||
impl Backend {
|
||||
pub fn new(edge: impl BackendEdge + Send + Sync + 'static) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(edge),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "nats")]
|
||||
pub fn nats(client: async_nats::Client, bucket: &str) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(nats::NatsBackend::new(client, bucket)),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
pub fn postgres(database_url: &str) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(postgres::PostgresBackend::new(database_url)),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
pub fn postgres_with_pool(pool: sqlx::PgPool) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(postgres::PostgresBackend::new_with_pool("bogus", pool)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for Backend {
|
||||
type Target = Arc<dyn BackendEdge + Send + Sync + 'static>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait BackendEdge {
|
||||
async fn setup(&self) -> anyhow::Result<()>;
|
||||
async fn get(&self, key: &Key) -> anyhow::Result<LeaderValue>;
|
||||
async fn update(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()>;
|
||||
async fn release(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
pub enum LeaderValue {
|
||||
Unknown,
|
||||
Found { id: LeaderId },
|
||||
}
|
||||
|
||||
pub struct Key(String);
|
||||
|
||||
impl From<String> for Key {
|
||||
fn from(value: String) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
}
|
||||
impl From<&str> for Key {
|
||||
fn from(value: &str) -> Self {
|
||||
Self(value.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Key> for String {
|
||||
fn from(value: Key) -> Self {
|
||||
value.0
|
||||
}
|
||||
}
|
||||
impl From<&Key> for String {
|
||||
fn from(value: &Key) -> Self {
|
||||
value.0.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct LeaderId(uuid::Uuid);
|
||||
impl LeaderId {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self(uuid::Uuid::now_v7())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LeaderId> for uuid::Uuid {
|
||||
fn from(value: LeaderId) -> Self {
|
||||
value.0
|
||||
}
|
||||
}
|
||||
impl From<&LeaderId> for uuid::Uuid {
|
||||
fn from(value: &LeaderId) -> Self {
|
||||
value.0
|
||||
}
|
||||
}
|
||||
|
||||
impl From<uuid::Uuid> for LeaderId {
|
||||
fn from(value: uuid::Uuid) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl LeaderId {
|
||||
pub const fn as_bytes(&self) -> &[u8] {
|
||||
self.0.as_bytes()
|
||||
}
|
||||
}
|
||||
131
crates/noleader/src/backend/nats.rs
Normal file
131
crates/noleader/src/backend/nats.rs
Normal file
@@ -0,0 +1,131 @@
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
use anyhow::Context;
|
||||
use async_nats::jetstream::{self, kv};
|
||||
|
||||
use crate::backend::{BackendEdge, Key, LeaderId, LeaderValue};
|
||||
|
||||
pub struct NatsBackend {
|
||||
bucket: String,
|
||||
client: jetstream::Context,
|
||||
|
||||
revision: AtomicU64,
|
||||
}
|
||||
|
||||
impl NatsBackend {
|
||||
pub fn new(client: async_nats::Client, bucket: &str) -> Self {
|
||||
Self {
|
||||
bucket: bucket.into(),
|
||||
client: jetstream::new(client),
|
||||
revision: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create_bucket(&self) -> anyhow::Result<()> {
|
||||
if (self.client.get_key_value(&self.bucket).await).is_ok() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Err(e) = self
|
||||
.client
|
||||
.create_key_value(kv::Config {
|
||||
bucket: self.bucket.clone(),
|
||||
description: "leadership bucket for noleader".into(),
|
||||
limit_markers: Some(std::time::Duration::from_secs(60)),
|
||||
max_age: std::time::Duration::from_secs(60),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
{
|
||||
tracing::info!(
|
||||
"bucket creation failed, it might have just been a conflict, testing again: {e}"
|
||||
);
|
||||
|
||||
if (self.client.get_key_value(&self.bucket).await).is_ok() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
anyhow::bail!("failed to create bucket: {}", e)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl BackendEdge for NatsBackend {
|
||||
async fn setup(&self) -> anyhow::Result<()> {
|
||||
self.create_bucket().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
async fn get(&self, key: &Key) -> anyhow::Result<LeaderValue> {
|
||||
let bucket = self.client.get_key_value(&self.bucket).await?;
|
||||
|
||||
let Some(val) = bucket.get(key).await? else {
|
||||
anyhow::bail!("key doesn't exists, we've lost leadership status")
|
||||
};
|
||||
|
||||
let Ok(id) = uuid::Uuid::from_slice(&val) else {
|
||||
return Ok(LeaderValue::Unknown);
|
||||
};
|
||||
|
||||
Ok(LeaderValue::Found { id: id.into() })
|
||||
}
|
||||
async fn update(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()> {
|
||||
let bucket = self
|
||||
.client
|
||||
.get_key_value(&self.bucket)
|
||||
.await
|
||||
.context("get bucket")?;
|
||||
|
||||
match bucket
|
||||
.update(
|
||||
&key.0,
|
||||
bytes::Bytes::copy_from_slice(val.as_bytes()),
|
||||
self.revision.load(Ordering::Relaxed),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(rev) => {
|
||||
self.revision.store(rev, Ordering::Relaxed);
|
||||
}
|
||||
Err(e) => match e.kind() {
|
||||
kv::UpdateErrorKind::WrongLastRevision => {
|
||||
tracing::trace!("creating nats entry");
|
||||
match bucket
|
||||
.create_with_ttl(
|
||||
&key.0,
|
||||
bytes::Bytes::copy_from_slice(val.as_bytes()),
|
||||
std::time::Duration::from_secs(60),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(rev) => {
|
||||
self.revision.store(rev, Ordering::Relaxed);
|
||||
}
|
||||
Err(e) => match e.kind() {
|
||||
kv::CreateErrorKind::AlreadyExists => {
|
||||
anyhow::bail!("another candidate has leadership status")
|
||||
}
|
||||
_ => {
|
||||
anyhow::bail!("{}", e);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
anyhow::bail!("failed to create bucket: {e}")
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn release(&self, _key: &Key, _val: &LeaderId) -> anyhow::Result<()> {
|
||||
// TODO: implement release for nats
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
237
crates/noleader/src/backend/postgres.rs
Normal file
237
crates/noleader/src/backend/postgres.rs
Normal file
@@ -0,0 +1,237 @@
|
||||
use std::{
|
||||
sync::atomic::{AtomicU64, Ordering},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use sqlx::{PgPool, postgres::PgPoolOptions};
|
||||
use tokio::sync::OnceCell;
|
||||
|
||||
use crate::backend::{BackendEdge, Key, LeaderId, LeaderValue};
|
||||
|
||||
pub struct PostgresBackend {
|
||||
database_url: String,
|
||||
revision: AtomicU64,
|
||||
pool: OnceCell<PgPool>,
|
||||
migrated: OnceCell<()>,
|
||||
}
|
||||
|
||||
impl PostgresBackend {
|
||||
pub fn new(database_url: &str) -> Self {
|
||||
Self {
|
||||
database_url: database_url.into(),
|
||||
revision: AtomicU64::new(0),
|
||||
pool: OnceCell::new(),
|
||||
migrated: OnceCell::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_pool(database_url: &str, pool: PgPool) -> Self {
|
||||
Self {
|
||||
database_url: database_url.into(),
|
||||
revision: AtomicU64::new(0),
|
||||
pool: OnceCell::new_with(Some(pool)),
|
||||
migrated: OnceCell::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn db(&self) -> anyhow::Result<PgPool> {
|
||||
let pool = self
|
||||
.pool
|
||||
.get_or_try_init(|| async move {
|
||||
PgPoolOptions::new()
|
||||
.max_connections(1)
|
||||
.min_connections(0)
|
||||
.idle_timeout(Some(Duration::from_secs(5)))
|
||||
.connect_lazy(&self.database_url)
|
||||
.context("connect postgres noleader")
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(pool.clone())
|
||||
}
|
||||
|
||||
async fn migrate(&self) -> anyhow::Result<()> {
|
||||
self.migrated
|
||||
.get_or_try_init(|| async move {
|
||||
let db = self.db().await?;
|
||||
|
||||
let mut migrate = sqlx::migrate!("./migrations/postgres/");
|
||||
|
||||
migrate
|
||||
.set_locking(false)
|
||||
.dangerous_set_table_name("_sqlx_noleader_migrations")
|
||||
.run(&db)
|
||||
.await
|
||||
.context("migrate noleader")?;
|
||||
|
||||
Ok::<_, anyhow::Error>(())
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl BackendEdge for PostgresBackend {
|
||||
async fn setup(&self) -> anyhow::Result<()> {
|
||||
self.migrate().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get(&self, key: &Key) -> anyhow::Result<LeaderValue> {
|
||||
let rec: Option<GetResult> = sqlx::query_as(
|
||||
"
|
||||
SELECT value, revision
|
||||
FROM noleader_leaders
|
||||
WHERE
|
||||
key = $1
|
||||
AND revision = $2
|
||||
AND heartbeat >= now() - interval '60 seconds'
|
||||
LIMIT 1;
|
||||
",
|
||||
)
|
||||
.bind(&key.0)
|
||||
.bind(self.revision.load(Ordering::Relaxed) as i64)
|
||||
.fetch_optional(&self.db().await?)
|
||||
.await
|
||||
.context("get noleader key")?;
|
||||
|
||||
let Some(val) = rec else {
|
||||
self.revision.store(0, Ordering::Relaxed);
|
||||
|
||||
anyhow::bail!("key doesn't exist, we've lost leadership status")
|
||||
};
|
||||
|
||||
// Update our local revision to match what's in the database
|
||||
self.revision.store(val.revision as u64, Ordering::Relaxed);
|
||||
|
||||
let Ok(id) = uuid::Uuid::parse_str(&val.value) else {
|
||||
tracing::warn!("value is not a valid uuid: {}", val.value);
|
||||
self.revision.store(0, Ordering::Relaxed);
|
||||
return Ok(LeaderValue::Unknown);
|
||||
};
|
||||
|
||||
Ok(LeaderValue::Found { id: id.into() })
|
||||
}
|
||||
|
||||
async fn update(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()> {
|
||||
let current_rev = self.revision.load(Ordering::Relaxed);
|
||||
let new_rev = current_rev + 1;
|
||||
|
||||
let res: Result<Option<UpdateResult>, sqlx::Error> = sqlx::query_as(
|
||||
r#"
|
||||
INSERT INTO noleader_leaders (key, value, revision, heartbeat)
|
||||
VALUES ($1, $2, $3, now())
|
||||
ON CONFLICT (key)
|
||||
DO UPDATE SET
|
||||
value = EXCLUDED.value,
|
||||
revision = EXCLUDED.revision,
|
||||
heartbeat = now()
|
||||
WHERE
|
||||
(
|
||||
-- Normal case: revision matches (we're the current leader updating)
|
||||
noleader_leaders.revision = $4
|
||||
OR
|
||||
-- Override case: heartbeat is old (stale leader)
|
||||
noleader_leaders.heartbeat < now() - INTERVAL '60 seconds'
|
||||
)
|
||||
RETURNING value, revision
|
||||
"#,
|
||||
)
|
||||
.bind(&key.0)
|
||||
.bind(val.0.to_string())
|
||||
.bind(new_rev as i64) // new revision
|
||||
.bind(current_rev as i64) // expected current revision
|
||||
.fetch_optional(&self.db().await?)
|
||||
.await;
|
||||
|
||||
let res = match res {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
self.revision.store(0, Ordering::Relaxed);
|
||||
|
||||
match &e {
|
||||
sqlx::Error::Database(database_error) => {
|
||||
if database_error.is_unique_violation() {
|
||||
anyhow::bail!("update conflict: another leader holds lock")
|
||||
} else {
|
||||
anyhow::bail!(e);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
anyhow::bail!(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
match res {
|
||||
Some(rec) => {
|
||||
if rec.value == val.0.to_string() && rec.revision == new_rev as i64 {
|
||||
tracing::debug!(
|
||||
val = val.0.to_string(),
|
||||
revision = rec.revision,
|
||||
"successfully updated leader"
|
||||
);
|
||||
|
||||
// Only update our local revision if the update succeeded with our expected value
|
||||
self.revision.store(rec.revision as u64, Ordering::Relaxed);
|
||||
} else {
|
||||
self.revision.store(0, Ordering::Relaxed);
|
||||
|
||||
anyhow::bail!(
|
||||
"update conflict: expected value={}, revision={}, got value={}, revision={}",
|
||||
val.0.to_string(),
|
||||
new_rev,
|
||||
rec.value,
|
||||
rec.revision
|
||||
);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
anyhow::bail!(
|
||||
"update rejected: another leader is holding the lock or revision mismatch"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn release(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()> {
|
||||
let rev = self.revision.load(Ordering::Relaxed);
|
||||
sqlx::query(
|
||||
"
|
||||
DELETE FROM noleader_leaders
|
||||
WHERE
|
||||
key = $1
|
||||
AND value = $2
|
||||
AND revision = $3
|
||||
",
|
||||
)
|
||||
.bind(&key.0)
|
||||
.bind(val.0.to_string())
|
||||
.bind(rev as i64) // new revision
|
||||
.execute(&self.db().await?)
|
||||
.await
|
||||
.context("failed to release lock, it will expire naturally")?;
|
||||
|
||||
self.revision.store(0, Ordering::Relaxed);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct GetResult {
|
||||
value: String,
|
||||
revision: i64,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct UpdateResult {
|
||||
value: String,
|
||||
revision: i64,
|
||||
}
|
||||
0
crates/noleader/src/inner.create_bucket
Normal file
0
crates/noleader/src/inner.create_bucket
Normal file
@@ -1,51 +1,91 @@
|
||||
use std::{
|
||||
future::Future,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
atomic::{AtomicBool, Ordering},
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use async_nats::jetstream::kv;
|
||||
use rand::Rng;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::backend::{Backend, Key, LeaderId};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Leader {
|
||||
shutting_down: Arc<AtomicBool>,
|
||||
is_leader: Arc<AtomicBool>,
|
||||
inner: Arc<RwLock<InnerLeader>>,
|
||||
|
||||
cancellation: CancellationToken,
|
||||
}
|
||||
const DEFAULT_INTERVAL: Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
mod backend;
|
||||
|
||||
impl Leader {
|
||||
pub fn new(bucket: &str, key: &str, client: async_nats::Client) -> Self {
|
||||
pub fn new(key: &str, backend: Backend) -> Self {
|
||||
Self {
|
||||
shutting_down: Arc::new(AtomicBool::new(false)),
|
||||
is_leader: Arc::new(AtomicBool::new(false)),
|
||||
inner: Arc::new(RwLock::new(InnerLeader::new(bucket, key, client))),
|
||||
inner: Arc::new(RwLock::new(InnerLeader::new(backend, key))),
|
||||
cancellation: CancellationToken::new(),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "nats")]
|
||||
pub fn new_nats(key: &str, bucket: &str, client: async_nats::Client) -> Self {
|
||||
Self::new(key, Backend::nats(client, bucket))
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
pub fn new_postgres(key: &str, database_url: &str) -> Self {
|
||||
Self::new(key, Backend::postgres(database_url))
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
pub fn new_postgres_pool(key: &str, pool: sqlx::PgPool) -> Self {
|
||||
Self::new(key, Backend::postgres_with_pool(pool))
|
||||
}
|
||||
|
||||
pub fn with_cancellation(&mut self, cancellation: CancellationToken) -> &mut Self {
|
||||
self.cancellation = cancellation;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_cancel_task<T>(&mut self, f: T) -> &mut Self
|
||||
where
|
||||
T: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
let cancel = self.cancellation.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
f.await;
|
||||
|
||||
cancel.cancel();
|
||||
});
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn acquire_and_run<F, Fut>(&self, f: F) -> anyhow::Result<()>
|
||||
where
|
||||
F: Fn(CancellationToken) -> Fut,
|
||||
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
|
||||
{
|
||||
let parent_token = CancellationToken::default();
|
||||
let parent_token = self.cancellation.clone();
|
||||
let s = self.clone();
|
||||
|
||||
let server_token = parent_token.child_token();
|
||||
|
||||
// Start the server election process in another task, this is because start is blocking
|
||||
let handle = tokio::spawn({
|
||||
let server_token = server_token.child_token();
|
||||
async move {
|
||||
match s.start(server_token).await {
|
||||
match s.start().await {
|
||||
Ok(_) => {}
|
||||
Err(e) => tracing::error!("leader election process failed: {}", e),
|
||||
}
|
||||
@@ -65,6 +105,11 @@ impl Leader {
|
||||
server_token.cancel();
|
||||
// Close down the task as well, it should already be stopped, but this forces the task to close
|
||||
handle.abort();
|
||||
|
||||
{
|
||||
self.inner.write().await.cleanup().await?;
|
||||
}
|
||||
|
||||
res?;
|
||||
|
||||
Ok(())
|
||||
@@ -88,12 +133,24 @@ impl Leader {
|
||||
F: Fn(CancellationToken) -> Fut,
|
||||
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
|
||||
{
|
||||
let cancellation_token = cancellation_token.child_token();
|
||||
|
||||
loop {
|
||||
if cancellation_token.is_cancelled() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let cancellation_token = cancellation_token.child_token();
|
||||
|
||||
let is_leader = self.is_leader.clone();
|
||||
if !is_leader.load(Ordering::Relaxed) {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_secs(1)) => {}
|
||||
_ = cancellation_token.cancelled() => {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -101,7 +158,13 @@ impl Leader {
|
||||
|
||||
let guard = tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(std::time::Duration::from_millis(500)) => {}
|
||||
_ = cancellation_token.cancelled() => {
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if !is_leader.load(Ordering::Relaxed) {
|
||||
cancellation_token.cancel();
|
||||
@@ -109,7 +172,9 @@ impl Leader {
|
||||
}
|
||||
});
|
||||
|
||||
tracing::info!("starting leader actions");
|
||||
let res = f(child_token).await;
|
||||
|
||||
guard.abort();
|
||||
res?;
|
||||
}
|
||||
@@ -117,35 +182,30 @@ impl Leader {
|
||||
|
||||
pub async fn leader_id(&self) -> Uuid {
|
||||
let inner = self.inner.read().await;
|
||||
inner.id
|
||||
inner.leader_id.clone().into()
|
||||
}
|
||||
|
||||
pub async fn create_bucket(&self) -> anyhow::Result<()> {
|
||||
let mut inner = self.inner.write().await;
|
||||
tracing::info!("creating bucket leadership bucket");
|
||||
|
||||
inner.create_bucket().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start(&self, cancellation_token: CancellationToken) -> anyhow::Result<()> {
|
||||
pub async fn start(&self) -> anyhow::Result<()> {
|
||||
let mut attempts = 1;
|
||||
|
||||
{
|
||||
self.inner.write().await.backend.setup().await?;
|
||||
}
|
||||
|
||||
// Initial attempt
|
||||
let _ = self.try_become_leader().await;
|
||||
|
||||
loop {
|
||||
let wait_factor = {
|
||||
let mut rng = rand::rng();
|
||||
rng.random_range(0.001..1.000)
|
||||
rng.random_range(0.50..1.00)
|
||||
};
|
||||
|
||||
let sleep_fut = tokio::time::sleep((DEFAULT_INTERVAL * attempts).mul_f64(wait_factor));
|
||||
|
||||
tokio::select! {
|
||||
_ = sleep_fut => {},
|
||||
_ = cancellation_token.cancelled() => {
|
||||
_ = self.cancellation.cancelled() => {
|
||||
self.shutting_down.store(true, std::sync::atomic::Ordering::Relaxed); // Ordering can be relaxed, because our operation is an atomic update
|
||||
return Ok(())
|
||||
}
|
||||
@@ -153,8 +213,7 @@ impl Leader {
|
||||
|
||||
match self.try_become_leader().await {
|
||||
Ok(_) => {
|
||||
self.is_leader
|
||||
.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
self.is_leader.store(true, Ordering::Relaxed);
|
||||
attempts = 1;
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -203,13 +262,10 @@ pub enum Status {
|
||||
struct InnerLeader {
|
||||
state: LeaderState,
|
||||
|
||||
bucket: String,
|
||||
key: String,
|
||||
backend: Backend,
|
||||
|
||||
id: uuid::Uuid,
|
||||
revision: u64,
|
||||
|
||||
client: async_nats::jetstream::Context,
|
||||
key: Key,
|
||||
leader_id: LeaderId,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
@@ -221,49 +277,17 @@ enum LeaderState {
|
||||
}
|
||||
|
||||
impl InnerLeader {
|
||||
pub fn new(bucket: &str, key: &str, client: async_nats::Client) -> Self {
|
||||
pub fn new(backend: Backend, key: impl Into<Key>) -> Self {
|
||||
Self {
|
||||
bucket: bucket.into(),
|
||||
backend,
|
||||
leader_id: LeaderId::new(),
|
||||
|
||||
key: key.into(),
|
||||
|
||||
id: uuid::Uuid::new_v4(),
|
||||
revision: u64::MIN,
|
||||
|
||||
state: LeaderState::Unknown,
|
||||
client: async_nats::jetstream::new(client),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create_bucket(&mut self) -> anyhow::Result<()> {
|
||||
if (self.client.get_key_value(&self.bucket).await).is_ok() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Err(e) = self
|
||||
.client
|
||||
.create_key_value(kv::Config {
|
||||
bucket: self.bucket.clone(),
|
||||
description: "leadership bucket for noleader".into(),
|
||||
limit_markers: Some(std::time::Duration::from_secs(60)),
|
||||
max_age: std::time::Duration::from_secs(60),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
{
|
||||
tracing::info!(
|
||||
"bucket creation failed, it might have just been a conflict, testing again: {e}"
|
||||
);
|
||||
|
||||
if (self.client.get_key_value(&self.bucket).await).is_ok() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
anyhow::bail!("failed to create bucket: {}", e)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// start, will run a blocking operation for becoming the next leader.
|
||||
pub async fn start(&mut self) -> anyhow::Result<()> {
|
||||
// Attempt to grab leadership,
|
||||
@@ -301,61 +325,42 @@ impl InnerLeader {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_leadership(&mut self) -> anyhow::Result<()> {
|
||||
let bucket = self.client.get_key_value(&self.bucket).await?;
|
||||
pub async fn cleanup(&self) -> anyhow::Result<()> {
|
||||
self.backend
|
||||
.release(&self.key, &self.leader_id)
|
||||
.await
|
||||
.context("cleanup")?;
|
||||
|
||||
let Some(val) = bucket.get(&self.key).await? else {
|
||||
anyhow::bail!("key doesn't exists, we've lost leadership status")
|
||||
};
|
||||
|
||||
let Ok(id) = uuid::Uuid::from_slice(&val) else {
|
||||
anyhow::bail!("value has changed, it is no longer a uuid, dropping leadership status");
|
||||
};
|
||||
|
||||
if id != self.id {
|
||||
anyhow::bail!("leadership has changed")
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let rev = bucket
|
||||
.update(
|
||||
&self.key,
|
||||
bytes::Bytes::copy_from_slice(self.id.as_bytes()),
|
||||
self.revision,
|
||||
)
|
||||
.await?;
|
||||
async fn update_leadership(&mut self) -> anyhow::Result<()> {
|
||||
let val = self
|
||||
.backend
|
||||
.get(&self.key)
|
||||
.await
|
||||
.context("could not find key, we've lost leadership status")?;
|
||||
|
||||
self.revision = rev;
|
||||
match val {
|
||||
backend::LeaderValue::Unknown => anyhow::bail!("leadership is unknown"),
|
||||
backend::LeaderValue::Found { id } if id != self.leader_id => {
|
||||
anyhow::bail!("leadership has changed")
|
||||
}
|
||||
backend::LeaderValue::Found { .. } => self
|
||||
.backend
|
||||
.update(&self.key, &self.leader_id)
|
||||
.await
|
||||
.context("update leadership lock")?,
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn try_for_leadership(&mut self) -> anyhow::Result<()> {
|
||||
let bucket = self
|
||||
.client
|
||||
.get_key_value(&self.bucket)
|
||||
self.backend
|
||||
.update(&self.key, &self.leader_id)
|
||||
.await
|
||||
.context("failed to get bucket")?;
|
||||
|
||||
let rev = match bucket
|
||||
.create_with_ttl(
|
||||
&self.key,
|
||||
bytes::Bytes::copy_from_slice(self.id.as_bytes()),
|
||||
std::time::Duration::from_secs(60),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(rev) => rev,
|
||||
Err(e) => match e.kind() {
|
||||
kv::CreateErrorKind::AlreadyExists => {
|
||||
anyhow::bail!("another candidate has leadership status")
|
||||
}
|
||||
_ => {
|
||||
anyhow::bail!("{}", e);
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
self.revision = rev;
|
||||
.context("try for leadership")?;
|
||||
|
||||
tokio::time::sleep(DEFAULT_INTERVAL).await;
|
||||
|
||||
@@ -363,7 +368,7 @@ impl InnerLeader {
|
||||
|
||||
let leadership_state = self.leadership_status().await?;
|
||||
|
||||
if !leadership_state.is_leader(&self.id) {
|
||||
if !leadership_state.is_leader(&self.leader_id) {
|
||||
anyhow::bail!("failed to become leader, there is likely some churn going on");
|
||||
}
|
||||
|
||||
@@ -374,25 +379,16 @@ impl InnerLeader {
|
||||
}
|
||||
|
||||
async fn leadership_status(&mut self) -> anyhow::Result<LeadershipState> {
|
||||
let bucket = self.client.get_key_value(&self.bucket).await?;
|
||||
|
||||
let val = bucket.get(&self.key).await?;
|
||||
let val = self
|
||||
.backend
|
||||
.get(&self.key)
|
||||
.await
|
||||
.inspect_err(|e| tracing::warn!("failed to query for leadership: {}", e))
|
||||
.ok();
|
||||
|
||||
Ok(match val {
|
||||
Some(content) => {
|
||||
let id = match uuid::Uuid::from_slice(&content) {
|
||||
Ok(u) => u,
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"leadership state is not a valid UUID, ignoring the value: {}",
|
||||
e
|
||||
);
|
||||
return Ok(LeadershipState::NotFound);
|
||||
}
|
||||
};
|
||||
|
||||
LeadershipState::Allocated { id }
|
||||
}
|
||||
Some(backend::LeaderValue::Found { id }) => LeadershipState::Allocated { id },
|
||||
Some(backend::LeaderValue::Unknown) => LeadershipState::NotFound,
|
||||
None => LeadershipState::NotFound,
|
||||
})
|
||||
}
|
||||
@@ -400,11 +396,11 @@ impl InnerLeader {
|
||||
|
||||
enum LeadershipState {
|
||||
NotFound,
|
||||
Allocated { id: uuid::Uuid },
|
||||
Allocated { id: LeaderId },
|
||||
}
|
||||
|
||||
impl LeadershipState {
|
||||
pub fn is_leader(&self, leader_id: &Uuid) -> bool {
|
||||
pub fn is_leader(&self, leader_id: &LeaderId) -> bool {
|
||||
match self {
|
||||
LeadershipState::Allocated { id } => id == leader_id,
|
||||
_ => false,
|
||||
|
||||
@@ -5,6 +5,8 @@ base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
|
||||
vars:
|
||||
service: "noleader"
|
||||
registry: kasperhermansen
|
||||
rust:
|
||||
publish: {}
|
||||
|
||||
please:
|
||||
project:
|
||||
|
||||
20
mise.toml
Normal file
20
mise.toml
Normal file
@@ -0,0 +1,20 @@
|
||||
[env]
|
||||
_.file = ".env"
|
||||
|
||||
|
||||
[tasks.test]
|
||||
alias = ["t"]
|
||||
run = "cargo nextest run"
|
||||
|
||||
[tasks.example]
|
||||
alias = ["e"]
|
||||
run = "cargo run --example"
|
||||
|
||||
[tasks."local:up"]
|
||||
run = "docker compose -f ./templates/docker/docker-compose.yml up -d"
|
||||
|
||||
[tasks."local:down"]
|
||||
run = "docker compose -f ./templates/docker/docker-compose.yml down -v"
|
||||
|
||||
[tasks."db:prepare"]
|
||||
run = "cargo sqlx prepare --workspace"
|
||||
3
renovate.json
Normal file
3
renovate.json
Normal file
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
|
||||
}
|
||||
@@ -7,3 +7,18 @@ services:
|
||||
- "4222:4222" # Client connections
|
||||
- "8222:8222" # HTTP monitoring
|
||||
- "6222:6222" # Clustering
|
||||
|
||||
postgres:
|
||||
image: postgres:17-alpine
|
||||
environment:
|
||||
POSTGRES_USER: devuser
|
||||
POSTGRES_PASSWORD: devpassword
|
||||
POSTGRES_DB: dev
|
||||
shm_size: 128mb
|
||||
ports:
|
||||
- "5432:5432"
|
||||
healthcheck:
|
||||
test: ["CMD-SHELL", "pg_isready -U devuser -d dev"]
|
||||
interval: 5s
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
|
||||
Reference in New Issue
Block a user