feat: add basic retry mechanism
Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
79
Cargo.lock
generated
79
Cargo.lock
generated
@@ -4,9 +4,9 @@ version = 4
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "anyhow"
|
name = "anyhow"
|
||||||
version = "1.0.71"
|
version = "1.0.100"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8"
|
checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "autocfg"
|
name = "autocfg"
|
||||||
@@ -33,10 +33,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "either"
|
name = "fastrand"
|
||||||
version = "1.15.0"
|
version = "2.3.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
|
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hermit-abi"
|
name = "hermit-abi"
|
||||||
@@ -63,12 +63,6 @@ dependencies = [
|
|||||||
"scopeguard",
|
"scopeguard",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "log"
|
|
||||||
version = "0.4.19"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mio"
|
name = "mio"
|
||||||
version = "0.8.8"
|
version = "0.8.8"
|
||||||
@@ -85,10 +79,8 @@ name = "noretry"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"either",
|
"fastrand",
|
||||||
"thiserror",
|
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -101,12 +93,6 @@ dependencies = [
|
|||||||
"libc",
|
"libc",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "once_cell"
|
|
||||||
version = "1.18.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "parking_lot"
|
name = "parking_lot"
|
||||||
version = "0.12.1"
|
version = "0.12.1"
|
||||||
@@ -205,26 +191,6 @@ dependencies = [
|
|||||||
"unicode-ident",
|
"unicode-ident",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "thiserror"
|
|
||||||
version = "2.0.18"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4"
|
|
||||||
dependencies = [
|
|
||||||
"thiserror-impl",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "thiserror-impl"
|
|
||||||
version = "2.0.18"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio"
|
name = "tokio"
|
||||||
version = "1.28.2"
|
version = "1.28.2"
|
||||||
@@ -255,39 +221,6 @@ dependencies = [
|
|||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tracing"
|
|
||||||
version = "0.1.37"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
|
|
||||||
dependencies = [
|
|
||||||
"cfg-if",
|
|
||||||
"log",
|
|
||||||
"pin-project-lite",
|
|
||||||
"tracing-attributes",
|
|
||||||
"tracing-core",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tracing-attributes"
|
|
||||||
version = "0.1.24"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tracing-core"
|
|
||||||
version = "0.1.31"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
|
|
||||||
dependencies = [
|
|
||||||
"once_cell",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "unicode-ident"
|
name = "unicode-ident"
|
||||||
version = "1.0.9"
|
version = "1.0.9"
|
||||||
|
|||||||
@@ -6,5 +6,6 @@ resolver = "2"
|
|||||||
noretry = { path = "crates/noretry" }
|
noretry = { path = "crates/noretry" }
|
||||||
|
|
||||||
anyhow = { version = "1.0.71" }
|
anyhow = { version = "1.0.71" }
|
||||||
|
fastrand = { version = "2" }
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
tracing = { version = "0.1", features = ["log"] }
|
tracing = { version = "0.1", features = ["log"] }
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ version = "0.1.0"
|
|||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow.workspace = true
|
fastrand.workspace = true
|
||||||
either = "1.15.0"
|
|
||||||
thiserror = "2.0.18"
|
|
||||||
tokio.workspace = true
|
tokio.workspace = true
|
||||||
tracing.workspace = true
|
|
||||||
|
[dev-dependencies]
|
||||||
|
anyhow.workspace = true
|
||||||
|
|||||||
@@ -1,17 +1,22 @@
|
|||||||
use std::fmt::Display;
|
use std::fmt;
|
||||||
|
use std::time::Duration;
|
||||||
use either::Either;
|
|
||||||
|
|
||||||
pub fn builder() -> RetryBuilder {
|
pub fn builder() -> RetryBuilder {
|
||||||
RetryBuilder {
|
RetryBuilder {
|
||||||
max_attempts: Some(3),
|
max_attempts: 3,
|
||||||
retry_strategy: Some(RetryStrategy::ExponentialBackoff),
|
retry_strategy: RetryStrategy::ExponentialBackoff,
|
||||||
|
initial_delay: Duration::from_millis(100),
|
||||||
|
max_delay: Duration::from_secs(30),
|
||||||
|
jitter: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RetryBuilder {
|
pub struct RetryBuilder {
|
||||||
max_attempts: Option<usize>,
|
max_attempts: usize,
|
||||||
retry_strategy: Option<RetryStrategy>,
|
retry_strategy: RetryStrategy,
|
||||||
|
initial_delay: Duration,
|
||||||
|
max_delay: Duration,
|
||||||
|
jitter: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum RetryStrategy {
|
pub enum RetryStrategy {
|
||||||
@@ -21,53 +26,155 @@ pub enum RetryStrategy {
|
|||||||
|
|
||||||
impl RetryBuilder {
|
impl RetryBuilder {
|
||||||
pub fn with_max_attempts(mut self, attempts: usize) -> Self {
|
pub fn with_max_attempts(mut self, attempts: usize) -> Self {
|
||||||
self.max_attempts = Some(attempts);
|
self.max_attempts = attempts;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn with_exponential_backoff(mut self) -> Self {
|
pub fn with_exponential_backoff(mut self) -> Self {
|
||||||
self.retry_strategy = Some(RetryStrategy::ExponentialBackoff);
|
self.retry_strategy = RetryStrategy::ExponentialBackoff;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn with_linear_backoff(mut self) -> Self {
|
pub fn with_linear_backoff(mut self) -> Self {
|
||||||
self.retry_strategy = Some(RetryStrategy::LinearBackoff);
|
self.retry_strategy = RetryStrategy::LinearBackoff;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn build(mut self) -> RetryOptions {
|
pub fn with_initial_delay(mut self, delay: Duration) -> Self {
|
||||||
|
self.initial_delay = delay;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_max_delay(mut self, delay: Duration) -> Self {
|
||||||
|
self.max_delay = delay;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_jitter(mut self, jitter: bool) -> Self {
|
||||||
|
self.jitter = jitter;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn build(self) -> RetryOptions {
|
||||||
RetryOptions {
|
RetryOptions {
|
||||||
max_attempts: self.max_attempts,
|
max_attempts: self.max_attempts,
|
||||||
retry_strategy: self.retry_strategy,
|
retry_strategy: self.retry_strategy,
|
||||||
|
initial_delay: self.initial_delay,
|
||||||
|
max_delay: self.max_delay,
|
||||||
|
jitter: self.jitter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RetryOptions {
|
pub struct RetryOptions {
|
||||||
max_attempts: Option<usize>,
|
max_attempts: usize,
|
||||||
retry_strategy: Option<RetryStrategy>,
|
retry_strategy: RetryStrategy,
|
||||||
|
initial_delay: Duration,
|
||||||
|
max_delay: Duration,
|
||||||
|
jitter: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RetryOptions {
|
impl RetryOptions {
|
||||||
pub async fn run<T, TFut, TOk, TError>(
|
pub async fn run<T, TFut, TOk, TError>(self, f: T) -> Result<TOk, RetryError<TError>>
|
||||||
mut self,
|
|
||||||
f: T,
|
|
||||||
) -> Result<TOk, Either<TError, RetryError>>
|
|
||||||
where
|
where
|
||||||
T: Fn() -> TFut + Send + 'static,
|
T: Fn() -> TFut + Send + 'static,
|
||||||
TFut: Future<Output = Result<TOk, TError>>,
|
TFut: Future<Output = Result<TOk, Retryable<TError>>>,
|
||||||
{
|
{
|
||||||
|
let mut last_error = None;
|
||||||
|
|
||||||
|
for attempt in 1..=self.max_attempts {
|
||||||
match f().await {
|
match f().await {
|
||||||
Ok(success) => return Ok(success),
|
Ok(success) => return Ok(success),
|
||||||
Err(e) => return Err(either::Left(e)),
|
Err(Retryable::Permanent { error }) => {
|
||||||
|
return Err(RetryError::Permanent { error });
|
||||||
|
}
|
||||||
|
Err(Retryable::Transient { error }) => {
|
||||||
|
last_error = Some(error);
|
||||||
|
|
||||||
|
if attempt < self.max_attempts {
|
||||||
|
let delay = self.get_delay(attempt);
|
||||||
|
tokio::time::sleep(delay).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(RetryError::Exhausted {
|
||||||
|
last_error: last_error.unwrap(),
|
||||||
|
attempts: self.max_attempts,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_delay(&self, attempt: usize) -> Duration {
|
||||||
|
let base = match self.retry_strategy {
|
||||||
|
RetryStrategy::LinearBackoff => self.initial_delay * attempt as u32,
|
||||||
|
RetryStrategy::ExponentialBackoff => self.initial_delay * 2u32.pow(attempt as u32 - 1),
|
||||||
};
|
};
|
||||||
|
|
||||||
Err(either::Right(RetryError::ExhaustedAttempts))
|
let capped = base.min(self.max_delay);
|
||||||
|
|
||||||
|
if self.jitter {
|
||||||
|
// Random factor in [0.5, 1.0) to avoid thundering herd
|
||||||
|
let jitter_factor = 0.5 + fastrand::f64() * 0.5;
|
||||||
|
capped.mul_f64(jitter_factor)
|
||||||
|
} else {
|
||||||
|
capped
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(thiserror::Error, Debug)]
|
pub enum Retryable<E> {
|
||||||
pub enum RetryError {
|
Transient { error: E },
|
||||||
#[error("retry stopped because attempts exceeded budget")]
|
Permanent { error: E },
|
||||||
ExhaustedAttempts,
|
}
|
||||||
|
|
||||||
|
impl<E> From<E> for Retryable<E> {
|
||||||
|
fn from(value: E) -> Self {
|
||||||
|
Self::Transient { error: value }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum RetryError<E> {
|
||||||
|
/// All retry attempts were exhausted.
|
||||||
|
Exhausted {
|
||||||
|
last_error: E,
|
||||||
|
attempts: usize,
|
||||||
|
},
|
||||||
|
Permanent {
|
||||||
|
error: E,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E> RetryError<E> {
|
||||||
|
/// Extract the underlying error, discarding retry context.
|
||||||
|
pub fn into_inner(self) -> E {
|
||||||
|
match self {
|
||||||
|
RetryError::Exhausted { last_error, .. } => last_error,
|
||||||
|
RetryError::Permanent { error } => error,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E: fmt::Display> fmt::Display for RetryError<E> {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
RetryError::Exhausted {
|
||||||
|
last_error,
|
||||||
|
attempts,
|
||||||
|
} => write!(f, "retry exhausted after {attempts} attempts: {last_error}"),
|
||||||
|
RetryError::Permanent { error } => {
|
||||||
|
write!(f, "{error}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E: std::error::Error + 'static> std::error::Error for RetryError<E> {
|
||||||
|
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||||
|
match self {
|
||||||
|
RetryError::Exhausted { last_error, .. } => Some(last_error),
|
||||||
|
RetryError::Permanent { error } => Some(error),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,12 @@
|
|||||||
|
use noretry::Retryable;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_can_call() -> anyhow::Result<()> {
|
async fn test_can_call() -> anyhow::Result<()> {
|
||||||
let res = noretry::builder().build().run(my_func).await??;
|
let res = noretry::builder()
|
||||||
|
.build()
|
||||||
|
.run(my_func)
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.into_inner())?;
|
||||||
|
|
||||||
assert_eq!("output", res.as_str());
|
assert_eq!("output", res.as_str());
|
||||||
|
|
||||||
@@ -8,20 +14,52 @@ async fn test_can_call() -> anyhow::Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_can_fail() -> anyhow::Result<()> {
|
async fn test_can_fail() {
|
||||||
let res = noretry::builder().build().run(fail).await?;
|
let res = noretry::builder()
|
||||||
|
.with_max_attempts(2)
|
||||||
|
.build()
|
||||||
|
.run(fail)
|
||||||
|
.await;
|
||||||
|
|
||||||
assert_eq!("output", res.as_str());
|
if let Err(noretry::RetryError::Exhausted {
|
||||||
|
attempts,
|
||||||
Ok(())
|
last_error,
|
||||||
|
}) = res
|
||||||
|
{
|
||||||
|
assert_eq!(2, attempts);
|
||||||
|
assert_eq!("fails", last_error.to_string());
|
||||||
|
} else {
|
||||||
|
panic!("expected error")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn my_func() -> anyhow::Result<String> {
|
async fn my_func() -> Result<String, Retryable<anyhow::Error>> {
|
||||||
println!("my func was called");
|
println!("my func was called");
|
||||||
|
|
||||||
Ok("output".into())
|
Ok("output".into())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fail() -> anyhow::Result<String> {
|
async fn fail() -> Result<String, Retryable<anyhow::Error>> {
|
||||||
anyhow::bail!("fails")
|
Err(anyhow::anyhow!("fails"))?
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_permanent_error() {
|
||||||
|
let res = noretry::builder()
|
||||||
|
.with_max_attempts(5)
|
||||||
|
.build()
|
||||||
|
.run(fail_permanent)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
if let Err(noretry::RetryError::Permanent { error }) = res {
|
||||||
|
assert_eq!("not retryable", error.to_string());
|
||||||
|
} else {
|
||||||
|
panic!("expected permanent error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fail_permanent() -> Result<String, Retryable<anyhow::Error>> {
|
||||||
|
Err(Retryable::Permanent {
|
||||||
|
error: anyhow::anyhow!("not retryable"),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user