feat: add basic no drop
This commit is contained in:
23
crates/nodrop/Cargo.toml
Normal file
23
crates/nodrop/Cargo.toml
Normal file
@@ -0,0 +1,23 @@
|
||||
[package]
|
||||
name = "nodrop"
|
||||
edition = "2024"
|
||||
description = "nodrop, is a simple drop queue system for async operations. (Until async-drop is a thing)"
|
||||
license = "MIT"
|
||||
repository = "https://github.com/kjuulh/nodrop"
|
||||
authors = ["kjuulh <contact@kasperhermansen.com>"]
|
||||
|
||||
version.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
tokio.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
async-trait = { version = "0.1" }
|
||||
|
||||
notmad = { version = "0.7.2", optional = true }
|
||||
tokio-util = { version = "0.7", optional = true }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
notmad = ["dep:notmad", "dep:tokio-util"]
|
242
crates/nodrop/src/lib.rs
Normal file
242
crates/nodrop/src/lib.rs
Normal file
@@ -0,0 +1,242 @@
|
||||
use std::sync::{Arc, atomic::AtomicBool};
|
||||
|
||||
use tokio::sync::{
|
||||
Mutex,
|
||||
mpsc::{self, UnboundedReceiver, UnboundedSender},
|
||||
};
|
||||
|
||||
#[cfg(feature = "notmad")]
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
type ThreadSafeQueueItem = Arc<Mutex<dyn QueueItem + Send + Sync + 'static>>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DropQueue {
|
||||
draining: Arc<AtomicBool>,
|
||||
input: UnboundedSender<ThreadSafeQueueItem>,
|
||||
receiver: Arc<Mutex<UnboundedReceiver<ThreadSafeQueueItem>>>,
|
||||
}
|
||||
|
||||
impl Default for DropQueue {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl DropQueue {
|
||||
pub fn new() -> Self {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
||||
Self {
|
||||
draining: Arc::new(AtomicBool::new(false)),
|
||||
input: tx,
|
||||
receiver: Arc::new(Mutex::new(rx)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn assign<F, Fut>(&self, f: F) -> anyhow::Result<()>
|
||||
where
|
||||
F: FnOnce() -> Fut + Send + Sync + 'static,
|
||||
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
|
||||
{
|
||||
if self.draining.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
panic!("trying to put an item on a draining queue. This is not allowed");
|
||||
}
|
||||
|
||||
self.input
|
||||
.send(Arc::new(Mutex::new(ClosureComponent {
|
||||
inner: Box::new(Some(f)),
|
||||
})))
|
||||
.expect("unbounded channel should never be full");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn process_next(&self) -> anyhow::Result<()> {
|
||||
let item = {
|
||||
let mut queue = self.receiver.lock().await;
|
||||
queue.recv().await
|
||||
};
|
||||
|
||||
if let Some(item) = item {
|
||||
let mut item = item.try_lock().expect("should always be unlockable");
|
||||
item.execute().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn process(&self) -> anyhow::Result<()> {
|
||||
loop {
|
||||
if self.draining.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.process_next().await?;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn try_process_next(&self) -> anyhow::Result<Option<()>> {
|
||||
let item = {
|
||||
let mut queue = self.receiver.lock().await;
|
||||
match queue.try_recv() {
|
||||
Ok(o) => o,
|
||||
Err(_) => return Ok(None),
|
||||
}
|
||||
};
|
||||
|
||||
let mut item = item
|
||||
.try_lock()
|
||||
.expect("we should always be able to unlock item");
|
||||
item.execute().await?;
|
||||
|
||||
Ok(Some(()))
|
||||
}
|
||||
|
||||
pub async fn drain(&self) -> anyhow::Result<()> {
|
||||
self.draining
|
||||
.store(true, std::sync::atomic::Ordering::Release);
|
||||
|
||||
while self.try_process_next().await?.is_some() {}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "notmad")]
|
||||
async fn process_all(&self, cancellation_token: CancellationToken) -> anyhow::Result<()> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancellation_token.cancelled() => {
|
||||
return Ok(())
|
||||
},
|
||||
res = self.process_next() => {
|
||||
res?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ClosureComponent<F, Fut>
|
||||
where
|
||||
F: FnOnce() -> Fut + Send + Sync + 'static,
|
||||
Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
|
||||
{
|
||||
inner: Box<Option<F>>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
trait QueueItem {
|
||||
async fn execute(&mut self) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<F, Fut> QueueItem for ClosureComponent<F, Fut>
|
||||
where
|
||||
F: FnOnce() -> Fut + Send + Sync + 'static,
|
||||
Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
|
||||
{
|
||||
async fn execute(&mut self) -> Result<(), anyhow::Error> {
|
||||
let item = self.inner.take().expect("to only be called once");
|
||||
|
||||
item().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "notmad")]
|
||||
mod notmad {
|
||||
use async_trait::async_trait;
|
||||
use notmad::MadError;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::DropQueue;
|
||||
|
||||
#[async_trait]
|
||||
impl notmad::Component for DropQueue {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("nodrop/drop-queue".into())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
|
||||
self.process_all(cancellation_token)
|
||||
.await
|
||||
.map_err(notmad::MadError::Inner)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "notmad")]
|
||||
#[allow(unused_imports)]
|
||||
pub use notmad::*;
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use crate::DropQueue;
|
||||
|
||||
#[tokio::test]
|
||||
async fn can_drop_item() -> anyhow::Result<()> {
|
||||
let drop_queue = DropQueue::new();
|
||||
|
||||
let (called_tx, called_rx) = oneshot::channel();
|
||||
|
||||
drop_queue.assign(|| async move {
|
||||
tracing::info!("was called");
|
||||
|
||||
called_tx.send(()).unwrap();
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
drop_queue.process_next().await?;
|
||||
|
||||
called_rx.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn can_drop_multiple_items() -> anyhow::Result<()> {
|
||||
let drop_queue = DropQueue::new();
|
||||
|
||||
let (called_tx, called_rx) = oneshot::channel();
|
||||
let _drop_queue = drop_queue.clone();
|
||||
tokio::spawn(async move {
|
||||
_drop_queue
|
||||
.assign(|| async move {
|
||||
tracing::info!("was called");
|
||||
|
||||
called_tx.send(()).unwrap();
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
let (called_tx2, called_rx2) = oneshot::channel();
|
||||
let _drop_queue = drop_queue.clone();
|
||||
tokio::spawn(async move {
|
||||
_drop_queue
|
||||
.assign(|| async move {
|
||||
tracing::info!("was called");
|
||||
|
||||
called_tx2.send(()).unwrap();
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
drop_queue.process_next().await?;
|
||||
drop_queue.process_next().await?;
|
||||
|
||||
called_rx.await?;
|
||||
called_rx2.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user