From 38946f77c66c2a104c1d258430f24e04615457a9 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Mon, 24 Nov 2025 23:24:59 +0100 Subject: [PATCH] feat: replace channel with semaphore for better performance --- crates/noworkers/examples/basic.rs | 6 +- .../noworkers/examples/parallel_processing.rs | 122 +++++++++--------- crates/noworkers/examples/wait_lock.rs | 62 +++++++++ crates/noworkers/src/lib.rs | 49 +++---- 4 files changed, 146 insertions(+), 93 deletions(-) create mode 100644 crates/noworkers/examples/wait_lock.rs diff --git a/crates/noworkers/examples/basic.rs b/crates/noworkers/examples/basic.rs index 6c45107..60468d6 100644 --- a/crates/noworkers/examples/basic.rs +++ b/crates/noworkers/examples/basic.rs @@ -18,10 +18,10 @@ async fn main() -> anyhow::Result<()> { workers .add(move |_cancel| async move { println!("Task {} starting...", i); - + // Simulate some async work tokio::time::sleep(tokio::time::Duration::from_millis(100 * (i as u64 + 1))).await; - + println!("Task {} completed!", i); Ok(()) }) @@ -36,4 +36,4 @@ async fn main() -> anyhow::Result<()> { println!("\nAll tasks completed successfully!"); Ok(()) -} \ No newline at end of file +} diff --git a/crates/noworkers/examples/parallel_processing.rs b/crates/noworkers/examples/parallel_processing.rs index 9a85203..ff1be2c 100644 --- a/crates/noworkers/examples/parallel_processing.rs +++ b/crates/noworkers/examples/parallel_processing.rs @@ -17,15 +17,15 @@ async fn main() -> anyhow::Result<()> { // Simulate a large dataset let dataset: Vec<i32> = (1..=100).collect(); let total_items = dataset.len(); - + println!("Processing {} items in parallel...\n", total_items); // Example 1: Simple parallel map example_parallel_map(dataset.clone()).await?; - + // Example 2: Pipeline with multiple stages example_pipeline(dataset.clone()).await?; - + // Example 3: Batch processing example_batch_processing(dataset).await?; @@ -35,147 +35,149 @@ async fn main() -> anyhow::Result<()> { async fn example_parallel_map(dataset: 
Vec<i32>) -> anyhow::Result<()> { println!("1. Simple Parallel Map"); println!("----------------------"); - + let mut workers = Workers::new(); - + // Use system CPU count for optimal parallelism workers.with_limit_to_system_cpus(); - + let results = Arc::new(tokio::sync::Mutex::new(Vec::new())); let processed = Arc::new(AtomicUsize::new(0)); let total = dataset.len(); - + let start = Instant::now(); - + for item in dataset { let results = results.clone(); let processed = processed.clone(); - + workers .add(move |_cancel| async move { // Simulate CPU-intensive processing let result = expensive_computation(item).await?; - + // Store result let mut res = results.lock().await; res.push(result); - + // Update progress let count = processed.fetch_add(1, Ordering::SeqCst) + 1; - if count % 10 == 0 { + if count.is_multiple_of(10) { println!(" Processed {}/{} items", count, total); } - + Ok(()) }) .await?; } - + workers.wait().await?; - + let results = results.lock().await; let sum: i32 = results.iter().sum(); - + println!(" Completed in {}ms", start.elapsed().as_millis()); println!(" Sum of results: {}\n", sum); - + Ok(()) } async fn example_pipeline(dataset: Vec<i32>) -> anyhow::Result<()> { println!("2. 
Multi-Stage Pipeline"); println!("-----------------------"); - + // Stage 1: Transform let stage1_output = Arc::new(tokio::sync::Mutex::new(Vec::new())); - + let mut stage1 = Workers::new(); stage1.with_limit(4); - + println!(" Stage 1: Transforming data..."); for item in dataset { let output = stage1_output.clone(); - + stage1 .add(move |_| async move { let transformed = item * 2; tokio::time::sleep(Duration::from_millis(10)).await; - + let mut out = output.lock().await; out.push(transformed); - + Ok(()) }) .await?; } - + stage1.wait().await?; - + // Stage 2: Filter and aggregate let stage2_output = Arc::new(tokio::sync::Mutex::new(Vec::new())); - + let mut stage2 = Workers::new(); stage2.with_limit(2); - + println!(" Stage 2: Filtering and aggregating..."); let stage1_data = stage1_output.lock().await.clone(); - + for chunk in stage1_data.chunks(10) { let chunk = chunk.to_vec(); let output = stage2_output.clone(); - + stage2 .add(move |_| async move { // Filter even numbers and sum - let filtered_sum: i32 = chunk.iter() - .filter(|&&x| x % 2 == 0) - .sum(); - + let filtered_sum: i32 = chunk.iter().filter(|&&x| x % 2 == 0).sum(); + tokio::time::sleep(Duration::from_millis(20)).await; - + let mut out = output.lock().await; out.push(filtered_sum); - + Ok(()) }) .await?; } - + stage2.wait().await?; - + let final_results = stage2_output.lock().await; let total: i32 = final_results.iter().sum(); - + println!(" Pipeline result: {}\n", total); - + Ok(()) } async fn example_batch_processing(dataset: Vec<i32>) -> anyhow::Result<()> { println!("3. 
Batch Processing"); println!("-------------------"); - + let batch_size = 20; let batches: Vec<Vec<i32>> = dataset .chunks(batch_size) .map(|chunk| chunk.to_vec()) .collect(); - - println!(" Processing {} batches of {} items each", batches.len(), batch_size); - + + println!( " Processing {} batches of {} items each", batches.len(), batch_size ); + let mut workers = Workers::new(); workers.with_limit(3); // Process 3 batches concurrently - + let results = Arc::new(tokio::sync::Mutex::new(Vec::new())); let start = Instant::now(); - + for (batch_idx, batch) in batches.into_iter().enumerate() { let results = results.clone(); - + workers .add(move |cancel| async move { println!(" Batch {} started", batch_idx); - + // Process batch with cancellation support tokio::select! { batch_result = process_batch(batch, batch_idx) => { @@ -200,38 +202,42 @@ async fn example_batch_processing(dataset: Vec<i32>) -> anyhow::Result<()> { }) .await?; } - + workers.wait().await?; - + let results = results.lock().await; let total_processed: usize = results.iter().sum(); - - println!(" Processed {} items in {}ms\n", total_processed, start.elapsed().as_millis()); - + + println!( " Processed {} items in {}ms\n", total_processed, start.elapsed().as_millis() ); + Ok(()) } async fn expensive_computation(n: i32) -> anyhow::Result<i32> { // Simulate CPU-intensive work tokio::time::sleep(Duration::from_millis(5)).await; - + // Some complex calculation let result = (n * n) + (n / 2) - 1; - + Ok(result) } async fn process_batch(batch: Vec<i32>, _batch_idx: usize) -> anyhow::Result<usize> { // Simulate batch processing tokio::time::sleep(Duration::from_millis(100)).await; - + // Process each item in the batch let processed_count = batch.len(); - + // In a real scenario, you might: // - Write to a database // - Send to an API // - Transform and save to files - + Ok(processed_count) -} \ No newline at end of file +} diff --git a/crates/noworkers/examples/wait_lock.rs b/crates/noworkers/examples/wait_lock.rs new file 
mode 100644 index 0000000..c30c6e5 --- /dev/null +++ b/crates/noworkers/examples/wait_lock.rs @@ -0,0 +1,62 @@ +#![feature(random)] +use std::{random, sync::atomic::AtomicUsize}; + +use tokio_util::sync::CancellationToken; + +static SLOW_IN_PROGRESS: AtomicUsize = AtomicUsize::new(0); +static FAST_IN_PROGRESS: AtomicUsize = AtomicUsize::new(0); + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tokio::spawn(async move { + loop { + println!( + "slow: {}, fast: {}", + SLOW_IN_PROGRESS.load(std::sync::atomic::Ordering::Relaxed), + FAST_IN_PROGRESS.load(std::sync::atomic::Ordering::Relaxed) + ); + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + }); + let mut workers = noworkers::Workers::new(); + + workers.with_limit(30); + + for _ in 0..1000 { + let range: u16 = random::random(..); + if range < (u16::MAX / 4) { + workers.add(slow).await?; + continue; + } + + workers.add(fast).await?; + } + + workers.wait().await?; + + Ok(()) +} + +async fn fast(_cancel: CancellationToken) -> anyhow::Result<()> { + FAST_IN_PROGRESS.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + // println!("{}: running fast", now()); + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + + FAST_IN_PROGRESS.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + Ok(()) +} +async fn slow(_cancel: CancellationToken) -> anyhow::Result<()> { + SLOW_IN_PROGRESS.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + // println!("{}: running slow", now()); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + // println!("{}: completed slow", now()); + SLOW_IN_PROGRESS.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + Ok(()) +} + +fn now() -> u128 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() +} diff --git a/crates/noworkers/src/lib.rs b/crates/noworkers/src/lib.rs index 3c6f85d..8e3bb94 100644 --- a/crates/noworkers/src/lib.rs +++ b/crates/noworkers/src/lib.rs @@ -191,7 +191,10 @@ use 
std::{future::Future, sync::Arc}; -use tokio::{sync::Mutex, task::JoinHandle}; +use tokio::{ + sync::{Mutex, OwnedSemaphorePermit, Semaphore}, + task::JoinHandle, +}; use tokio_util::sync::CancellationToken; /// Extension traits for common patterns. @@ -322,8 +325,7 @@ enum WorkerLimit { #[default] NoLimit, Amount { - queue: tokio::sync::mpsc::Sender<()>, - done: Arc<Mutex<tokio::sync::mpsc::Receiver<()>>>, + done: Arc<Semaphore>, }, } @@ -331,18 +333,21 @@ impl WorkerLimit { pub async fn queue_worker(&self) -> WorkerGuard { match self { WorkerLimit::NoLimit => {} - WorkerLimit::Amount { queue, .. } => { + WorkerLimit::Amount { done } => { // Queue work, if the channel is limited, we will block until there is enough room - queue - .send(()) + let permit = done + .clone() + .acquire_owned() .await - .expect("tried to queue work on a closed worker channel"); + .expect("to be able to acquire permit"); + + return WorkerGuard { + _permit: Some(permit), + }; } } - WorkerGuard { - limit: self.clone(), - } + WorkerGuard { _permit: None } } } @@ -353,24 +358,7 @@ impl WorkerLimit { /// /// This type is not directly constructible by users. pub struct WorkerGuard { - limit: WorkerLimit, -} -impl Drop for WorkerGuard { - fn drop(&mut self) { - match &self.limit { - WorkerLimit::NoLimit => { /* no limit on dequeue */ } - WorkerLimit::Amount { done, .. } => { - let done = done.clone(); - tokio::spawn(async move { - let mut done = done.lock().await; - - // dequeue an item, leave room for the next - done.recv().await - }); - } - } - } + _permit: Option<OwnedSemaphorePermit>, } impl Workers { @@ -542,11 +530,8 @@ impl Workers { /// # } /// ``` pub fn with_limit(&mut self, limit: usize) -> &mut Self { - let (tx, rx) = tokio::sync::mpsc::channel(limit); - self.limit = WorkerLimit::Amount { - queue: tx, - done: Arc::new(Mutex::new(rx)), + done: Arc::new(Semaphore::new(limit)), }; self }