diff --git a/Cargo.lock b/Cargo.lock index 3647968..b2739b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,6 +68,17 @@ dependencies = [ "url", ] +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -649,6 +660,7 @@ version = "0.1.2" dependencies = [ "anyhow", "async-nats", + "async-trait", "bytes", "rand 0.9.1", "tokio", diff --git a/crates/noleader/Cargo.toml b/crates/noleader/Cargo.toml index 1f98624..721d25e 100644 --- a/crates/noleader/Cargo.toml +++ b/crates/noleader/Cargo.toml @@ -13,11 +13,12 @@ anyhow.workspace = true tracing.workspace = true async-nats = "0.42" -uuid = { version = "1", features = ["v4"] } +uuid = { version = "1", features = ["v4", "v7"] } bytes = "1" tokio.workspace = true tokio-util = "0.7" rand = "0.9.1" +async-trait = "0.1.89" [dev-dependencies] tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/crates/noleader/examples/basic/main.rs b/crates/noleader/examples/basic/main.rs index 732c1fd..d9bfb59 100644 --- a/crates/noleader/examples/basic/main.rs +++ b/crates/noleader/examples/basic/main.rs @@ -1,4 +1,3 @@ -use tokio_util::sync::CancellationToken; use tracing_subscriber::EnvFilter; #[tokio::main] @@ -14,15 +13,12 @@ async fn main() -> anyhow::Result<()> { .init(); let mybucket = "mytestbucket"; - let mykey = "myleaderkey"; + let mykey = "basic"; let client = async_nats::connect("localhost:4222").await?; - let leader = noleader::Leader::new(mybucket, mykey, client); + let leader = noleader::Leader::new_nats(mykey, mybucket, client); let leader_id = leader.leader_id().await.to_string(); - tracing::info!("creating bucket"); - leader.create_bucket().await?; - leader .acquire_and_run({ move |token| { diff --git a/crates/noleader/examples/lots_of_candidates/main.rs b/crates/noleader/examples/lots_of_candidates/main.rs index 95f3feb..4ee0711 100644 --- a/crates/noleader/examples/lots_of_candidates/main.rs +++ b/crates/noleader/examples/lots_of_candidates/main.rs @@ -23,12 +23,9 @@ async fn main() -> anyhow::Result<()> { let client = client.clone(); let handle = tokio::spawn(async move { - let leader = noleader::Leader::new(mybucket, mykey, client); + let leader = noleader::Leader::new_nats(mykey, mybucket, client); let leader_id = leader.leader_id().await.to_string(); - tracing::info!("creating bucket"); - leader.create_bucket().await?; - tokio::spawn({ let leader = leader.clone(); let leader_id = leader_id.clone(); diff --git a/crates/noleader/src/backend.rs b/crates/noleader/src/backend.rs new file mode 100644 index 0000000..17bb8ec --- /dev/null +++ b/crates/noleader/src/backend.rs @@ -0,0 +1,98 @@ +use std::{ops::Deref, sync::Arc}; + +use crate::backend::nats::NatsBackend; + +mod nats; + +pub struct Backend { + inner: Arc, +} + +impl Backend { + pub fn new(edge: impl BackendEdge + Send + Sync + 'static) -> Self { + Self { + inner: Arc::new(edge), + } + } + + pub fn nats(client: async_nats::Client, bucket: &str) -> Self { + Self { + inner: Arc::new(NatsBackend::new(client, bucket)), + } + } +} + +impl Deref for Backend { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +#[async_trait::async_trait] +pub trait BackendEdge { + async fn setup(&self) -> anyhow::Result<()>; + async fn get(&self, key: &Key) -> anyhow::Result; + async fn update(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()>; +} + +pub enum LeaderValue { + Unknown, + Found { id: LeaderId }, +} + +pub struct Key(String); + +impl From for Key { + fn from(value: String) -> Self { + Self(value) + } +} +impl From<&str> for Key { + fn from(value: &str) -> Self { + Self(value.to_string()) + } +} + +impl From for String { + fn from(value: Key) -> Self { + value.0 + } +} +impl From<&Key> for String { + fn from(value: &Key) -> Self { + value.0.clone() + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct LeaderId(uuid::Uuid); +impl LeaderId { + pub(crate) fn new() -> Self { + Self(uuid::Uuid::now_v7()) + } +} + +impl From for uuid::Uuid { + fn from(value: LeaderId) -> Self { + value.0 + } +} +impl From<&LeaderId> for uuid::Uuid { + fn from(value: &LeaderId) -> Self { + value.0 + } +} + +impl From for LeaderId { + fn from(value: uuid::Uuid) -> Self { + Self(value) + } +} + +impl LeaderId { + pub const fn as_bytes(&self) -> &[u8] { + self.0.as_bytes() + } +} diff --git a/crates/noleader/src/backend/nats.rs b/crates/noleader/src/backend/nats.rs new file mode 100644 index 0000000..56b6bf9 --- /dev/null +++ b/crates/noleader/src/backend/nats.rs @@ -0,0 +1,125 @@ +use std::sync::atomic::{AtomicU64, Ordering}; + +use anyhow::Context; +use async_nats::jetstream::{self, kv}; + +use crate::backend::{BackendEdge, Key, LeaderId, LeaderValue}; + +pub struct NatsBackend { + bucket: String, + client: jetstream::Context, + + revision: AtomicU64, +} + +impl NatsBackend { + pub fn new(client: async_nats::Client, bucket: &str) -> Self { + Self { + bucket: bucket.into(), + client: jetstream::new(client), + revision: AtomicU64::new(0), + } + } + + pub async fn create_bucket(&self) -> anyhow::Result<()> { + if (self.client.get_key_value(&self.bucket).await).is_ok() { + return Ok(()); + } + + if let Err(e) = self + .client + .create_key_value(kv::Config { + bucket: self.bucket.clone(), + description: "leadership bucket for noleader".into(), + limit_markers: Some(std::time::Duration::from_secs(60)), + max_age: std::time::Duration::from_secs(60), + ..Default::default() + }) + .await + { + tracing::info!( + "bucket creation failed, it might have just been a conflict, testing again: {e}" + ); + + if (self.client.get_key_value(&self.bucket).await).is_ok() { + return Ok(()); + } + + anyhow::bail!("failed to create bucket: {}", e) + } + + Ok(()) + } +} + +#[async_trait::async_trait] +impl BackendEdge for NatsBackend { + async fn setup(&self) -> anyhow::Result<()> { + self.create_bucket().await?; + + Ok(()) + } + async fn get(&self, key: &Key) -> anyhow::Result { + let bucket = self.client.get_key_value(&self.bucket).await?; + + let Some(val) = bucket.get(key).await? else { + anyhow::bail!("key doesn't exists, we've lost leadership status") + }; + + let Ok(id) = uuid::Uuid::from_slice(&val) else { + return Ok(LeaderValue::Unknown); + }; + + Ok(LeaderValue::Found { id: id.into() }) + } + async fn update(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()> { + let bucket = self + .client + .get_key_value(&self.bucket) + .await + .context("get bucket")?; + + match bucket + .update( + &key.0, + bytes::Bytes::copy_from_slice(val.as_bytes()), + self.revision.load(Ordering::Relaxed), + ) + .await + { + Ok(rev) => { + self.revision.store(rev, Ordering::Relaxed); + } + Err(e) => match e.kind() { + kv::UpdateErrorKind::WrongLastRevision => { + tracing::trace!("creating nats entry"); + match bucket + .create_with_ttl( + &key.0, + bytes::Bytes::copy_from_slice(val.as_bytes()), + std::time::Duration::from_secs(60), + ) + .await + { + Ok(rev) => { + self.revision.store(rev, Ordering::Relaxed); + } + Err(e) => match e.kind() { + kv::CreateErrorKind::AlreadyExists => { + anyhow::bail!("another candidate has leadership status") + } + _ => { + anyhow::bail!("{}", e); + } + }, + } + } + _ => { + anyhow::bail!("failed to create bucket: {e}") + } + }, + } + + Ok(()) + } +} diff --git a/crates/noleader/src/inner.create_bucket b/crates/noleader/src/inner.create_bucket new file mode 100644 index 0000000..e69de29 diff --git a/crates/noleader/src/lib.rs b/crates/noleader/src/lib.rs index 296c87e..09f2452 100644 --- a/crates/noleader/src/lib.rs +++ b/crates/noleader/src/lib.rs @@ -1,19 +1,20 @@ use std::{ future::Future, sync::{ - atomic::{AtomicBool, Ordering}, Arc, + atomic::{AtomicBool, Ordering}, }, time::Duration, }; use anyhow::Context; -use async_nats::jetstream::kv; use rand::Rng; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; use uuid::Uuid; +use crate::backend::{Backend, Key, LeaderId}; + #[derive(Clone)] pub struct Leader { shutting_down: Arc, @@ -22,15 +23,21 @@ pub struct Leader { } const DEFAULT_INTERVAL: Duration = std::time::Duration::from_secs(10); +mod backend; + impl Leader { - pub fn new(bucket: &str, key: &str, client: async_nats::Client) -> Self { + pub fn new(key: &str, backend: Backend) -> Self { Self { shutting_down: Arc::new(AtomicBool::new(false)), is_leader: Arc::new(AtomicBool::new(false)), - inner: Arc::new(RwLock::new(InnerLeader::new(bucket, key, client))), + inner: Arc::new(RwLock::new(InnerLeader::new(backend, key))), } } + pub fn new_nats(key: &str, bucket: &str, client: async_nats::Client) -> Self { + Self::new(key, Backend::nats(client, bucket)) + } + pub async fn acquire_and_run(&self, f: F) -> anyhow::Result<()> where F: Fn(CancellationToken) -> Fut, @@ -101,7 +108,13 @@ impl Leader { let guard = tokio::spawn(async move { loop { - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::select! { + _ = tokio::time::sleep(std::time::Duration::from_millis(500)) => {} + _ = cancellation_token.cancelled() => { + break; + } + + } if !is_leader.load(Ordering::Relaxed) { cancellation_token.cancel(); @@ -117,28 +130,23 @@ impl Leader { pub async fn leader_id(&self) -> Uuid { let inner = self.inner.read().await; - inner.id - } - - pub async fn create_bucket(&self) -> anyhow::Result<()> { - let mut inner = self.inner.write().await; - tracing::info!("creating bucket leadership bucket"); - - inner.create_bucket().await?; - - Ok(()) + inner.leader_id.clone().into() } pub async fn start(&self, cancellation_token: CancellationToken) -> anyhow::Result<()> { let mut attempts = 1; + { + self.inner.write().await.backend.setup().await?; + } + // Initial attempt let _ = self.try_become_leader().await; loop { let wait_factor = { let mut rng = rand::rng(); - rng.random_range(0.001..1.000) + rng.random_range(0.50..1.00) }; let sleep_fut = tokio::time::sleep((DEFAULT_INTERVAL * attempts).mul_f64(wait_factor)); @@ -153,8 +161,7 @@ impl Leader { match self.try_become_leader().await { Ok(_) => { - self.is_leader - .store(true, std::sync::atomic::Ordering::Relaxed); + self.is_leader.store(true, Ordering::Relaxed); attempts = 1; } Err(e) => { @@ -203,13 +210,11 @@ pub enum Status { struct InnerLeader { state: LeaderState, - bucket: String, - key: String, + backend: Backend, - id: uuid::Uuid, + key: Key, + leader_id: LeaderId, revision: u64, - - client: async_nats::jetstream::Context, } #[derive(Default, Clone)] @@ -221,49 +226,18 @@ enum LeaderState { } impl InnerLeader { - pub fn new(bucket: &str, key: &str, client: async_nats::Client) -> Self { + pub fn new(backend: Backend, key: impl Into) -> Self { Self { - bucket: bucket.into(), - key: key.into(), - - id: uuid::Uuid::new_v4(), + backend, + leader_id: LeaderId::new(), revision: u64::MIN, + key: key.into(), + state: LeaderState::Unknown, - client: async_nats::jetstream::new(client), } } - pub async fn create_bucket(&mut self) -> anyhow::Result<()> { - if (self.client.get_key_value(&self.bucket).await).is_ok() { - return Ok(()); - } - - if let Err(e) = self - .client - .create_key_value(kv::Config { - bucket: self.bucket.clone(), - description: "leadership bucket for noleader".into(), - limit_markers: Some(std::time::Duration::from_secs(60)), - max_age: std::time::Duration::from_secs(60), - ..Default::default() - }) - .await - { - tracing::info!( - "bucket creation failed, it might have just been a conflict, testing again: {e}" - ); - - if (self.client.get_key_value(&self.bucket).await).is_ok() { - return Ok(()); - } - - anyhow::bail!("failed to create bucket: {}", e) - } - - Ok(()) - } - /// start, will run a blocking operation for becoming the next leader. pub async fn start(&mut self) -> anyhow::Result<()> { // Attempt to grab leadership, @@ -302,60 +276,32 @@ impl InnerLeader { } async fn update_leadership(&mut self) -> anyhow::Result<()> { - let bucket = self.client.get_key_value(&self.bucket).await?; + let val = self + .backend + .get(&self.key) + .await + .context("could not find key, we've lost leadership status")?; - let Some(val) = bucket.get(&self.key).await? else { - anyhow::bail!("key doesn't exists, we've lost leadership status") - }; - - let Ok(id) = uuid::Uuid::from_slice(&val) else { - anyhow::bail!("value has changed, it is no longer a uuid, dropping leadership status"); - }; - - if id != self.id { - anyhow::bail!("leadership has changed") + match val { + backend::LeaderValue::Unknown => anyhow::bail!("leadership is unknown"), + backend::LeaderValue::Found { id } if id != self.leader_id => { + anyhow::bail!("leadership has changed") + } + backend::LeaderValue::Found { .. } => self + .backend + .update(&self.key, &self.leader_id) + .await + .context("update leadership lock")?, } - let rev = bucket - .update( - &self.key, - bytes::Bytes::copy_from_slice(self.id.as_bytes()), - self.revision, - ) - .await?; - - self.revision = rev; - Ok(()) } async fn try_for_leadership(&mut self) -> anyhow::Result<()> { - let bucket = self - .client - .get_key_value(&self.bucket) + self.backend + .update(&self.key, &self.leader_id) .await - .context("failed to get bucket")?; - - let rev = match bucket - .create_with_ttl( - &self.key, - bytes::Bytes::copy_from_slice(self.id.as_bytes()), - std::time::Duration::from_secs(60), - ) - .await - { - Ok(rev) => rev, - Err(e) => match e.kind() { - kv::CreateErrorKind::AlreadyExists => { - anyhow::bail!("another candidate has leadership status") - } - _ => { - anyhow::bail!("{}", e); - } - }, - }; - - self.revision = rev; + .context("try for leadership")?; tokio::time::sleep(DEFAULT_INTERVAL).await; @@ -363,7 +309,7 @@ impl InnerLeader { let leadership_state = self.leadership_status().await?; - if !leadership_state.is_leader(&self.id) { + if !leadership_state.is_leader(&self.leader_id) { anyhow::bail!("failed to become leader, there is likely some churn going on"); } @@ -374,25 +320,16 @@ impl InnerLeader { } async fn leadership_status(&mut self) -> anyhow::Result { - let bucket = self.client.get_key_value(&self.bucket).await?; - - let val = bucket.get(&self.key).await?; + let val = self + .backend + .get(&self.key) + .await + .inspect_err(|e| tracing::warn!("failed to query for leadership: {}", e)) + .ok(); Ok(match val { - Some(content) => { - let id = match uuid::Uuid::from_slice(&content) { - Ok(u) => u, - Err(e) => { - tracing::warn!( - "leadership state is not a valid UUID, ignoring the value: {}", - e - ); - return Ok(LeadershipState::NotFound); - } - }; - - LeadershipState::Allocated { id } - } + Some(backend::LeaderValue::Found { id }) => LeadershipState::Allocated { id }, + Some(backend::LeaderValue::Unknown) => LeadershipState::NotFound, None => LeadershipState::NotFound, }) } @@ -400,11 +337,11 @@ impl InnerLeader { enum LeadershipState { NotFound, - Allocated { id: uuid::Uuid }, + Allocated { id: LeaderId }, } impl LeadershipState { - pub fn is_leader(&self, leader_id: &Uuid) -> bool { + pub fn is_leader(&self, leader_id: &LeaderId) -> bool { match self { LeadershipState::Allocated { id } => id == leader_id, _ => false, diff --git a/mise.toml b/mise.toml new file mode 100644 index 0000000..719a8a8 --- /dev/null +++ b/mise.toml @@ -0,0 +1,7 @@ +[tasks.test] +alias = ["t"] +run = "cargo nextest run" + +[tasks.example] +alias = ["e"] +run = "cargo run --example"