Compare commits
3 Commits
cb7541171d
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 95e8ed3b7b | |||
| 5b6df5d596 | |||
|
38946f77c6
|
@@ -6,7 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.1.1] - 2025-11-13
|
||||
## [0.1.1] - 2025-11-24
|
||||
|
||||
### Added
|
||||
- replace channel with semaphore for better performance
|
||||
|
||||
### Fixed
|
||||
- *(deps)* update all dependencies
|
||||
|
||||
14
Cargo.lock
generated
14
Cargo.lock
generated
@@ -79,7 +79,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "noworkers"
|
||||
version = "0.1.0"
|
||||
version = "0.1.1"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"tokio",
|
||||
@@ -234,9 +234,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
version = "0.1.41"
|
||||
version = "0.1.43"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
|
||||
checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647"
|
||||
dependencies = [
|
||||
"log",
|
||||
"pin-project-lite",
|
||||
@@ -246,9 +246,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing-attributes"
|
||||
version = "0.1.30"
|
||||
version = "0.1.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903"
|
||||
checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -257,9 +257,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing-core"
|
||||
version = "0.1.34"
|
||||
version = "0.1.35"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678"
|
||||
checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
]
|
||||
|
||||
@@ -62,7 +62,7 @@ async fn example_parallel_map(dataset: Vec<i32>) -> anyhow::Result<()> {
|
||||
|
||||
// Update progress
|
||||
let count = processed.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
if count % 10 == 0 {
|
||||
if count.is_multiple_of(10) {
|
||||
println!(" Processed {}/{} items", count, total);
|
||||
}
|
||||
|
||||
@@ -127,9 +127,7 @@ async fn example_pipeline(dataset: Vec<i32>) -> anyhow::Result<()> {
|
||||
stage2
|
||||
.add(move |_| async move {
|
||||
// Filter even numbers and sum
|
||||
let filtered_sum: i32 = chunk.iter()
|
||||
.filter(|&&x| x % 2 == 0)
|
||||
.sum();
|
||||
let filtered_sum: i32 = chunk.iter().filter(|&&x| x % 2 == 0).sum();
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(20)).await;
|
||||
|
||||
@@ -161,7 +159,11 @@ async fn example_batch_processing(dataset: Vec<i32>) -> anyhow::Result<()> {
|
||||
.map(|chunk| chunk.to_vec())
|
||||
.collect();
|
||||
|
||||
println!(" Processing {} batches of {} items each", batches.len(), batch_size);
|
||||
println!(
|
||||
" Processing {} batches of {} items each",
|
||||
batches.len(),
|
||||
batch_size
|
||||
);
|
||||
|
||||
let mut workers = Workers::new();
|
||||
workers.with_limit(3); // Process 3 batches concurrently
|
||||
@@ -206,7 +208,11 @@ async fn example_batch_processing(dataset: Vec<i32>) -> anyhow::Result<()> {
|
||||
let results = results.lock().await;
|
||||
let total_processed: usize = results.iter().sum();
|
||||
|
||||
println!(" Processed {} items in {}ms\n", total_processed, start.elapsed().as_millis());
|
||||
println!(
|
||||
" Processed {} items in {}ms\n",
|
||||
total_processed,
|
||||
start.elapsed().as_millis()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
62
crates/noworkers/examples/wait_lock.rs
Normal file
62
crates/noworkers/examples/wait_lock.rs
Normal file
@@ -0,0 +1,62 @@
|
||||
#![feature(random)]
|
||||
use std::{random, sync::atomic::AtomicUsize};
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
static SLOW_IN_PROGRESS: AtomicUsize = AtomicUsize::new(0);
|
||||
static FAST_IN_PROGRESS: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
println!(
|
||||
"slow: {}, fast: {}",
|
||||
SLOW_IN_PROGRESS.load(std::sync::atomic::Ordering::Relaxed),
|
||||
FAST_IN_PROGRESS.load(std::sync::atomic::Ordering::Relaxed)
|
||||
);
|
||||
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
|
||||
}
|
||||
});
|
||||
let mut workers = noworkers::Workers::new();
|
||||
|
||||
workers.with_limit(30);
|
||||
|
||||
for _ in 0..1000 {
|
||||
let range: u16 = random::random(..);
|
||||
if range < (u16::MAX / 4) {
|
||||
workers.add(slow).await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
workers.add(fast).await?;
|
||||
}
|
||||
|
||||
workers.wait().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn fast(_cancel: CancellationToken) -> anyhow::Result<()> {
|
||||
FAST_IN_PROGRESS.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||
// println!("{}: running fast", now());
|
||||
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
|
||||
|
||||
FAST_IN_PROGRESS.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
|
||||
Ok(())
|
||||
}
|
||||
async fn slow(_cancel: CancellationToken) -> anyhow::Result<()> {
|
||||
SLOW_IN_PROGRESS.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||
// println!("{}: running slow", now());
|
||||
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
|
||||
// println!("{}: completed slow", now());
|
||||
SLOW_IN_PROGRESS.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn now() -> u128 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis()
|
||||
}
|
||||
@@ -191,7 +191,10 @@
|
||||
|
||||
use std::{future::Future, sync::Arc};
|
||||
|
||||
use tokio::{sync::Mutex, task::JoinHandle};
|
||||
use tokio::{
|
||||
sync::{Mutex, OwnedSemaphorePermit, Semaphore},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
/// Extension traits for common patterns.
|
||||
@@ -322,8 +325,7 @@ enum WorkerLimit {
|
||||
#[default]
|
||||
NoLimit,
|
||||
Amount {
|
||||
queue: tokio::sync::mpsc::Sender<()>,
|
||||
done: Arc<Mutex<tokio::sync::mpsc::Receiver<()>>>,
|
||||
done: Arc<tokio::sync::Semaphore>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -331,18 +333,21 @@ impl WorkerLimit {
|
||||
pub async fn queue_worker(&self) -> WorkerGuard {
|
||||
match self {
|
||||
WorkerLimit::NoLimit => {}
|
||||
WorkerLimit::Amount { queue, .. } => {
|
||||
WorkerLimit::Amount { done } => {
|
||||
// Queue work, if the channel is limited, we will block until there is enough room
|
||||
queue
|
||||
.send(())
|
||||
let permit = done
|
||||
.clone()
|
||||
.acquire_owned()
|
||||
.await
|
||||
.expect("tried to queue work on a closed worker channel");
|
||||
.expect("to be able to acquire permit");
|
||||
|
||||
return WorkerGuard {
|
||||
_permit: Some(permit),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
WorkerGuard {
|
||||
limit: self.clone(),
|
||||
}
|
||||
WorkerGuard { _permit: None }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -353,24 +358,7 @@ impl WorkerLimit {
|
||||
///
|
||||
/// This type is not directly constructible by users.
|
||||
pub struct WorkerGuard {
|
||||
limit: WorkerLimit,
|
||||
}
|
||||
|
||||
impl Drop for WorkerGuard {
|
||||
fn drop(&mut self) {
|
||||
match &self.limit {
|
||||
WorkerLimit::NoLimit => { /* no limit on dequeue */ }
|
||||
WorkerLimit::Amount { done, .. } => {
|
||||
let done = done.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut done = done.lock().await;
|
||||
|
||||
// dequeue an item, leave room for the next
|
||||
done.recv().await
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
_permit: Option<OwnedSemaphorePermit>,
|
||||
}
|
||||
|
||||
impl Workers {
|
||||
@@ -542,11 +530,8 @@ impl Workers {
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn with_limit(&mut self, limit: usize) -> &mut Self {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(limit);
|
||||
|
||||
self.limit = WorkerLimit::Amount {
|
||||
queue: tx,
|
||||
done: Arc::new(Mutex::new(rx)),
|
||||
done: Arc::new(Semaphore::new(limit)),
|
||||
};
|
||||
self
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user