7 Commits

Author SHA1 Message Date
cuddle-please
94ecef8aeb chore(release): 0.1.3
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2025-09-24 11:32:01 +00:00
8b27219af5 feat: extract backend
All checks were successful
continuous-integration/drone/push Build is passing
2025-09-24 13:29:46 +02:00
c334dba445 feat: do publish
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-06 20:56:42 +02:00
aaf3a72d3b feat: allow readme
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-04 13:28:28 +02:00
643d87895b chore(release): v0.1.2 (#2)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.1.2

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: https://git.front.kjuulh.io/kjuulh/noleader/pulls/2
2025-07-04 13:25:21 +02:00
21c1507ebe docs: update master
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-04 13:24:24 +02:00
4bdb39c39d feat: update basic example with a more simple acquire and run function
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-04 13:22:04 +02:00
12 changed files with 423 additions and 202 deletions

View File

@@ -6,6 +6,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.1.3] - 2025-09-24
### Added
- extract backend
- do publish
- allow readme
## [0.1.2] - 2025-07-04
### Added
- update basic example with a more simple acquire and run function
### Docs
- update master
## [0.1.1] - 2025-07-04 ## [0.1.1] - 2025-07-04
### Added ### Added

14
Cargo.lock generated
View File

@@ -68,6 +68,17 @@ dependencies = [
"url", "url",
] ]
[[package]]
name = "async-trait"
version = "0.1.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.5.0" version = "1.5.0"
@@ -645,10 +656,11 @@ dependencies = [
[[package]] [[package]]
name = "noleader" name = "noleader"
version = "0.1.0" version = "0.1.2"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-nats", "async-nats",
"async-trait",
"bytes", "bytes",
"rand 0.9.1", "rand 0.9.1",
"tokio", "tokio",

View File

@@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2" resolver = "2"
[workspace.package] [workspace.package]
version = "0.1.1" version = "0.1.3"
license = "MIT" license = "MIT"
[workspace.dependencies] [workspace.dependencies]

View File

@@ -22,9 +22,9 @@ This library is still young and the API is subject to change.
## Intended use-case ## Intended use-case
Noleader is not built for distributed consensus, or fast re-election produces. It take upwards to a minute to get reelected, state is the users responsibility to handle. Noleader is not built for distributed consensus, or fast re-election procedures. It take upwards to a minute to get re-elected, state is the users responsibility to handle.
Noleader is pretty much just a distributed lock, intended for use-cases where the use wants to only have a single node scheduling work etc. Noleader is pretty much just a distributed lock, intended for use-cases where the user wants to only have a single node scheduling work etc.
Good alternatives are: Good alternatives are:
@@ -71,58 +71,40 @@ async fn main() -> anyhow::Result<()> {
// Ensure the KV bucket exists // Ensure the KV bucket exists
leader.create_bucket().await?; leader.create_bucket().await?;
// Spawn the election loop // Attempts to acquire election loop, will call inner function if it wins, if it loses it will retry over again.
tokio::spawn({ // Will block until either the inner function returns and error, or the election processes crashes, intended to allow the application to properly restart
let leader = leader.clone();
async move {
leader
.start(CancellationToken::default())
.await
.expect("leadership loop failed");
}
});
// Do work while we are the leader
leader leader
.do_while_leader(|cancel_token| async move { .acquire_and_run({
loop { move |token| {
if cancel_token.is_cancelled() { let leader_id = leader_id.clone();
break;
async move {
loop {
if token.is_cancelled() {
return Ok(());
}
tracing::info!(leader_id, "do work as leader");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
} }
tracing::info!("🔑 I am the leader—doing work");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
} }
Ok(())
}) })
.await?; .await?;
Ok(()) Ok(())
} }
``` ```
## API Overview ## Examples
* **`Leader::new(bucket: &str, key: &str, client: async_nats::Client) -> Leader`** See the examples folder in ./crates/noleader/examples
Create a new election participant.
* **`create_bucket(&self) -> anyhow::Result<()>`**
Ensures the KV bucket exists (no-op if already created).
* **`start(&self, token: CancellationToken) -> anyhow::Result<()>`**
Begins the background leader-election loop; renews TTL on success or retries on failure.
* **`do_while_leader<F, Fut>(&self, f: F) -> anyhow::Result<()>`**
Runs your closure as long as you hold leadership; cancels immediately on loss.
* **`leader_id(&self) -> Uuid`**
Returns your unique candidate ID.
* **`is_leader(&self) -> Status`**
Returns `Status::Leader` or `Status::Candidate`, taking shutdown into account.
### Types ## Architecture
Noleader uses a simple election stealing
```rust
pub enum Status {
Leader,
Candidate,
}
```
## License ## License

View File

@@ -1,18 +1,24 @@
[package] [package]
name = "noleader" name = "noleader"
version = "0.1.0" edition = "2024"
edition = "2021" readme = "../../README.md"
version.workspace = true
license.workspace = true
repository = "https://git.front.kjuulh.io/kjuulh/noleader"
authors = ["kjuulh <contact@kasperhermansen.com>"]
description = "A small leader election package using NATS keyvalue store as the distributed locking mechanism. Does not require a min / max set of nodes"
[dependencies] [dependencies]
anyhow.workspace = true anyhow.workspace = true
tracing.workspace = true tracing.workspace = true
async-nats = "0.42" async-nats = "0.42"
uuid = { version = "1", features = ["v4"] } uuid = { version = "1", features = ["v4", "v7"] }
bytes = "1" bytes = "1"
tokio.workspace = true tokio.workspace = true
tokio-util = "0.7" tokio-util = "0.7"
rand = "0.9.1" rand = "0.9.1"
async-trait = "0.1.89"
[dev-dependencies] [dev-dependencies]
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@@ -1,4 +1,3 @@
use tokio_util::sync::CancellationToken;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
#[tokio::main] #[tokio::main]
@@ -14,37 +13,27 @@ async fn main() -> anyhow::Result<()> {
.init(); .init();
let mybucket = "mytestbucket"; let mybucket = "mytestbucket";
let mykey = "myleaderkey"; let mykey = "basic";
let client = async_nats::connect("localhost:4222").await?; let client = async_nats::connect("localhost:4222").await?;
let leader = noleader::Leader::new(mybucket, mykey, client); let leader = noleader::Leader::new_nats(mykey, mybucket, client);
let leader_id = leader.leader_id().await.to_string(); let leader_id = leader.leader_id().await.to_string();
tracing::info!("creating bucket");
leader.create_bucket().await?;
tokio::spawn({
let leader = leader.clone();
let leader_id = leader_id.clone();
async move {
tracing::debug!(leader_id, "starting leader");
leader
.start(CancellationToken::default())
.await
.expect("to succeed");
}
});
leader leader
.do_while_leader(move |token| async move { .acquire_and_run({
loop { move |token| {
if token.is_cancelled() { let leader_id = leader_id.clone();
return Ok(());
}
tracing::info!("do work as leader"); async move {
tokio::time::sleep(std::time::Duration::from_secs(1)).await; loop {
if token.is_cancelled() {
return Ok(());
}
tracing::info!(leader_id, "do work as leader");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
} }
}) })
.await?; .await?;

View File

@@ -23,12 +23,9 @@ async fn main() -> anyhow::Result<()> {
let client = client.clone(); let client = client.clone();
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
let leader = noleader::Leader::new(mybucket, mykey, client); let leader = noleader::Leader::new_nats(mykey, mybucket, client);
let leader_id = leader.leader_id().await.to_string(); let leader_id = leader.leader_id().await.to_string();
tracing::info!("creating bucket");
leader.create_bucket().await?;
tokio::spawn({ tokio::spawn({
let leader = leader.clone(); let leader = leader.clone();
let leader_id = leader_id.clone(); let leader_id = leader_id.clone();

View File

@@ -0,0 +1,98 @@
use std::{ops::Deref, sync::Arc};
use crate::backend::nats::NatsBackend;
mod nats;
pub struct Backend {
inner: Arc<dyn BackendEdge + Send + Sync + 'static>,
}
impl Backend {
pub fn new(edge: impl BackendEdge + Send + Sync + 'static) -> Self {
Self {
inner: Arc::new(edge),
}
}
pub fn nats(client: async_nats::Client, bucket: &str) -> Self {
Self {
inner: Arc::new(NatsBackend::new(client, bucket)),
}
}
}
impl Deref for Backend {
type Target = Arc<dyn BackendEdge + Send + Sync + 'static>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
#[async_trait::async_trait]
pub trait BackendEdge {
async fn setup(&self) -> anyhow::Result<()>;
async fn get(&self, key: &Key) -> anyhow::Result<LeaderValue>;
async fn update(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()>;
}
pub enum LeaderValue {
Unknown,
Found { id: LeaderId },
}
pub struct Key(String);
impl From<String> for Key {
fn from(value: String) -> Self {
Self(value)
}
}
impl From<&str> for Key {
fn from(value: &str) -> Self {
Self(value.to_string())
}
}
impl From<Key> for String {
fn from(value: Key) -> Self {
value.0
}
}
impl From<&Key> for String {
fn from(value: &Key) -> Self {
value.0.clone()
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LeaderId(uuid::Uuid);
impl LeaderId {
pub(crate) fn new() -> Self {
Self(uuid::Uuid::now_v7())
}
}
impl From<LeaderId> for uuid::Uuid {
fn from(value: LeaderId) -> Self {
value.0
}
}
impl From<&LeaderId> for uuid::Uuid {
fn from(value: &LeaderId) -> Self {
value.0
}
}
impl From<uuid::Uuid> for LeaderId {
fn from(value: uuid::Uuid) -> Self {
Self(value)
}
}
impl LeaderId {
pub const fn as_bytes(&self) -> &[u8] {
self.0.as_bytes()
}
}

View File

@@ -0,0 +1,125 @@
use std::sync::atomic::{AtomicU64, Ordering};
use anyhow::Context;
use async_nats::jetstream::{self, kv};
use crate::backend::{BackendEdge, Key, LeaderId, LeaderValue};
pub struct NatsBackend {
bucket: String,
client: jetstream::Context,
revision: AtomicU64,
}
impl NatsBackend {
pub fn new(client: async_nats::Client, bucket: &str) -> Self {
Self {
bucket: bucket.into(),
client: jetstream::new(client),
revision: AtomicU64::new(0),
}
}
pub async fn create_bucket(&self) -> anyhow::Result<()> {
if (self.client.get_key_value(&self.bucket).await).is_ok() {
return Ok(());
}
if let Err(e) = self
.client
.create_key_value(kv::Config {
bucket: self.bucket.clone(),
description: "leadership bucket for noleader".into(),
limit_markers: Some(std::time::Duration::from_secs(60)),
max_age: std::time::Duration::from_secs(60),
..Default::default()
})
.await
{
tracing::info!(
"bucket creation failed, it might have just been a conflict, testing again: {e}"
);
if (self.client.get_key_value(&self.bucket).await).is_ok() {
return Ok(());
}
anyhow::bail!("failed to create bucket: {}", e)
}
Ok(())
}
}
#[async_trait::async_trait]
impl BackendEdge for NatsBackend {
async fn setup(&self) -> anyhow::Result<()> {
self.create_bucket().await?;
Ok(())
}
async fn get(&self, key: &Key) -> anyhow::Result<LeaderValue> {
let bucket = self.client.get_key_value(&self.bucket).await?;
let Some(val) = bucket.get(key).await? else {
anyhow::bail!("key doesn't exists, we've lost leadership status")
};
let Ok(id) = uuid::Uuid::from_slice(&val) else {
return Ok(LeaderValue::Unknown);
};
Ok(LeaderValue::Found { id: id.into() })
}
async fn update(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()> {
let bucket = self
.client
.get_key_value(&self.bucket)
.await
.context("get bucket")?;
match bucket
.update(
&key.0,
bytes::Bytes::copy_from_slice(val.as_bytes()),
self.revision.load(Ordering::Relaxed),
)
.await
{
Ok(rev) => {
self.revision.store(rev, Ordering::Relaxed);
}
Err(e) => match e.kind() {
kv::UpdateErrorKind::WrongLastRevision => {
tracing::trace!("creating nats entry");
match bucket
.create_with_ttl(
&key.0,
bytes::Bytes::copy_from_slice(val.as_bytes()),
std::time::Duration::from_secs(60),
)
.await
{
Ok(rev) => {
self.revision.store(rev, Ordering::Relaxed);
}
Err(e) => match e.kind() {
kv::CreateErrorKind::AlreadyExists => {
anyhow::bail!("another candidate has leadership status")
}
_ => {
anyhow::bail!("{}", e);
}
},
}
}
_ => {
anyhow::bail!("failed to create bucket: {e}")
}
},
}
Ok(())
}
}

View File

View File

@@ -1,19 +1,20 @@
use std::{ use std::{
future::Future, future::Future,
sync::{ sync::{
atomic::{AtomicBool, Ordering},
Arc, Arc,
atomic::{AtomicBool, Ordering},
}, },
time::Duration, time::Duration,
}; };
use anyhow::Context; use anyhow::Context;
use async_nats::jetstream::kv;
use rand::Rng; use rand::Rng;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use uuid::Uuid; use uuid::Uuid;
use crate::backend::{Backend, Key, LeaderId};
#[derive(Clone)] #[derive(Clone)]
pub struct Leader { pub struct Leader {
shutting_down: Arc<AtomicBool>, shutting_down: Arc<AtomicBool>,
@@ -22,33 +23,98 @@ pub struct Leader {
} }
const DEFAULT_INTERVAL: Duration = std::time::Duration::from_secs(10); const DEFAULT_INTERVAL: Duration = std::time::Duration::from_secs(10);
mod backend;
impl Leader { impl Leader {
pub fn new(bucket: &str, key: &str, client: async_nats::Client) -> Self { pub fn new(key: &str, backend: Backend) -> Self {
Self { Self {
shutting_down: Arc::new(AtomicBool::new(false)), shutting_down: Arc::new(AtomicBool::new(false)),
is_leader: Arc::new(AtomicBool::new(false)), is_leader: Arc::new(AtomicBool::new(false)),
inner: Arc::new(RwLock::new(InnerLeader::new(bucket, key, client))), inner: Arc::new(RwLock::new(InnerLeader::new(backend, key))),
} }
} }
pub fn new_nats(key: &str, bucket: &str, client: async_nats::Client) -> Self {
Self::new(key, Backend::nats(client, bucket))
}
pub async fn acquire_and_run<F, Fut>(&self, f: F) -> anyhow::Result<()>
where
F: Fn(CancellationToken) -> Fut,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{
let parent_token = CancellationToken::default();
let s = self.clone();
let server_token = parent_token.child_token();
// Start the server election process in another task, this is because start is blocking
let handle = tokio::spawn({
let server_token = server_token.child_token();
async move {
match s.start(server_token).await {
Ok(_) => {}
Err(e) => tracing::error!("leader election process failed: {}", e),
}
tracing::info!("shutting down noleader");
parent_token.cancel();
}
});
// Do the work if we're leader
let res = self
.do_while_leader_inner(server_token.child_token(), f)
.await;
// Stop the server election process if our provided functions returns an error;
server_token.cancel();
// Close down the task as well, it should already be stopped, but this forces the task to close
handle.abort();
res?;
Ok(())
}
pub async fn do_while_leader<F, Fut>(&self, f: F) -> anyhow::Result<()> pub async fn do_while_leader<F, Fut>(&self, f: F) -> anyhow::Result<()>
where
F: Fn(CancellationToken) -> Fut,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{
self.do_while_leader_inner(CancellationToken::new(), f)
.await
}
async fn do_while_leader_inner<F, Fut>(
&self,
cancellation_token: CancellationToken,
f: F,
) -> anyhow::Result<()>
where where
F: Fn(CancellationToken) -> Fut, F: Fn(CancellationToken) -> Fut,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static, Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{ {
loop { loop {
let cancellation_token = cancellation_token.child_token();
let is_leader = self.is_leader.clone(); let is_leader = self.is_leader.clone();
if !is_leader.load(Ordering::Relaxed) { if !is_leader.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;
continue; continue;
} }
let cancellation_token = CancellationToken::new();
let child_token = cancellation_token.child_token(); let child_token = cancellation_token.child_token();
let guard = tokio::spawn(async move { let guard = tokio::spawn(async move {
loop { loop {
tokio::time::sleep(std::time::Duration::from_millis(100)).await; tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_millis(500)) => {}
_ = cancellation_token.cancelled() => {
break;
}
}
if !is_leader.load(Ordering::Relaxed) { if !is_leader.load(Ordering::Relaxed) {
cancellation_token.cancel(); cancellation_token.cancel();
@@ -64,28 +130,23 @@ impl Leader {
pub async fn leader_id(&self) -> Uuid { pub async fn leader_id(&self) -> Uuid {
let inner = self.inner.read().await; let inner = self.inner.read().await;
inner.id inner.leader_id.clone().into()
}
pub async fn create_bucket(&self) -> anyhow::Result<()> {
let mut inner = self.inner.write().await;
tracing::info!("creating bucket leadership bucket");
inner.create_bucket().await?;
Ok(())
} }
pub async fn start(&self, cancellation_token: CancellationToken) -> anyhow::Result<()> { pub async fn start(&self, cancellation_token: CancellationToken) -> anyhow::Result<()> {
let mut attempts = 1; let mut attempts = 1;
{
self.inner.write().await.backend.setup().await?;
}
// Initial attempt // Initial attempt
let _ = self.try_become_leader().await; let _ = self.try_become_leader().await;
loop { loop {
let wait_factor = { let wait_factor = {
let mut rng = rand::rng(); let mut rng = rand::rng();
rng.random_range(0.001..1.000) rng.random_range(0.50..1.00)
}; };
let sleep_fut = tokio::time::sleep((DEFAULT_INTERVAL * attempts).mul_f64(wait_factor)); let sleep_fut = tokio::time::sleep((DEFAULT_INTERVAL * attempts).mul_f64(wait_factor));
@@ -100,8 +161,7 @@ impl Leader {
match self.try_become_leader().await { match self.try_become_leader().await {
Ok(_) => { Ok(_) => {
self.is_leader self.is_leader.store(true, Ordering::Relaxed);
.store(true, std::sync::atomic::Ordering::Relaxed);
attempts = 1; attempts = 1;
} }
Err(e) => { Err(e) => {
@@ -150,13 +210,11 @@ pub enum Status {
struct InnerLeader { struct InnerLeader {
state: LeaderState, state: LeaderState,
bucket: String, backend: Backend,
key: String,
id: uuid::Uuid, key: Key,
leader_id: LeaderId,
revision: u64, revision: u64,
client: async_nats::jetstream::Context,
} }
#[derive(Default, Clone)] #[derive(Default, Clone)]
@@ -168,49 +226,18 @@ enum LeaderState {
} }
impl InnerLeader { impl InnerLeader {
pub fn new(bucket: &str, key: &str, client: async_nats::Client) -> Self { pub fn new(backend: Backend, key: impl Into<Key>) -> Self {
Self { Self {
bucket: bucket.into(), backend,
key: key.into(), leader_id: LeaderId::new(),
id: uuid::Uuid::new_v4(),
revision: u64::MIN, revision: u64::MIN,
key: key.into(),
state: LeaderState::Unknown, state: LeaderState::Unknown,
client: async_nats::jetstream::new(client),
} }
} }
pub async fn create_bucket(&mut self) -> anyhow::Result<()> {
if (self.client.get_key_value(&self.bucket).await).is_ok() {
return Ok(());
}
if let Err(e) = self
.client
.create_key_value(kv::Config {
bucket: self.bucket.clone(),
description: "leadership bucket for noleader".into(),
limit_markers: Some(std::time::Duration::from_secs(60)),
max_age: std::time::Duration::from_secs(60),
..Default::default()
})
.await
{
tracing::info!(
"bucket creation failed, it might have just been a conflict, testing again: {e}"
);
if (self.client.get_key_value(&self.bucket).await).is_ok() {
return Ok(());
}
anyhow::bail!("failed to create bucket: {}", e)
}
Ok(())
}
/// start, will run a blocking operation for becoming the next leader. /// start, will run a blocking operation for becoming the next leader.
pub async fn start(&mut self) -> anyhow::Result<()> { pub async fn start(&mut self) -> anyhow::Result<()> {
// Attempt to grab leadership, // Attempt to grab leadership,
@@ -249,60 +276,32 @@ impl InnerLeader {
} }
async fn update_leadership(&mut self) -> anyhow::Result<()> { async fn update_leadership(&mut self) -> anyhow::Result<()> {
let bucket = self.client.get_key_value(&self.bucket).await?; let val = self
.backend
.get(&self.key)
.await
.context("could not find key, we've lost leadership status")?;
let Some(val) = bucket.get(&self.key).await? else { match val {
anyhow::bail!("key doesn't exists, we've lost leadership status") backend::LeaderValue::Unknown => anyhow::bail!("leadership is unknown"),
}; backend::LeaderValue::Found { id } if id != self.leader_id => {
anyhow::bail!("leadership has changed")
let Ok(id) = uuid::Uuid::from_slice(&val) else { }
anyhow::bail!("value has changed, it is no longer a uuid, dropping leadership status"); backend::LeaderValue::Found { .. } => self
}; .backend
.update(&self.key, &self.leader_id)
if id != self.id { .await
anyhow::bail!("leadership has changed") .context("update leadership lock")?,
} }
let rev = bucket
.update(
&self.key,
bytes::Bytes::copy_from_slice(self.id.as_bytes()),
self.revision,
)
.await?;
self.revision = rev;
Ok(()) Ok(())
} }
async fn try_for_leadership(&mut self) -> anyhow::Result<()> { async fn try_for_leadership(&mut self) -> anyhow::Result<()> {
let bucket = self self.backend
.client .update(&self.key, &self.leader_id)
.get_key_value(&self.bucket)
.await .await
.context("failed to get bucket")?; .context("try for leadership")?;
let rev = match bucket
.create_with_ttl(
&self.key,
bytes::Bytes::copy_from_slice(self.id.as_bytes()),
std::time::Duration::from_secs(60),
)
.await
{
Ok(rev) => rev,
Err(e) => match e.kind() {
kv::CreateErrorKind::AlreadyExists => {
anyhow::bail!("another candidate has leadership status")
}
_ => {
anyhow::bail!("{}", e);
}
},
};
self.revision = rev;
tokio::time::sleep(DEFAULT_INTERVAL).await; tokio::time::sleep(DEFAULT_INTERVAL).await;
@@ -310,7 +309,7 @@ impl InnerLeader {
let leadership_state = self.leadership_status().await?; let leadership_state = self.leadership_status().await?;
if !leadership_state.is_leader(&self.id) { if !leadership_state.is_leader(&self.leader_id) {
anyhow::bail!("failed to become leader, there is likely some churn going on"); anyhow::bail!("failed to become leader, there is likely some churn going on");
} }
@@ -321,25 +320,16 @@ impl InnerLeader {
} }
async fn leadership_status(&mut self) -> anyhow::Result<LeadershipState> { async fn leadership_status(&mut self) -> anyhow::Result<LeadershipState> {
let bucket = self.client.get_key_value(&self.bucket).await?; let val = self
.backend
let val = bucket.get(&self.key).await?; .get(&self.key)
.await
.inspect_err(|e| tracing::warn!("failed to query for leadership: {}", e))
.ok();
Ok(match val { Ok(match val {
Some(content) => { Some(backend::LeaderValue::Found { id }) => LeadershipState::Allocated { id },
let id = match uuid::Uuid::from_slice(&content) { Some(backend::LeaderValue::Unknown) => LeadershipState::NotFound,
Ok(u) => u,
Err(e) => {
tracing::warn!(
"leadership state is not a valid UUID, ignoring the value: {}",
e
);
return Ok(LeadershipState::NotFound);
}
};
LeadershipState::Allocated { id }
}
None => LeadershipState::NotFound, None => LeadershipState::NotFound,
}) })
} }
@@ -347,11 +337,11 @@ impl InnerLeader {
enum LeadershipState { enum LeadershipState {
NotFound, NotFound,
Allocated { id: uuid::Uuid }, Allocated { id: LeaderId },
} }
impl LeadershipState { impl LeadershipState {
pub fn is_leader(&self, leader_id: &Uuid) -> bool { pub fn is_leader(&self, leader_id: &LeaderId) -> bool {
match self { match self {
LeadershipState::Allocated { id } => id == leader_id, LeadershipState::Allocated { id } => id == leader_id,
_ => false, _ => false,

7
mise.toml Normal file
View File

@@ -0,0 +1,7 @@
[tasks.test]
alias = ["t"]
run = "cargo nextest run"
[tasks.example]
alias = ["e"]
run = "cargo run --example"