4 Commits

Author SHA1 Message Date
cuddle-please
d8d3d7b9a5 chore(release): 0.1.2
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2025-11-29 01:54:00 +00:00
95e8ed3b7b chore(deps): update rust crate tracing to v0.1.43
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2025-11-29 01:52:15 +00:00
5b6df5d596 chore(release): v0.1.1 (#9)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.1.1

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #9
2025-11-25 09:03:15 +01:00
38946f77c6 feat: replace channel with semaphore for better performance
All checks were successful
continuous-integration/drone/push Build is passing
2025-11-24 23:25:11 +01:00
7 changed files with 163 additions and 102 deletions

View File

@@ -6,7 +6,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.1.1] - 2025-11-13 ## [0.1.2] - 2025-11-29
### Other
- *(deps)* update rust crate tracing to v0.1.43
## [0.1.1] - 2025-11-24
### Added
- replace channel with semaphore for better performance
### Fixed ### Fixed
- *(deps)* update all dependencies - *(deps)* update all dependencies

14
Cargo.lock generated
View File

@@ -79,7 +79,7 @@ dependencies = [
[[package]] [[package]]
name = "noworkers" name = "noworkers"
version = "0.1.0" version = "0.1.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"tokio", "tokio",
@@ -234,9 +234,9 @@ dependencies = [
[[package]] [[package]]
name = "tracing" name = "tracing"
version = "0.1.41" version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647"
dependencies = [ dependencies = [
"log", "log",
"pin-project-lite", "pin-project-lite",
@@ -246,9 +246,9 @@ dependencies = [
[[package]] [[package]]
name = "tracing-attributes" name = "tracing-attributes"
version = "0.1.30" version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@@ -257,9 +257,9 @@ dependencies = [
[[package]] [[package]]
name = "tracing-core" name = "tracing-core"
version = "0.1.34" version = "0.1.35"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c"
dependencies = [ dependencies = [
"once_cell", "once_cell",
] ]

View File

@@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2" resolver = "2"
[workspace.package] [workspace.package]
version = "0.1.1" version = "0.1.2"
license = "MIT" license = "MIT"
[workspace.dependencies] [workspace.dependencies]

View File

@@ -62,7 +62,7 @@ async fn example_parallel_map(dataset: Vec<i32>) -> anyhow::Result<()> {
// Update progress // Update progress
let count = processed.fetch_add(1, Ordering::SeqCst) + 1; let count = processed.fetch_add(1, Ordering::SeqCst) + 1;
if count % 10 == 0 { if count.is_multiple_of(10) {
println!(" Processed {}/{} items", count, total); println!(" Processed {}/{} items", count, total);
} }
@@ -127,9 +127,7 @@ async fn example_pipeline(dataset: Vec<i32>) -> anyhow::Result<()> {
stage2 stage2
.add(move |_| async move { .add(move |_| async move {
// Filter even numbers and sum // Filter even numbers and sum
let filtered_sum: i32 = chunk.iter() let filtered_sum: i32 = chunk.iter().filter(|&&x| x % 2 == 0).sum();
.filter(|&&x| x % 2 == 0)
.sum();
tokio::time::sleep(Duration::from_millis(20)).await; tokio::time::sleep(Duration::from_millis(20)).await;
@@ -161,7 +159,11 @@ async fn example_batch_processing(dataset: Vec<i32>) -> anyhow::Result<()> {
.map(|chunk| chunk.to_vec()) .map(|chunk| chunk.to_vec())
.collect(); .collect();
println!(" Processing {} batches of {} items each", batches.len(), batch_size); println!(
" Processing {} batches of {} items each",
batches.len(),
batch_size
);
let mut workers = Workers::new(); let mut workers = Workers::new();
workers.with_limit(3); // Process 3 batches concurrently workers.with_limit(3); // Process 3 batches concurrently
@@ -206,7 +208,11 @@ async fn example_batch_processing(dataset: Vec<i32>) -> anyhow::Result<()> {
let results = results.lock().await; let results = results.lock().await;
let total_processed: usize = results.iter().sum(); let total_processed: usize = results.iter().sum();
println!(" Processed {} items in {}ms\n", total_processed, start.elapsed().as_millis()); println!(
" Processed {} items in {}ms\n",
total_processed,
start.elapsed().as_millis()
);
Ok(()) Ok(())
} }

View File

@@ -0,0 +1,62 @@
#![feature(random)]
use std::{random, sync::atomic::AtomicUsize};
use tokio_util::sync::CancellationToken;
static SLOW_IN_PROGRESS: AtomicUsize = AtomicUsize::new(0);
static FAST_IN_PROGRESS: AtomicUsize = AtomicUsize::new(0);
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tokio::spawn(async move {
loop {
println!(
"slow: {}, fast: {}",
SLOW_IN_PROGRESS.load(std::sync::atomic::Ordering::Relaxed),
FAST_IN_PROGRESS.load(std::sync::atomic::Ordering::Relaxed)
);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
});
let mut workers = noworkers::Workers::new();
workers.with_limit(30);
for _ in 0..1000 {
let range: u16 = random::random(..);
if range < (u16::MAX / 4) {
workers.add(slow).await?;
continue;
}
workers.add(fast).await?;
}
workers.wait().await?;
Ok(())
}
async fn fast(_cancel: CancellationToken) -> anyhow::Result<()> {
FAST_IN_PROGRESS.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
// println!("{}: running fast", now());
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
FAST_IN_PROGRESS.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
Ok(())
}
async fn slow(_cancel: CancellationToken) -> anyhow::Result<()> {
SLOW_IN_PROGRESS.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
// println!("{}: running slow", now());
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
// println!("{}: completed slow", now());
SLOW_IN_PROGRESS.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
Ok(())
}
fn now() -> u128 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
}

View File

@@ -191,7 +191,10 @@
use std::{future::Future, sync::Arc}; use std::{future::Future, sync::Arc};
use tokio::{sync::Mutex, task::JoinHandle}; use tokio::{
sync::{Mutex, OwnedSemaphorePermit, Semaphore},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
/// Extension traits for common patterns. /// Extension traits for common patterns.
@@ -322,8 +325,7 @@ enum WorkerLimit {
#[default] #[default]
NoLimit, NoLimit,
Amount { Amount {
queue: tokio::sync::mpsc::Sender<()>, done: Arc<tokio::sync::Semaphore>,
done: Arc<Mutex<tokio::sync::mpsc::Receiver<()>>>,
}, },
} }
@@ -331,18 +333,21 @@ impl WorkerLimit {
pub async fn queue_worker(&self) -> WorkerGuard { pub async fn queue_worker(&self) -> WorkerGuard {
match self { match self {
WorkerLimit::NoLimit => {} WorkerLimit::NoLimit => {}
WorkerLimit::Amount { queue, .. } => { WorkerLimit::Amount { done } => {
// Queue work, if the channel is limited, we will block until there is enough room // Queue work, if the channel is limited, we will block until there is enough room
queue let permit = done
.send(()) .clone()
.acquire_owned()
.await .await
.expect("tried to queue work on a closed worker channel"); .expect("to be able to acquire permit");
return WorkerGuard {
_permit: Some(permit),
};
} }
} }
WorkerGuard { WorkerGuard { _permit: None }
limit: self.clone(),
}
} }
} }
@@ -353,24 +358,7 @@ impl WorkerLimit {
/// ///
/// This type is not directly constructible by users. /// This type is not directly constructible by users.
pub struct WorkerGuard { pub struct WorkerGuard {
limit: WorkerLimit, _permit: Option<OwnedSemaphorePermit>,
}
impl Drop for WorkerGuard {
fn drop(&mut self) {
match &self.limit {
WorkerLimit::NoLimit => { /* no limit on dequeue */ }
WorkerLimit::Amount { done, .. } => {
let done = done.clone();
tokio::spawn(async move {
let mut done = done.lock().await;
// dequeue an item, leave room for the next
done.recv().await
});
}
}
}
} }
impl Workers { impl Workers {
@@ -542,11 +530,8 @@ impl Workers {
/// # } /// # }
/// ``` /// ```
pub fn with_limit(&mut self, limit: usize) -> &mut Self { pub fn with_limit(&mut self, limit: usize) -> &mut Self {
let (tx, rx) = tokio::sync::mpsc::channel(limit);
self.limit = WorkerLimit::Amount { self.limit = WorkerLimit::Amount {
queue: tx, done: Arc::new(Semaphore::new(limit)),
done: Arc::new(Mutex::new(rx)),
}; };
self self
} }