feat: replace channel with semaphore for better performance
All checks were successful
continuous-integration/drone/push Build is passing
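This commit replaces the bounded mpsc channel that enforced the worker limit (send a unit to claim a slot, spawn a task that recv()s one to release it) with a `tokio::sync::Semaphore`: a permit is acquired before a worker starts and is returned automatically when the permit is dropped. A minimal sketch of that limiting pattern in isolation, independent of the noworkers API (all names below are illustrative):

use std::sync::Arc;

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // At most 3 tasks run at once; acquire_owned suspends the caller
    // until a permit frees up, which is the backpressure on spawn.
    let limit = Arc::new(Semaphore::new(3));
    let mut handles = Vec::new();

    for i in 0..10 {
        let permit = limit.clone().acquire_owned().await.expect("semaphore closed");
        handles.push(tokio::spawn(async move {
            println!("task {i} running");
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            drop(permit); // releasing the slot is just a Drop
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}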
@@ -18,10 +18,10 @@ async fn main() -> anyhow::Result<()> {
         workers
             .add(move |_cancel| async move {
                 println!("Task {} starting...", i);
 
                 // Simulate some async work
                 tokio::time::sleep(tokio::time::Duration::from_millis(100 * (i as u64 + 1))).await;
 
                 println!("Task {} completed!", i);
                 Ok(())
             })
@@ -36,4 +36,4 @@ async fn main() -> anyhow::Result<()> {
     println!("\nAll tasks completed successfully!");
 
     Ok(())
 }
@@ -17,15 +17,15 @@ async fn main() -> anyhow::Result<()> {
     // Simulate a large dataset
     let dataset: Vec<i32> = (1..=100).collect();
     let total_items = dataset.len();
 
     println!("Processing {} items in parallel...\n", total_items);
 
     // Example 1: Simple parallel map
     example_parallel_map(dataset.clone()).await?;
 
     // Example 2: Pipeline with multiple stages
     example_pipeline(dataset.clone()).await?;
 
     // Example 3: Batch processing
     example_batch_processing(dataset).await?;
 
@@ -35,147 +35,149 @@ async fn main() -> anyhow::Result<()> {
 async fn example_parallel_map(dataset: Vec<i32>) -> anyhow::Result<()> {
     println!("1. Simple Parallel Map");
     println!("----------------------");
 
     let mut workers = Workers::new();
 
     // Use system CPU count for optimal parallelism
     workers.with_limit_to_system_cpus();
 
     let results = Arc::new(tokio::sync::Mutex::new(Vec::new()));
     let processed = Arc::new(AtomicUsize::new(0));
     let total = dataset.len();
 
     let start = Instant::now();
 
     for item in dataset {
         let results = results.clone();
         let processed = processed.clone();
 
         workers
             .add(move |_cancel| async move {
                 // Simulate CPU-intensive processing
                 let result = expensive_computation(item).await?;
 
                 // Store result
                 let mut res = results.lock().await;
                 res.push(result);
 
                 // Update progress
                 let count = processed.fetch_add(1, Ordering::SeqCst) + 1;
-                if count % 10 == 0 {
+                if count.is_multiple_of(10) {
                     println!(" Processed {}/{} items", count, total);
                 }
 
                 Ok(())
             })
             .await?;
     }
 
     workers.wait().await?;
 
     let results = results.lock().await;
     let sum: i32 = results.iter().sum();
 
     println!(" Completed in {}ms", start.elapsed().as_millis());
     println!(" Sum of results: {}\n", sum);
 
     Ok(())
 }
 
 async fn example_pipeline(dataset: Vec<i32>) -> anyhow::Result<()> {
     println!("2. Multi-Stage Pipeline");
     println!("-----------------------");
 
     // Stage 1: Transform
     let stage1_output = Arc::new(tokio::sync::Mutex::new(Vec::new()));
 
     let mut stage1 = Workers::new();
     stage1.with_limit(4);
 
     println!(" Stage 1: Transforming data...");
     for item in dataset {
         let output = stage1_output.clone();
 
         stage1
             .add(move |_| async move {
                 let transformed = item * 2;
                 tokio::time::sleep(Duration::from_millis(10)).await;
 
                 let mut out = output.lock().await;
                 out.push(transformed);
 
                 Ok(())
             })
             .await?;
     }
 
     stage1.wait().await?;
 
     // Stage 2: Filter and aggregate
     let stage2_output = Arc::new(tokio::sync::Mutex::new(Vec::new()));
 
     let mut stage2 = Workers::new();
     stage2.with_limit(2);
 
     println!(" Stage 2: Filtering and aggregating...");
     let stage1_data = stage1_output.lock().await.clone();
 
     for chunk in stage1_data.chunks(10) {
         let chunk = chunk.to_vec();
         let output = stage2_output.clone();
 
         stage2
             .add(move |_| async move {
                 // Filter even numbers and sum
-                let filtered_sum: i32 = chunk.iter()
-                    .filter(|&&x| x % 2 == 0)
-                    .sum();
+                let filtered_sum: i32 = chunk.iter().filter(|&&x| x % 2 == 0).sum();
 
                 tokio::time::sleep(Duration::from_millis(20)).await;
 
                 let mut out = output.lock().await;
                 out.push(filtered_sum);
 
                 Ok(())
             })
             .await?;
     }
 
     stage2.wait().await?;
 
     let final_results = stage2_output.lock().await;
     let total: i32 = final_results.iter().sum();
 
     println!(" Pipeline result: {}\n", total);
 
     Ok(())
 }
 
 async fn example_batch_processing(dataset: Vec<i32>) -> anyhow::Result<()> {
     println!("3. Batch Processing");
     println!("-------------------");
 
     let batch_size = 20;
     let batches: Vec<Vec<i32>> = dataset
         .chunks(batch_size)
         .map(|chunk| chunk.to_vec())
         .collect();
 
-    println!(" Processing {} batches of {} items each", batches.len(), batch_size);
+    println!(
+        " Processing {} batches of {} items each",
+        batches.len(),
+        batch_size
+    );
 
     let mut workers = Workers::new();
     workers.with_limit(3); // Process 3 batches concurrently
 
     let results = Arc::new(tokio::sync::Mutex::new(Vec::new()));
     let start = Instant::now();
 
     for (batch_idx, batch) in batches.into_iter().enumerate() {
         let results = results.clone();
 
         workers
             .add(move |cancel| async move {
                 println!(" Batch {} started", batch_idx);
 
                 // Process batch with cancellation support
                 tokio::select! {
                     batch_result = process_batch(batch, batch_idx) => {
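Apart from rustfmt splitting the long `println!` calls, the visible change in this hunk is `count % 10 == 0` becoming `count.is_multiple_of(10)`, the clippy-preferred spelling (stabilized for unsigned integers in Rust 1.87). The two are equivalent for a nonzero divisor, as a quick check shows:

fn main() {
    let count: usize = 30;
    assert_eq!(count % 10 == 0, count.is_multiple_of(10));
    // Unlike `%`, a zero divisor is defined instead of panicking:
    assert!(!count.is_multiple_of(0)); // only 0 is a multiple of 0
}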
@@ -200,38 +202,42 @@ async fn example_batch_processing(dataset: Vec<i32>) -> anyhow::Result<()> {
             })
             .await?;
     }
 
     workers.wait().await?;
 
     let results = results.lock().await;
     let total_processed: usize = results.iter().sum();
 
-    println!(" Processed {} items in {}ms\n", total_processed, start.elapsed().as_millis());
+    println!(
+        " Processed {} items in {}ms\n",
+        total_processed,
+        start.elapsed().as_millis()
+    );
 
     Ok(())
 }
 
 async fn expensive_computation(n: i32) -> anyhow::Result<i32> {
     // Simulate CPU-intensive work
     tokio::time::sleep(Duration::from_millis(5)).await;
 
     // Some complex calculation
     let result = (n * n) + (n / 2) - 1;
 
     Ok(result)
 }
 
 async fn process_batch(batch: Vec<i32>, _batch_idx: usize) -> anyhow::Result<usize> {
     // Simulate batch processing
     tokio::time::sleep(Duration::from_millis(100)).await;
 
     // Process each item in the batch
     let processed_count = batch.len();
 
     // In a real scenario, you might:
     // - Write to a database
     // - Send to an API
     // - Transform and save to files
 
     Ok(processed_count)
 }
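The arms of the `tokio::select!` above fall between the two hunks and are not shown here. For reference, the usual shape of a cancellation-aware arm pairing looks like the sketch below; `do_work` is a hypothetical stand-in, not code from this diff:

use tokio_util::sync::CancellationToken;

async fn run(cancel: CancellationToken) -> anyhow::Result<()> {
    tokio::select! {
        result = do_work() => {
            // The work finished first: propagate its outcome.
            result
        }
        _ = cancel.cancelled() => {
            // The group was cancelled (e.g. a sibling failed): stop early.
            Ok(())
        }
    }
}

async fn do_work() -> anyhow::Result<()> {
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    Ok(())
}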
crates/noworkers/examples/wait_lock.rs (new file, 62 lines)
@@ -0,0 +1,62 @@
+#![feature(random)]
+use std::{random, sync::atomic::AtomicUsize};
+
+use tokio_util::sync::CancellationToken;
+
+static SLOW_IN_PROGRESS: AtomicUsize = AtomicUsize::new(0);
+static FAST_IN_PROGRESS: AtomicUsize = AtomicUsize::new(0);
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    tokio::spawn(async move {
+        loop {
+            println!(
+                "slow: {}, fast: {}",
+                SLOW_IN_PROGRESS.load(std::sync::atomic::Ordering::Relaxed),
+                FAST_IN_PROGRESS.load(std::sync::atomic::Ordering::Relaxed)
+            );
+            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
+        }
+    });
+    let mut workers = noworkers::Workers::new();
+
+    workers.with_limit(30);
+
+    for _ in 0..1000 {
+        let range: u16 = random::random(..);
+        if range < (u16::MAX / 4) {
+            workers.add(slow).await?;
+            continue;
+        }
+
+        workers.add(fast).await?;
+    }
+
+    workers.wait().await?;
+
+    Ok(())
+}
+
+async fn fast(_cancel: CancellationToken) -> anyhow::Result<()> {
+    FAST_IN_PROGRESS.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+    // println!("{}: running fast", now());
+    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+
+    FAST_IN_PROGRESS.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
+    Ok(())
+}
+async fn slow(_cancel: CancellationToken) -> anyhow::Result<()> {
+    SLOW_IN_PROGRESS.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+    // println!("{}: running slow", now());
+    tokio::time::sleep(std::time::Duration::from_secs(3)).await;
+    // println!("{}: completed slow", now());
+    SLOW_IN_PROGRESS.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
+    Ok(())
+}
+
+fn now() -> u128 {
+    std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .unwrap()
+        .as_millis()
+}
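Note that `#![feature(random)]` makes this example nightly-only (`std::random` is unstable). On a stable toolchain the same roughly 25/75 slow/fast split could be drawn with the third-party `rand` crate, e.g.:

// Hypothetical stable replacement for the std::random call above,
// using the rand crate (not part of this commit):
fn pick_slow() -> bool {
    let range: u16 = rand::random();
    range < (u16::MAX / 4) // about a quarter of tasks take the slow path
}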
@@ -191,7 +191,10 @@
 
 use std::{future::Future, sync::Arc};
 
-use tokio::{sync::Mutex, task::JoinHandle};
+use tokio::{
+    sync::{Mutex, OwnedSemaphorePermit, Semaphore},
+    task::JoinHandle,
+};
 use tokio_util::sync::CancellationToken;
 
 /// Extension traits for common patterns.
@@ -322,8 +325,7 @@ enum WorkerLimit {
     #[default]
     NoLimit,
     Amount {
-        queue: tokio::sync::mpsc::Sender<()>,
-        done: Arc<Mutex<tokio::sync::mpsc::Receiver<()>>>,
+        done: Arc<tokio::sync::Semaphore>,
     },
 }
 
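The two channel endpoints (a `Sender` to claim capacity, a mutex-guarded `Receiver` to hand it back) collapse into a single counter: a semaphore created with `n` permits encodes the same bound directly and needs no locking to release. A tiny illustration:

use tokio::sync::Semaphore;

fn main() {
    // Four concurrent slots, no channel plumbing required.
    let limit = Semaphore::new(4);
    assert_eq!(limit.available_permits(), 4);
}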
@@ -331,18 +333,21 @@ impl WorkerLimit {
     pub async fn queue_worker(&self) -> WorkerGuard {
         match self {
             WorkerLimit::NoLimit => {}
-            WorkerLimit::Amount { queue, .. } => {
+            WorkerLimit::Amount { done } => {
                 // Queue work, if the channel is limited, we will block until there is enough room
-                queue
-                    .send(())
+                let permit = done
+                    .clone()
+                    .acquire_owned()
                     .await
-                    .expect("tried to queue work on a closed worker channel");
+                    .expect("to be able to acquire permit");
+
+                return WorkerGuard {
+                    _permit: Some(permit),
+                };
             }
         }
 
-        WorkerGuard {
-            limit: self.clone(),
-        }
+        WorkerGuard { _permit: None }
     }
 }
 
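`Semaphore::acquire_owned` takes the semaphore as an `Arc<Semaphore>` by value, which is why the new code clones the `Arc` first; the returned `OwnedSemaphorePermit` is `'static`, so it can be stored in the `WorkerGuard` that `queue_worker` hands back. A small standalone demonstration of the permit lifecycle:

use std::sync::Arc;

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let sem = Arc::new(Semaphore::new(1));

    // acquire_owned consumes an Arc clone and yields a permit that owns
    // its slot independently of any borrow of the semaphore.
    let permit = sem.clone().acquire_owned().await.expect("semaphore closed");
    assert_eq!(sem.available_permits(), 0);

    // Releasing is just Drop: no task spawn, no recv().
    drop(permit);
    assert_eq!(sem.available_permits(), 1);
}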
@@ -353,24 +358,7 @@ impl WorkerLimit {
 ///
 /// This type is not directly constructible by users.
 pub struct WorkerGuard {
-    limit: WorkerLimit,
-}
-
-impl Drop for WorkerGuard {
-    fn drop(&mut self) {
-        match &self.limit {
-            WorkerLimit::NoLimit => { /* no limit on dequeue */ }
-            WorkerLimit::Amount { done, .. } => {
-                let done = done.clone();
-                tokio::spawn(async move {
-                    let mut done = done.lock().await;
-
-                    // dequeue an item, leave room for the next
-                    done.recv().await
-                });
-            }
-        }
-    }
+    _permit: Option<OwnedSemaphorePermit>,
 }
 
 impl Workers {
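The hand-written `Drop` impl can go because `OwnedSemaphorePermit` returns its permit in its own `Drop`. That also removes a subtlety of the old design: releasing a slot required spawning a task to lock the receiver and `recv()`, so capacity came back asynchronously and dropping a guard only worked inside a live Tokio runtime; with permits, the slot is freed synchronously the moment the guard is dropped.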
@@ -542,11 +530,8 @@ impl Workers {
     /// # }
     /// ```
     pub fn with_limit(&mut self, limit: usize) -> &mut Self {
-        let (tx, rx) = tokio::sync::mpsc::channel(limit);
-
         self.limit = WorkerLimit::Amount {
-            queue: tx,
-            done: Arc::new(Mutex::new(rx)),
+            done: Arc::new(Semaphore::new(limit)),
         };
         self
     }
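End to end, the public behavior is unchanged: `with_limit(n)` still makes `add` apply backpressure once `n` workers are in flight; only the mechanism underneath moved from a bounded channel to semaphore permits. A condensed usage sketch matching the examples in this commit:

use noworkers::Workers;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut workers = Workers::new();
    workers.with_limit(3); // at most 3 tasks in flight; add() waits for a free slot

    for i in 0..10 {
        workers
            .add(move |_cancel| async move {
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                println!("task {i} done");
                Ok(())
            })
            .await?;
    }

    workers.wait().await?;
    Ok(())
}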