This commit is contained in:
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -68,6 +68,17 @@ dependencies = [
|
|||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-trait"
|
||||||
|
version = "0.1.89"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "autocfg"
|
name = "autocfg"
|
||||||
version = "1.5.0"
|
version = "1.5.0"
|
||||||
@@ -649,6 +660,7 @@ version = "0.1.2"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-nats",
|
"async-nats",
|
||||||
|
"async-trait",
|
||||||
"bytes",
|
"bytes",
|
||||||
"rand 0.9.1",
|
"rand 0.9.1",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
|||||||
@@ -13,11 +13,12 @@ anyhow.workspace = true
|
|||||||
tracing.workspace = true
|
tracing.workspace = true
|
||||||
|
|
||||||
async-nats = "0.42"
|
async-nats = "0.42"
|
||||||
uuid = { version = "1", features = ["v4"] }
|
uuid = { version = "1", features = ["v4", "v7"] }
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
tokio.workspace = true
|
tokio.workspace = true
|
||||||
tokio-util = "0.7"
|
tokio-util = "0.7"
|
||||||
rand = "0.9.1"
|
rand = "0.9.1"
|
||||||
|
async-trait = "0.1.89"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||||
|
|||||||
@@ -1,4 +1,3 @@
|
|||||||
use tokio_util::sync::CancellationToken;
|
|
||||||
use tracing_subscriber::EnvFilter;
|
use tracing_subscriber::EnvFilter;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
@@ -14,15 +13,12 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.init();
|
.init();
|
||||||
|
|
||||||
let mybucket = "mytestbucket";
|
let mybucket = "mytestbucket";
|
||||||
let mykey = "myleaderkey";
|
let mykey = "basic";
|
||||||
let client = async_nats::connect("localhost:4222").await?;
|
let client = async_nats::connect("localhost:4222").await?;
|
||||||
|
|
||||||
let leader = noleader::Leader::new(mybucket, mykey, client);
|
let leader = noleader::Leader::new_nats(mykey, mybucket, client);
|
||||||
let leader_id = leader.leader_id().await.to_string();
|
let leader_id = leader.leader_id().await.to_string();
|
||||||
|
|
||||||
tracing::info!("creating bucket");
|
|
||||||
leader.create_bucket().await?;
|
|
||||||
|
|
||||||
leader
|
leader
|
||||||
.acquire_and_run({
|
.acquire_and_run({
|
||||||
move |token| {
|
move |token| {
|
||||||
|
|||||||
@@ -23,12 +23,9 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let client = client.clone();
|
let client = client.clone();
|
||||||
|
|
||||||
let handle = tokio::spawn(async move {
|
let handle = tokio::spawn(async move {
|
||||||
let leader = noleader::Leader::new(mybucket, mykey, client);
|
let leader = noleader::Leader::new_nats(mykey, mybucket, client);
|
||||||
let leader_id = leader.leader_id().await.to_string();
|
let leader_id = leader.leader_id().await.to_string();
|
||||||
|
|
||||||
tracing::info!("creating bucket");
|
|
||||||
leader.create_bucket().await?;
|
|
||||||
|
|
||||||
tokio::spawn({
|
tokio::spawn({
|
||||||
let leader = leader.clone();
|
let leader = leader.clone();
|
||||||
let leader_id = leader_id.clone();
|
let leader_id = leader_id.clone();
|
||||||
|
|||||||
98
crates/noleader/src/backend.rs
Normal file
98
crates/noleader/src/backend.rs
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
use std::{ops::Deref, sync::Arc};
|
||||||
|
|
||||||
|
use crate::backend::nats::NatsBackend;
|
||||||
|
|
||||||
|
mod nats;
|
||||||
|
|
||||||
|
pub struct Backend {
|
||||||
|
inner: Arc<dyn BackendEdge + Send + Sync + 'static>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Backend {
|
||||||
|
pub fn new(edge: impl BackendEdge + Send + Sync + 'static) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: Arc::new(edge),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn nats(client: async_nats::Client, bucket: &str) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: Arc::new(NatsBackend::new(client, bucket)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Deref for Backend {
|
||||||
|
type Target = Arc<dyn BackendEdge + Send + Sync + 'static>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.inner
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait BackendEdge {
|
||||||
|
async fn setup(&self) -> anyhow::Result<()>;
|
||||||
|
async fn get(&self, key: &Key) -> anyhow::Result<LeaderValue>;
|
||||||
|
async fn update(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum LeaderValue {
|
||||||
|
Unknown,
|
||||||
|
Found { id: LeaderId },
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Key(String);
|
||||||
|
|
||||||
|
impl From<String> for Key {
|
||||||
|
fn from(value: String) -> Self {
|
||||||
|
Self(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl From<&str> for Key {
|
||||||
|
fn from(value: &str) -> Self {
|
||||||
|
Self(value.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Key> for String {
|
||||||
|
fn from(value: Key) -> Self {
|
||||||
|
value.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl From<&Key> for String {
|
||||||
|
fn from(value: &Key) -> Self {
|
||||||
|
value.0.clone()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||||
|
pub struct LeaderId(uuid::Uuid);
|
||||||
|
impl LeaderId {
|
||||||
|
pub(crate) fn new() -> Self {
|
||||||
|
Self(uuid::Uuid::now_v7())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<LeaderId> for uuid::Uuid {
|
||||||
|
fn from(value: LeaderId) -> Self {
|
||||||
|
value.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl From<&LeaderId> for uuid::Uuid {
|
||||||
|
fn from(value: &LeaderId) -> Self {
|
||||||
|
value.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<uuid::Uuid> for LeaderId {
|
||||||
|
fn from(value: uuid::Uuid) -> Self {
|
||||||
|
Self(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LeaderId {
|
||||||
|
pub const fn as_bytes(&self) -> &[u8] {
|
||||||
|
self.0.as_bytes()
|
||||||
|
}
|
||||||
|
}
|
||||||
125
crates/noleader/src/backend/nats.rs
Normal file
125
crates/noleader/src/backend/nats.rs
Normal file
@@ -0,0 +1,125 @@
|
|||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
|
|
||||||
|
use anyhow::Context;
|
||||||
|
use async_nats::jetstream::{self, kv};
|
||||||
|
|
||||||
|
use crate::backend::{BackendEdge, Key, LeaderId, LeaderValue};
|
||||||
|
|
||||||
|
pub struct NatsBackend {
|
||||||
|
bucket: String,
|
||||||
|
client: jetstream::Context,
|
||||||
|
|
||||||
|
revision: AtomicU64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NatsBackend {
|
||||||
|
pub fn new(client: async_nats::Client, bucket: &str) -> Self {
|
||||||
|
Self {
|
||||||
|
bucket: bucket.into(),
|
||||||
|
client: jetstream::new(client),
|
||||||
|
revision: AtomicU64::new(0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn create_bucket(&self) -> anyhow::Result<()> {
|
||||||
|
if (self.client.get_key_value(&self.bucket).await).is_ok() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Err(e) = self
|
||||||
|
.client
|
||||||
|
.create_key_value(kv::Config {
|
||||||
|
bucket: self.bucket.clone(),
|
||||||
|
description: "leadership bucket for noleader".into(),
|
||||||
|
limit_markers: Some(std::time::Duration::from_secs(60)),
|
||||||
|
max_age: std::time::Duration::from_secs(60),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::info!(
|
||||||
|
"bucket creation failed, it might have just been a conflict, testing again: {e}"
|
||||||
|
);
|
||||||
|
|
||||||
|
if (self.client.get_key_value(&self.bucket).await).is_ok() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
anyhow::bail!("failed to create bucket: {}", e)
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl BackendEdge for NatsBackend {
|
||||||
|
async fn setup(&self) -> anyhow::Result<()> {
|
||||||
|
self.create_bucket().await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
async fn get(&self, key: &Key) -> anyhow::Result<LeaderValue> {
|
||||||
|
let bucket = self.client.get_key_value(&self.bucket).await?;
|
||||||
|
|
||||||
|
let Some(val) = bucket.get(key).await? else {
|
||||||
|
anyhow::bail!("key doesn't exists, we've lost leadership status")
|
||||||
|
};
|
||||||
|
|
||||||
|
let Ok(id) = uuid::Uuid::from_slice(&val) else {
|
||||||
|
return Ok(LeaderValue::Unknown);
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(LeaderValue::Found { id: id.into() })
|
||||||
|
}
|
||||||
|
async fn update(&self, key: &Key, val: &LeaderId) -> anyhow::Result<()> {
|
||||||
|
let bucket = self
|
||||||
|
.client
|
||||||
|
.get_key_value(&self.bucket)
|
||||||
|
.await
|
||||||
|
.context("get bucket")?;
|
||||||
|
|
||||||
|
match bucket
|
||||||
|
.update(
|
||||||
|
&key.0,
|
||||||
|
bytes::Bytes::copy_from_slice(val.as_bytes()),
|
||||||
|
self.revision.load(Ordering::Relaxed),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(rev) => {
|
||||||
|
self.revision.store(rev, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
Err(e) => match e.kind() {
|
||||||
|
kv::UpdateErrorKind::WrongLastRevision => {
|
||||||
|
tracing::trace!("creating nats entry");
|
||||||
|
match bucket
|
||||||
|
.create_with_ttl(
|
||||||
|
&key.0,
|
||||||
|
bytes::Bytes::copy_from_slice(val.as_bytes()),
|
||||||
|
std::time::Duration::from_secs(60),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(rev) => {
|
||||||
|
self.revision.store(rev, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
Err(e) => match e.kind() {
|
||||||
|
kv::CreateErrorKind::AlreadyExists => {
|
||||||
|
anyhow::bail!("another candidate has leadership status")
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
anyhow::bail!("{}", e);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
anyhow::bail!("failed to create bucket: {e}")
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
0
crates/noleader/src/inner.create_bucket
Normal file
0
crates/noleader/src/inner.create_bucket
Normal file
@@ -1,19 +1,20 @@
|
|||||||
use std::{
|
use std::{
|
||||||
future::Future,
|
future::Future,
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicBool, Ordering},
|
|
||||||
Arc,
|
Arc,
|
||||||
|
atomic::{AtomicBool, Ordering},
|
||||||
},
|
},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use async_nats::jetstream::kv;
|
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::backend::{Backend, Key, LeaderId};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Leader {
|
pub struct Leader {
|
||||||
shutting_down: Arc<AtomicBool>,
|
shutting_down: Arc<AtomicBool>,
|
||||||
@@ -22,15 +23,21 @@ pub struct Leader {
|
|||||||
}
|
}
|
||||||
const DEFAULT_INTERVAL: Duration = std::time::Duration::from_secs(10);
|
const DEFAULT_INTERVAL: Duration = std::time::Duration::from_secs(10);
|
||||||
|
|
||||||
|
mod backend;
|
||||||
|
|
||||||
impl Leader {
|
impl Leader {
|
||||||
pub fn new(bucket: &str, key: &str, client: async_nats::Client) -> Self {
|
pub fn new(key: &str, backend: Backend) -> Self {
|
||||||
Self {
|
Self {
|
||||||
shutting_down: Arc::new(AtomicBool::new(false)),
|
shutting_down: Arc::new(AtomicBool::new(false)),
|
||||||
is_leader: Arc::new(AtomicBool::new(false)),
|
is_leader: Arc::new(AtomicBool::new(false)),
|
||||||
inner: Arc::new(RwLock::new(InnerLeader::new(bucket, key, client))),
|
inner: Arc::new(RwLock::new(InnerLeader::new(backend, key))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn new_nats(key: &str, bucket: &str, client: async_nats::Client) -> Self {
|
||||||
|
Self::new(key, Backend::nats(client, bucket))
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn acquire_and_run<F, Fut>(&self, f: F) -> anyhow::Result<()>
|
pub async fn acquire_and_run<F, Fut>(&self, f: F) -> anyhow::Result<()>
|
||||||
where
|
where
|
||||||
F: Fn(CancellationToken) -> Fut,
|
F: Fn(CancellationToken) -> Fut,
|
||||||
@@ -101,7 +108,13 @@ impl Leader {
|
|||||||
|
|
||||||
let guard = tokio::spawn(async move {
|
let guard = tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
tokio::select! {
|
||||||
|
_ = tokio::time::sleep(std::time::Duration::from_millis(500)) => {}
|
||||||
|
_ = cancellation_token.cancelled() => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
if !is_leader.load(Ordering::Relaxed) {
|
if !is_leader.load(Ordering::Relaxed) {
|
||||||
cancellation_token.cancel();
|
cancellation_token.cancel();
|
||||||
@@ -117,28 +130,23 @@ impl Leader {
|
|||||||
|
|
||||||
pub async fn leader_id(&self) -> Uuid {
|
pub async fn leader_id(&self) -> Uuid {
|
||||||
let inner = self.inner.read().await;
|
let inner = self.inner.read().await;
|
||||||
inner.id
|
inner.leader_id.clone().into()
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn create_bucket(&self) -> anyhow::Result<()> {
|
|
||||||
let mut inner = self.inner.write().await;
|
|
||||||
tracing::info!("creating bucket leadership bucket");
|
|
||||||
|
|
||||||
inner.create_bucket().await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start(&self, cancellation_token: CancellationToken) -> anyhow::Result<()> {
|
pub async fn start(&self, cancellation_token: CancellationToken) -> anyhow::Result<()> {
|
||||||
let mut attempts = 1;
|
let mut attempts = 1;
|
||||||
|
|
||||||
|
{
|
||||||
|
self.inner.write().await.backend.setup().await?;
|
||||||
|
}
|
||||||
|
|
||||||
// Initial attempt
|
// Initial attempt
|
||||||
let _ = self.try_become_leader().await;
|
let _ = self.try_become_leader().await;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let wait_factor = {
|
let wait_factor = {
|
||||||
let mut rng = rand::rng();
|
let mut rng = rand::rng();
|
||||||
rng.random_range(0.001..1.000)
|
rng.random_range(0.50..1.00)
|
||||||
};
|
};
|
||||||
|
|
||||||
let sleep_fut = tokio::time::sleep((DEFAULT_INTERVAL * attempts).mul_f64(wait_factor));
|
let sleep_fut = tokio::time::sleep((DEFAULT_INTERVAL * attempts).mul_f64(wait_factor));
|
||||||
@@ -153,8 +161,7 @@ impl Leader {
|
|||||||
|
|
||||||
match self.try_become_leader().await {
|
match self.try_become_leader().await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
self.is_leader
|
self.is_leader.store(true, Ordering::Relaxed);
|
||||||
.store(true, std::sync::atomic::Ordering::Relaxed);
|
|
||||||
attempts = 1;
|
attempts = 1;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -203,13 +210,11 @@ pub enum Status {
|
|||||||
struct InnerLeader {
|
struct InnerLeader {
|
||||||
state: LeaderState,
|
state: LeaderState,
|
||||||
|
|
||||||
bucket: String,
|
backend: Backend,
|
||||||
key: String,
|
|
||||||
|
|
||||||
id: uuid::Uuid,
|
key: Key,
|
||||||
|
leader_id: LeaderId,
|
||||||
revision: u64,
|
revision: u64,
|
||||||
|
|
||||||
client: async_nats::jetstream::Context,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
#[derive(Default, Clone)]
|
||||||
@@ -221,49 +226,18 @@ enum LeaderState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl InnerLeader {
|
impl InnerLeader {
|
||||||
pub fn new(bucket: &str, key: &str, client: async_nats::Client) -> Self {
|
pub fn new(backend: Backend, key: impl Into<Key>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
bucket: bucket.into(),
|
backend,
|
||||||
key: key.into(),
|
leader_id: LeaderId::new(),
|
||||||
|
|
||||||
id: uuid::Uuid::new_v4(),
|
|
||||||
revision: u64::MIN,
|
revision: u64::MIN,
|
||||||
|
|
||||||
|
key: key.into(),
|
||||||
|
|
||||||
state: LeaderState::Unknown,
|
state: LeaderState::Unknown,
|
||||||
client: async_nats::jetstream::new(client),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn create_bucket(&mut self) -> anyhow::Result<()> {
|
|
||||||
if (self.client.get_key_value(&self.bucket).await).is_ok() {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Err(e) = self
|
|
||||||
.client
|
|
||||||
.create_key_value(kv::Config {
|
|
||||||
bucket: self.bucket.clone(),
|
|
||||||
description: "leadership bucket for noleader".into(),
|
|
||||||
limit_markers: Some(std::time::Duration::from_secs(60)),
|
|
||||||
max_age: std::time::Duration::from_secs(60),
|
|
||||||
..Default::default()
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
tracing::info!(
|
|
||||||
"bucket creation failed, it might have just been a conflict, testing again: {e}"
|
|
||||||
);
|
|
||||||
|
|
||||||
if (self.client.get_key_value(&self.bucket).await).is_ok() {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
anyhow::bail!("failed to create bucket: {}", e)
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// start, will run a blocking operation for becoming the next leader.
|
/// start, will run a blocking operation for becoming the next leader.
|
||||||
pub async fn start(&mut self) -> anyhow::Result<()> {
|
pub async fn start(&mut self) -> anyhow::Result<()> {
|
||||||
// Attempt to grab leadership,
|
// Attempt to grab leadership,
|
||||||
@@ -302,60 +276,32 @@ impl InnerLeader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn update_leadership(&mut self) -> anyhow::Result<()> {
|
async fn update_leadership(&mut self) -> anyhow::Result<()> {
|
||||||
let bucket = self.client.get_key_value(&self.bucket).await?;
|
let val = self
|
||||||
|
.backend
|
||||||
|
.get(&self.key)
|
||||||
|
.await
|
||||||
|
.context("could not find key, we've lost leadership status")?;
|
||||||
|
|
||||||
let Some(val) = bucket.get(&self.key).await? else {
|
match val {
|
||||||
anyhow::bail!("key doesn't exists, we've lost leadership status")
|
backend::LeaderValue::Unknown => anyhow::bail!("leadership is unknown"),
|
||||||
};
|
backend::LeaderValue::Found { id } if id != self.leader_id => {
|
||||||
|
|
||||||
let Ok(id) = uuid::Uuid::from_slice(&val) else {
|
|
||||||
anyhow::bail!("value has changed, it is no longer a uuid, dropping leadership status");
|
|
||||||
};
|
|
||||||
|
|
||||||
if id != self.id {
|
|
||||||
anyhow::bail!("leadership has changed")
|
anyhow::bail!("leadership has changed")
|
||||||
}
|
}
|
||||||
|
backend::LeaderValue::Found { .. } => self
|
||||||
let rev = bucket
|
.backend
|
||||||
.update(
|
.update(&self.key, &self.leader_id)
|
||||||
&self.key,
|
.await
|
||||||
bytes::Bytes::copy_from_slice(self.id.as_bytes()),
|
.context("update leadership lock")?,
|
||||||
self.revision,
|
}
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
self.revision = rev;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn try_for_leadership(&mut self) -> anyhow::Result<()> {
|
async fn try_for_leadership(&mut self) -> anyhow::Result<()> {
|
||||||
let bucket = self
|
self.backend
|
||||||
.client
|
.update(&self.key, &self.leader_id)
|
||||||
.get_key_value(&self.bucket)
|
|
||||||
.await
|
.await
|
||||||
.context("failed to get bucket")?;
|
.context("try for leadership")?;
|
||||||
|
|
||||||
let rev = match bucket
|
|
||||||
.create_with_ttl(
|
|
||||||
&self.key,
|
|
||||||
bytes::Bytes::copy_from_slice(self.id.as_bytes()),
|
|
||||||
std::time::Duration::from_secs(60),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(rev) => rev,
|
|
||||||
Err(e) => match e.kind() {
|
|
||||||
kv::CreateErrorKind::AlreadyExists => {
|
|
||||||
anyhow::bail!("another candidate has leadership status")
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
anyhow::bail!("{}", e);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
self.revision = rev;
|
|
||||||
|
|
||||||
tokio::time::sleep(DEFAULT_INTERVAL).await;
|
tokio::time::sleep(DEFAULT_INTERVAL).await;
|
||||||
|
|
||||||
@@ -363,7 +309,7 @@ impl InnerLeader {
|
|||||||
|
|
||||||
let leadership_state = self.leadership_status().await?;
|
let leadership_state = self.leadership_status().await?;
|
||||||
|
|
||||||
if !leadership_state.is_leader(&self.id) {
|
if !leadership_state.is_leader(&self.leader_id) {
|
||||||
anyhow::bail!("failed to become leader, there is likely some churn going on");
|
anyhow::bail!("failed to become leader, there is likely some churn going on");
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -374,25 +320,16 @@ impl InnerLeader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn leadership_status(&mut self) -> anyhow::Result<LeadershipState> {
|
async fn leadership_status(&mut self) -> anyhow::Result<LeadershipState> {
|
||||||
let bucket = self.client.get_key_value(&self.bucket).await?;
|
let val = self
|
||||||
|
.backend
|
||||||
let val = bucket.get(&self.key).await?;
|
.get(&self.key)
|
||||||
|
.await
|
||||||
|
.inspect_err(|e| tracing::warn!("failed to query for leadership: {}", e))
|
||||||
|
.ok();
|
||||||
|
|
||||||
Ok(match val {
|
Ok(match val {
|
||||||
Some(content) => {
|
Some(backend::LeaderValue::Found { id }) => LeadershipState::Allocated { id },
|
||||||
let id = match uuid::Uuid::from_slice(&content) {
|
Some(backend::LeaderValue::Unknown) => LeadershipState::NotFound,
|
||||||
Ok(u) => u,
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!(
|
|
||||||
"leadership state is not a valid UUID, ignoring the value: {}",
|
|
||||||
e
|
|
||||||
);
|
|
||||||
return Ok(LeadershipState::NotFound);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
LeadershipState::Allocated { id }
|
|
||||||
}
|
|
||||||
None => LeadershipState::NotFound,
|
None => LeadershipState::NotFound,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -400,11 +337,11 @@ impl InnerLeader {
|
|||||||
|
|
||||||
enum LeadershipState {
|
enum LeadershipState {
|
||||||
NotFound,
|
NotFound,
|
||||||
Allocated { id: uuid::Uuid },
|
Allocated { id: LeaderId },
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LeadershipState {
|
impl LeadershipState {
|
||||||
pub fn is_leader(&self, leader_id: &Uuid) -> bool {
|
pub fn is_leader(&self, leader_id: &LeaderId) -> bool {
|
||||||
match self {
|
match self {
|
||||||
LeadershipState::Allocated { id } => id == leader_id,
|
LeadershipState::Allocated { id } => id == leader_id,
|
||||||
_ => false,
|
_ => false,
|
||||||
|
|||||||
Reference in New Issue
Block a user