Compare commits
6 Commits
renovate/c
...
v0.1.0
Author | SHA1 | Date | |
---|---|---|---|
2df59b60b6 | |||
f6d07134f9
|
|||
8121bf7985 | |||
49b931775c | |||
5ab54585c0
|
|||
119fb101cf |
11
CHANGELOG.md
11
CHANGELOG.md
@@ -6,6 +6,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.1.0] - 2025-08-08
|
||||
|
||||
### Added
|
||||
- add docs and examples
|
||||
|
||||
## [0.0.4] - 2025-08-08
|
||||
|
||||
### Other
|
||||
- Configure Renovate (#3)
|
||||
Add renovate.json
|
||||
|
||||
## [0.0.1] - 2025-07-01
|
||||
|
||||
### Added
|
||||
|
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -88,7 +88,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "noworkers"
|
||||
version = "0.0.1"
|
||||
version = "0.0.4"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"tokio",
|
||||
|
@@ -3,7 +3,7 @@ members = ["crates/*"]
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
version = "0.0.1"
|
||||
version = "0.1.0"
|
||||
license = "MIT"
|
||||
|
||||
[workspace.dependencies]
|
||||
|
@@ -110,6 +110,6 @@ See [LICENSE-MIT](LICENSE-MIT) and [LICENSE-APACHE](LICENSE-APACHE) for details.
|
||||
|
||||
## Contribute
|
||||
|
||||
Simply create an issue here or pr https://github.com/kjuulh/noworkers.git, development happens publicly at: https://git.front.kjuulh.io/kjuulh/noworkers.
|
||||
Simply create an issue here or pr https://github.com/kjuulh/noworkers.git, development happens publicly at: https://git.kjuulh.io/kjuulh/noworkers.
|
||||
|
||||
|
||||
|
@@ -4,7 +4,7 @@ edition = "2024"
|
||||
readme = "../../README.md"
|
||||
version.workspace = true
|
||||
license.workspace = true
|
||||
repository = "https://git.front.kjuulh.io/kjuulh/noworkers"
|
||||
repository = "https://git.kjuulh.io/kjuulh/noworkers"
|
||||
authors = ["kjuulh <contact@kasperhermansen.com>"]
|
||||
description = "A small asyncronous worker pool manages thread pool limiting, cancellation and error propogation, inspired by golangs errgroup (requires tokio)"
|
||||
|
||||
@@ -13,3 +13,6 @@ anyhow.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-util = "0.7.15"
|
||||
tracing.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
|
39
crates/noworkers/examples/basic.rs
Normal file
39
crates/noworkers/examples/basic.rs
Normal file
@@ -0,0 +1,39 @@
|
||||
//! Basic usage of the noworkers library.
|
||||
//!
|
||||
//! This example demonstrates the simplest way to use noworkers to spawn
|
||||
//! and manage concurrent async tasks.
|
||||
|
||||
use noworkers::Workers;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
println!("Basic noworkers example");
|
||||
println!("=======================\n");
|
||||
|
||||
// Create a new worker group
|
||||
let workers = Workers::new();
|
||||
|
||||
// Spawn several async tasks
|
||||
for i in 0..5 {
|
||||
workers
|
||||
.add(move |_cancel| async move {
|
||||
println!("Task {} starting...", i);
|
||||
|
||||
// Simulate some async work
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(100 * (i as u64 + 1))).await;
|
||||
|
||||
println!("Task {} completed!", i);
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
println!("\nAll tasks spawned, waiting for completion...\n");
|
||||
|
||||
// Wait for all tasks to complete
|
||||
workers.wait().await?;
|
||||
|
||||
println!("\nAll tasks completed successfully!");
|
||||
|
||||
Ok(())
|
||||
}
|
157
crates/noworkers/examples/cancellation.rs
Normal file
157
crates/noworkers/examples/cancellation.rs
Normal file
@@ -0,0 +1,157 @@
|
||||
//! Cancellation patterns with noworkers.
|
||||
//!
|
||||
//! This example demonstrates different ways to cancel running tasks:
|
||||
//! - External cancellation token
|
||||
//! - Task-driven cancellation (timeout)
|
||||
//! - Error-triggered cancellation
|
||||
|
||||
use noworkers::Workers;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use std::time::Duration;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
println!("Cancellation Patterns Example");
|
||||
println!("=============================\n");
|
||||
|
||||
// Example 1: External cancellation token
|
||||
example_external_cancellation().await?;
|
||||
|
||||
// Example 2: Task-driven cancellation (timeout)
|
||||
example_timeout_cancellation().await?;
|
||||
|
||||
// Example 3: Error-triggered cancellation
|
||||
let _ = example_error_cancellation().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn example_external_cancellation() -> anyhow::Result<()> {
|
||||
println!("1. External Cancellation Token");
|
||||
println!("------------------------------");
|
||||
|
||||
let mut workers = Workers::new();
|
||||
let cancel_token = CancellationToken::new();
|
||||
|
||||
// Link workers to external cancellation token
|
||||
workers.with_cancel(&cancel_token);
|
||||
|
||||
// Spawn tasks that respect cancellation
|
||||
for i in 0..5 {
|
||||
workers
|
||||
.add(move |cancel| async move {
|
||||
println!(" Task {} started", i);
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_secs(10)) => {
|
||||
println!(" Task {} completed normally", i);
|
||||
Ok(())
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
println!(" Task {} cancelled!", i);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Cancel after 500ms
|
||||
let cancel_clone = cancel_token.clone();
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
println!(" >>> Triggering external cancellation");
|
||||
cancel_clone.cancel();
|
||||
});
|
||||
|
||||
workers.wait().await?;
|
||||
println!();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn example_timeout_cancellation() -> anyhow::Result<()> {
|
||||
println!("2. Task-Driven Cancellation (Timeout)");
|
||||
println!("-------------------------------------");
|
||||
|
||||
let mut workers = Workers::new();
|
||||
|
||||
// Set up a timeout task that cancels after 1 second
|
||||
workers.with_cancel_task(async {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
println!(" >>> Timeout reached, cancelling all tasks");
|
||||
});
|
||||
|
||||
// Spawn long-running tasks
|
||||
for i in 0..3 {
|
||||
workers
|
||||
.add(move |cancel| async move {
|
||||
println!(" Long task {} started", i);
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_secs(60)) => {
|
||||
println!(" Long task {} completed (shouldn't happen)", i);
|
||||
Ok(())
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
println!(" Long task {} cancelled by timeout", i);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
workers.wait().await?;
|
||||
println!();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn example_error_cancellation() -> anyhow::Result<()> {
|
||||
println!("3. Error-Triggered Cancellation");
|
||||
println!("-------------------------------");
|
||||
|
||||
let workers = Workers::new();
|
||||
|
||||
// Task that will fail and trigger cancellation
|
||||
workers
|
||||
.add(|_| async {
|
||||
println!(" Failing task started");
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
println!(" >>> Task failing with error!");
|
||||
Err(anyhow::anyhow!("Intentional failure"))
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Tasks that will be cancelled due to the error
|
||||
for i in 0..3 {
|
||||
workers
|
||||
.add(move |cancel| async move {
|
||||
println!(" Normal task {} started", i);
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_secs(10)) => {
|
||||
println!(" Normal task {} completed (shouldn't happen)", i);
|
||||
Ok(())
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
println!(" Normal task {} cancelled due to error", i);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
// This will return the error from the failing task
|
||||
let result = workers.wait().await;
|
||||
|
||||
match result {
|
||||
Ok(_) => println!(" Unexpected success"),
|
||||
Err(e) => println!(" Expected error caught: {}", e),
|
||||
}
|
||||
|
||||
println!();
|
||||
Ok(())
|
||||
}
|
159
crates/noworkers/examples/error_handling.rs
Normal file
159
crates/noworkers/examples/error_handling.rs
Normal file
@@ -0,0 +1,159 @@
|
||||
//! Error handling and propagation with noworkers.
|
||||
//!
|
||||
//! This example demonstrates how noworkers handles errors with
|
||||
//! first-error-wins semantics, similar to Go's errgroup.
|
||||
|
||||
use noworkers::Workers;
|
||||
use std::time::Duration;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
println!("Error Handling Example");
|
||||
println!("=====================\n");
|
||||
|
||||
// Example 1: First error wins
|
||||
let _ = example_first_error_wins().await;
|
||||
|
||||
// Example 2: Mixed success and failure
|
||||
let _ = example_mixed_results().await;
|
||||
|
||||
// Example 3: All tasks succeed
|
||||
example_all_succeed().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn example_first_error_wins() -> anyhow::Result<()> {
|
||||
println!("1. First Error Wins");
|
||||
println!("-------------------");
|
||||
|
||||
let workers = Workers::new();
|
||||
|
||||
// Spawn multiple tasks that will fail at different times
|
||||
workers
|
||||
.add(|_| async {
|
||||
tokio::time::sleep(Duration::from_millis(300)).await;
|
||||
println!(" Task A: Failing after 300ms");
|
||||
Err(anyhow::anyhow!("Error from task A"))
|
||||
})
|
||||
.await?;
|
||||
|
||||
workers
|
||||
.add(|_| async {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
println!(" Task B: Failing after 100ms (this should win)");
|
||||
Err(anyhow::anyhow!("Error from task B"))
|
||||
})
|
||||
.await?;
|
||||
|
||||
workers
|
||||
.add(|_| async {
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
println!(" Task C: Failing after 200ms");
|
||||
Err(anyhow::anyhow!("Error from task C"))
|
||||
})
|
||||
.await?;
|
||||
|
||||
let result = workers.wait().await;
|
||||
|
||||
match result {
|
||||
Ok(_) => println!(" Unexpected success"),
|
||||
Err(e) => println!(" First error received: {}", e),
|
||||
}
|
||||
|
||||
println!();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn example_mixed_results() -> anyhow::Result<()> {
|
||||
println!("2. Mixed Success and Failure");
|
||||
println!("----------------------------");
|
||||
|
||||
let workers = Workers::new();
|
||||
|
||||
// Some tasks succeed
|
||||
for i in 0..3 {
|
||||
workers
|
||||
.add(move |cancel| async move {
|
||||
println!(" Success task {} started", i);
|
||||
|
||||
// Check for cancellation
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_millis(50)) => {
|
||||
println!(" Success task {} completed", i);
|
||||
Ok(())
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
println!(" Success task {} cancelled", i);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
// One task fails
|
||||
workers
|
||||
.add(|_| async {
|
||||
tokio::time::sleep(Duration::from_millis(150)).await;
|
||||
println!(" >>> Failure task triggering error");
|
||||
Err(anyhow::anyhow!("Intentional failure in mixed group"))
|
||||
})
|
||||
.await?;
|
||||
|
||||
// More successful tasks (that might get cancelled)
|
||||
for i in 3..6 {
|
||||
workers
|
||||
.add(move |cancel| async move {
|
||||
println!(" Late task {} started", i);
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_secs(1)) => {
|
||||
println!(" Late task {} completed (unlikely)", i);
|
||||
Ok(())
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
println!(" Late task {} cancelled", i);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
let result = workers.wait().await;
|
||||
|
||||
match result {
|
||||
Ok(_) => println!(" Unexpected success"),
|
||||
Err(e) => println!(" Error propagated: {}", e),
|
||||
}
|
||||
|
||||
println!();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn example_all_succeed() -> anyhow::Result<()> {
|
||||
println!("3. All Tasks Succeed");
|
||||
println!("--------------------");
|
||||
|
||||
let workers = Workers::new();
|
||||
|
||||
// All tasks complete successfully
|
||||
for i in 0..5 {
|
||||
workers
|
||||
.add(move |_| async move {
|
||||
println!(" Task {} processing...", i);
|
||||
tokio::time::sleep(Duration::from_millis(100 * (i as u64 + 1))).await;
|
||||
println!(" Task {} done", i);
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Should complete without error
|
||||
workers.wait().await?;
|
||||
println!(" All tasks completed successfully!");
|
||||
|
||||
println!();
|
||||
Ok(())
|
||||
}
|
143
crates/noworkers/examples/file_processor.rs
Normal file
143
crates/noworkers/examples/file_processor.rs
Normal file
@@ -0,0 +1,143 @@
|
||||
//! File processing example using noworkers.
|
||||
//!
|
||||
//! This example demonstrates processing multiple files concurrently,
|
||||
//! such as reading, transforming, and writing files in parallel.
|
||||
|
||||
use noworkers::Workers;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
println!("File Processing Example");
|
||||
println!("======================\n");
|
||||
|
||||
// Simulate a list of files to process
|
||||
let files = vec![
|
||||
"data/input1.txt",
|
||||
"data/input2.txt",
|
||||
"data/input3.txt",
|
||||
"data/input4.txt",
|
||||
"data/input5.txt",
|
||||
"data/input6.txt",
|
||||
"data/input7.txt",
|
||||
"data/input8.txt",
|
||||
];
|
||||
|
||||
println!("Processing {} files concurrently...\n", files.len());
|
||||
|
||||
let mut workers = Workers::new();
|
||||
|
||||
// Limit concurrent file operations to avoid file descriptor exhaustion
|
||||
workers.with_limit(4);
|
||||
|
||||
// Track processing statistics
|
||||
let stats = Arc::new(tokio::sync::Mutex::new(ProcessingStats::default()));
|
||||
let start = Instant::now();
|
||||
|
||||
for file_path in files {
|
||||
let stats = stats.clone();
|
||||
let path = file_path.to_string();
|
||||
|
||||
workers
|
||||
.add(move |cancel| async move {
|
||||
tokio::select! {
|
||||
result = process_file(&path) => {
|
||||
let mut s = stats.lock().await;
|
||||
|
||||
match result {
|
||||
Ok(bytes_processed) => {
|
||||
println!("✓ Processed {} ({} bytes)", path, bytes_processed);
|
||||
s.files_processed += 1;
|
||||
s.bytes_processed += bytes_processed;
|
||||
}
|
||||
Err(e) => {
|
||||
println!("✗ Failed to process {}: {}", path, e);
|
||||
s.files_failed += 1;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
println!("⚠ Cancelled processing of {}", path);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Wait for all file processing to complete
|
||||
workers.wait().await?;
|
||||
|
||||
let stats = stats.lock().await;
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
println!("\n========== Summary ==========");
|
||||
println!("Total time: {}ms", elapsed.as_millis());
|
||||
println!("Files processed: {}", stats.files_processed);
|
||||
println!("Files failed: {}", stats.files_failed);
|
||||
println!("Total bytes: {}", stats.bytes_processed);
|
||||
println!(
|
||||
"Throughput: {:.2} MB/s",
|
||||
(stats.bytes_processed as f64 / 1_000_000.0) / elapsed.as_secs_f64()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct ProcessingStats {
|
||||
files_processed: usize,
|
||||
files_failed: usize,
|
||||
bytes_processed: usize,
|
||||
}
|
||||
|
||||
/// Simulates processing a file
|
||||
async fn process_file(file_path: &str) -> anyhow::Result<usize> {
|
||||
// Simulate reading file
|
||||
let content = simulate_read_file(file_path).await?;
|
||||
|
||||
// Simulate processing (e.g., compression, encryption, transformation)
|
||||
let processed = simulate_transform(content).await?;
|
||||
|
||||
// Simulate writing output
|
||||
let output_path = PathBuf::from(file_path)
|
||||
.with_extension("processed");
|
||||
|
||||
simulate_write_file(&output_path, processed.clone()).await?;
|
||||
|
||||
Ok(processed.len())
|
||||
}
|
||||
|
||||
async fn simulate_read_file(path: &str) -> anyhow::Result<Vec<u8>> {
|
||||
// Simulate file I/O delay
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
|
||||
// Generate simulated file content
|
||||
let size = (path.len() * 1000) % 10000 + 1000;
|
||||
let content = vec![0u8; size];
|
||||
|
||||
Ok(content)
|
||||
}
|
||||
|
||||
async fn simulate_transform(data: Vec<u8>) -> anyhow::Result<Vec<u8>> {
|
||||
// Simulate CPU-intensive transformation
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
// Simulate compression (returns smaller data)
|
||||
let compressed_size = data.len() / 2;
|
||||
Ok(vec![1u8; compressed_size])
|
||||
}
|
||||
|
||||
async fn simulate_write_file(_path: &PathBuf, data: Vec<u8>) -> anyhow::Result<()> {
|
||||
// Simulate file I/O delay
|
||||
tokio::time::sleep(Duration::from_millis(30)).await;
|
||||
|
||||
// In a real implementation, you would write to disk here
|
||||
let _ = data; // Use the data to avoid warning
|
||||
|
||||
Ok(())
|
||||
}
|
237
crates/noworkers/examples/parallel_processing.rs
Normal file
237
crates/noworkers/examples/parallel_processing.rs
Normal file
@@ -0,0 +1,237 @@
|
||||
//! Parallel data processing pipeline using noworkers.
|
||||
//!
|
||||
//! This example shows how to process large datasets in parallel
|
||||
//! with controlled concurrency and progress tracking.
|
||||
|
||||
use noworkers::Workers;
|
||||
use noworkers::extensions::WithSysLimitCpus;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
println!("Parallel Processing Pipeline Example");
|
||||
println!("====================================\n");
|
||||
|
||||
// Simulate a large dataset
|
||||
let dataset: Vec<i32> = (1..=100).collect();
|
||||
let total_items = dataset.len();
|
||||
|
||||
println!("Processing {} items in parallel...\n", total_items);
|
||||
|
||||
// Example 1: Simple parallel map
|
||||
example_parallel_map(dataset.clone()).await?;
|
||||
|
||||
// Example 2: Pipeline with multiple stages
|
||||
example_pipeline(dataset.clone()).await?;
|
||||
|
||||
// Example 3: Batch processing
|
||||
example_batch_processing(dataset).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn example_parallel_map(dataset: Vec<i32>) -> anyhow::Result<()> {
|
||||
println!("1. Simple Parallel Map");
|
||||
println!("----------------------");
|
||||
|
||||
let mut workers = Workers::new();
|
||||
|
||||
// Use system CPU count for optimal parallelism
|
||||
workers.with_limit_to_system_cpus();
|
||||
|
||||
let results = Arc::new(tokio::sync::Mutex::new(Vec::new()));
|
||||
let processed = Arc::new(AtomicUsize::new(0));
|
||||
let total = dataset.len();
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for item in dataset {
|
||||
let results = results.clone();
|
||||
let processed = processed.clone();
|
||||
|
||||
workers
|
||||
.add(move |_cancel| async move {
|
||||
// Simulate CPU-intensive processing
|
||||
let result = expensive_computation(item).await?;
|
||||
|
||||
// Store result
|
||||
let mut res = results.lock().await;
|
||||
res.push(result);
|
||||
|
||||
// Update progress
|
||||
let count = processed.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
if count % 10 == 0 {
|
||||
println!(" Processed {}/{} items", count, total);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
workers.wait().await?;
|
||||
|
||||
let results = results.lock().await;
|
||||
let sum: i32 = results.iter().sum();
|
||||
|
||||
println!(" Completed in {}ms", start.elapsed().as_millis());
|
||||
println!(" Sum of results: {}\n", sum);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn example_pipeline(dataset: Vec<i32>) -> anyhow::Result<()> {
|
||||
println!("2. Multi-Stage Pipeline");
|
||||
println!("-----------------------");
|
||||
|
||||
// Stage 1: Transform
|
||||
let stage1_output = Arc::new(tokio::sync::Mutex::new(Vec::new()));
|
||||
|
||||
let mut stage1 = Workers::new();
|
||||
stage1.with_limit(4);
|
||||
|
||||
println!(" Stage 1: Transforming data...");
|
||||
for item in dataset {
|
||||
let output = stage1_output.clone();
|
||||
|
||||
stage1
|
||||
.add(move |_| async move {
|
||||
let transformed = item * 2;
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
|
||||
let mut out = output.lock().await;
|
||||
out.push(transformed);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
stage1.wait().await?;
|
||||
|
||||
// Stage 2: Filter and aggregate
|
||||
let stage2_output = Arc::new(tokio::sync::Mutex::new(Vec::new()));
|
||||
|
||||
let mut stage2 = Workers::new();
|
||||
stage2.with_limit(2);
|
||||
|
||||
println!(" Stage 2: Filtering and aggregating...");
|
||||
let stage1_data = stage1_output.lock().await.clone();
|
||||
|
||||
for chunk in stage1_data.chunks(10) {
|
||||
let chunk = chunk.to_vec();
|
||||
let output = stage2_output.clone();
|
||||
|
||||
stage2
|
||||
.add(move |_| async move {
|
||||
// Filter even numbers and sum
|
||||
let filtered_sum: i32 = chunk.iter()
|
||||
.filter(|&&x| x % 2 == 0)
|
||||
.sum();
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(20)).await;
|
||||
|
||||
let mut out = output.lock().await;
|
||||
out.push(filtered_sum);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
stage2.wait().await?;
|
||||
|
||||
let final_results = stage2_output.lock().await;
|
||||
let total: i32 = final_results.iter().sum();
|
||||
|
||||
println!(" Pipeline result: {}\n", total);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn example_batch_processing(dataset: Vec<i32>) -> anyhow::Result<()> {
|
||||
println!("3. Batch Processing");
|
||||
println!("-------------------");
|
||||
|
||||
let batch_size = 20;
|
||||
let batches: Vec<Vec<i32>> = dataset
|
||||
.chunks(batch_size)
|
||||
.map(|chunk| chunk.to_vec())
|
||||
.collect();
|
||||
|
||||
println!(" Processing {} batches of {} items each", batches.len(), batch_size);
|
||||
|
||||
let mut workers = Workers::new();
|
||||
workers.with_limit(3); // Process 3 batches concurrently
|
||||
|
||||
let results = Arc::new(tokio::sync::Mutex::new(Vec::new()));
|
||||
let start = Instant::now();
|
||||
|
||||
for (batch_idx, batch) in batches.into_iter().enumerate() {
|
||||
let results = results.clone();
|
||||
|
||||
workers
|
||||
.add(move |cancel| async move {
|
||||
println!(" Batch {} started", batch_idx);
|
||||
|
||||
// Process batch with cancellation support
|
||||
tokio::select! {
|
||||
batch_result = process_batch(batch, batch_idx) => {
|
||||
match batch_result {
|
||||
Ok(result) => {
|
||||
let mut res = results.lock().await;
|
||||
res.push(result);
|
||||
println!(" Batch {} completed", batch_idx);
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
println!(" Batch {} failed: {}", batch_idx, e);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
println!(" Batch {} cancelled", batch_idx);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
workers.wait().await?;
|
||||
|
||||
let results = results.lock().await;
|
||||
let total_processed: usize = results.iter().sum();
|
||||
|
||||
println!(" Processed {} items in {}ms\n", total_processed, start.elapsed().as_millis());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn expensive_computation(n: i32) -> anyhow::Result<i32> {
|
||||
// Simulate CPU-intensive work
|
||||
tokio::time::sleep(Duration::from_millis(5)).await;
|
||||
|
||||
// Some complex calculation
|
||||
let result = (n * n) + (n / 2) - 1;
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn process_batch(batch: Vec<i32>, _batch_idx: usize) -> anyhow::Result<usize> {
|
||||
// Simulate batch processing
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
// Process each item in the batch
|
||||
let processed_count = batch.len();
|
||||
|
||||
// In a real scenario, you might:
|
||||
// - Write to a database
|
||||
// - Send to an API
|
||||
// - Transform and save to files
|
||||
|
||||
Ok(processed_count)
|
||||
}
|
186
crates/noworkers/examples/retry_with_backoff.rs
Normal file
186
crates/noworkers/examples/retry_with_backoff.rs
Normal file
@@ -0,0 +1,186 @@
|
||||
//! Retry with exponential backoff example using noworkers.
|
||||
//!
|
||||
//! This example shows how to implement retry logic with exponential
|
||||
//! backoff for handling transient failures in distributed systems.
|
||||
|
||||
use noworkers::Workers;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::time::Duration;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
println!("Retry with Exponential Backoff Example");
|
||||
println!("======================================\n");
|
||||
|
||||
// Simulate multiple API endpoints to call
|
||||
let endpoints = vec![
|
||||
"https://api.example.com/endpoint1",
|
||||
"https://api.example.com/endpoint2",
|
||||
"https://api.example.com/endpoint3",
|
||||
"https://api.example.com/endpoint4",
|
||||
"https://api.example.com/endpoint5",
|
||||
];
|
||||
|
||||
let mut workers = Workers::new();
|
||||
|
||||
// Limit concurrent API calls
|
||||
workers.with_limit(3);
|
||||
|
||||
// Global request counter for simulation
|
||||
let request_counter = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
println!("Making API calls with retry logic...\n");
|
||||
|
||||
for endpoint in endpoints {
|
||||
let counter = request_counter.clone();
|
||||
let url = endpoint.to_string();
|
||||
|
||||
workers
|
||||
.add(move |cancel| async move {
|
||||
// Retry configuration
|
||||
let max_retries = 3;
|
||||
let initial_backoff = Duration::from_millis(100);
|
||||
let max_backoff = Duration::from_secs(5);
|
||||
|
||||
let mut attempt = 0;
|
||||
let mut backoff = initial_backoff;
|
||||
|
||||
loop {
|
||||
attempt += 1;
|
||||
|
||||
// Check for cancellation before each attempt
|
||||
if cancel.is_cancelled() {
|
||||
println!("⚠ {} - Cancelled before attempt {}", url, attempt);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
println!("→ {} - Attempt {}/{}", url, attempt, max_retries + 1);
|
||||
|
||||
match make_api_call(&url, &counter).await {
|
||||
Ok(response) => {
|
||||
println!("✓ {} - Success: {}", url, response);
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) if attempt > max_retries => {
|
||||
println!("✗ {} - Failed after {} attempts: {}", url, attempt, e);
|
||||
return Err(anyhow::anyhow!("Max retries exceeded for {}: {}", url, e));
|
||||
}
|
||||
Err(e) => {
|
||||
println!("↻ {} - Attempt {} failed: {}", url, attempt, e);
|
||||
|
||||
// Check if error is retryable
|
||||
if !is_retryable_error(&e) {
|
||||
println!("✗ {} - Non-retryable error: {}", url, e);
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
// Wait with exponential backoff
|
||||
println!(" Waiting {}ms before retry...", backoff.as_millis());
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(backoff) => {
|
||||
// Calculate next backoff with jitter
|
||||
backoff = calculate_next_backoff(backoff, max_backoff);
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
println!("⚠ {} - Cancelled during backoff", url);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Wait for all API calls to complete
|
||||
let result = workers.wait().await;
|
||||
|
||||
println!("\n========== Summary ==========");
|
||||
match result {
|
||||
Ok(_) => println!("All API calls completed successfully!"),
|
||||
Err(e) => println!("Some API calls failed: {}", e),
|
||||
}
|
||||
|
||||
let total_requests = request_counter.load(Ordering::SeqCst);
|
||||
println!("Total requests made: {}", total_requests);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Simulates making an API call
|
||||
async fn make_api_call(url: &str, counter: &Arc<AtomicUsize>) -> anyhow::Result<String> {
|
||||
let request_num = counter.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
// Simulate network delay
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
|
||||
// Simulate different failure scenarios
|
||||
match request_num % 7 {
|
||||
0 => {
|
||||
// Simulate timeout
|
||||
Err(anyhow::anyhow!("Request timeout"))
|
||||
}
|
||||
1 | 2 => {
|
||||
// Simulate temporary server error
|
||||
Err(anyhow::anyhow!("503 Service Unavailable"))
|
||||
}
|
||||
3 if url.contains("endpoint3") => {
|
||||
// Simulate rate limiting for specific endpoint
|
||||
Err(anyhow::anyhow!("429 Too Many Requests"))
|
||||
}
|
||||
_ => {
|
||||
// Success
|
||||
Ok(format!("Response from {} (request #{})", url, request_num))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Determines if an error is retryable
|
||||
fn is_retryable_error(error: &anyhow::Error) -> bool {
|
||||
let error_str = error.to_string();
|
||||
|
||||
// List of retryable error patterns
|
||||
error_str.contains("timeout") ||
|
||||
error_str.contains("503") ||
|
||||
error_str.contains("429") ||
|
||||
error_str.contains("Service Unavailable") ||
|
||||
error_str.contains("Too Many Requests")
|
||||
}
|
||||
|
||||
/// Calculates the next backoff duration with jitter
|
||||
fn calculate_next_backoff(current: Duration, max: Duration) -> Duration {
|
||||
// Double the backoff
|
||||
let mut next = current * 2;
|
||||
|
||||
// Cap at maximum
|
||||
if next > max {
|
||||
next = max;
|
||||
}
|
||||
|
||||
// Add jitter (±10%)
|
||||
let jitter_range = next.as_millis() / 10;
|
||||
let jitter = (rand::random::<u64>() % (jitter_range as u64 * 2)) as i64 - jitter_range as i64;
|
||||
|
||||
let final_millis = (next.as_millis() as i64 + jitter).max(1) as u64;
|
||||
Duration::from_millis(final_millis)
|
||||
}
|
||||
|
||||
// Simple random number generator for jitter
|
||||
mod rand {
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
static SEED: AtomicU64 = AtomicU64::new(12345);
|
||||
|
||||
pub fn random<T>() -> T
|
||||
where
|
||||
T: From<u64>,
|
||||
{
|
||||
// Simple LCG for demonstration
|
||||
let old = SEED.fetch_add(1, Ordering::Relaxed);
|
||||
let new = (old.wrapping_mul(1103515245).wrapping_add(12345)) >> 16;
|
||||
T::from(new)
|
||||
}
|
||||
}
|
161
crates/noworkers/examples/web_scraper.rs
Normal file
161
crates/noworkers/examples/web_scraper.rs
Normal file
@@ -0,0 +1,161 @@
|
||||
//! Web scraper example using noworkers.
|
||||
//!
|
||||
//! This example demonstrates a real-world use case: scraping multiple
|
||||
//! URLs concurrently with rate limiting and error handling.
|
||||
|
||||
use noworkers::Workers;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
/// Simulated web page data
|
||||
struct WebPage {
|
||||
url: String,
|
||||
content: String,
|
||||
fetch_time: Duration,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
println!("Web Scraper Example");
|
||||
println!("==================\n");
|
||||
|
||||
let urls = vec![
|
||||
"https://example.com/page1",
|
||||
"https://example.com/page2",
|
||||
"https://example.com/page3",
|
||||
"https://example.com/page4",
|
||||
"https://example.com/page5",
|
||||
"https://example.com/page6",
|
||||
"https://example.com/page7",
|
||||
"https://example.com/page8",
|
||||
"https://example.com/page9",
|
||||
"https://example.com/page10",
|
||||
];
|
||||
|
||||
println!("Scraping {} URLs with rate limiting...\n", urls.len());
|
||||
|
||||
let mut workers = Workers::new();
|
||||
|
||||
// Limit concurrent requests to avoid overwhelming the server
|
||||
workers.with_limit(3);
|
||||
|
||||
// Add timeout for the entire scraping operation
|
||||
workers.with_cancel_task(async {
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
println!("⏰ Timeout reached - cancelling remaining requests");
|
||||
});
|
||||
|
||||
let start = Instant::now();
|
||||
let results = std::sync::Arc::new(tokio::sync::Mutex::new(Vec::new()));
|
||||
|
||||
for (idx, url) in urls.into_iter().enumerate() {
|
||||
let url = url.to_string();
|
||||
let results = results.clone();
|
||||
|
||||
workers
|
||||
.add(move |cancel| async move {
|
||||
// Simulate rate limiting with a small delay
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
tokio::select! {
|
||||
result = fetch_page(&url, idx) => {
|
||||
match result {
|
||||
Ok(page) => {
|
||||
println!(
|
||||
"✓ [{:>3}ms] Fetched {} ({}ms)",
|
||||
start.elapsed().as_millis(),
|
||||
page.url,
|
||||
page.fetch_time.as_millis()
|
||||
);
|
||||
|
||||
let mut res = results.lock().await;
|
||||
res.push(page);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
println!(
|
||||
"✗ [{:>3}ms] Failed to fetch {}: {}",
|
||||
start.elapsed().as_millis(),
|
||||
url,
|
||||
e
|
||||
);
|
||||
// In a real scraper, you might want to continue
|
||||
// despite individual failures
|
||||
Ok(()) // Continue scraping other pages
|
||||
|
||||
// Or propagate the error to stop all scraping:
|
||||
// Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
println!("⚠ Cancelled fetch for {}", url);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Wait for all scraping to complete
|
||||
let scrape_result = workers.wait().await;
|
||||
|
||||
println!("\n========== Results ==========");
|
||||
|
||||
match scrape_result {
|
||||
Ok(_) => {
|
||||
let results = results.lock().await;
|
||||
println!("Successfully scraped {} pages", results.len());
|
||||
println!("Total time: {}ms", start.elapsed().as_millis());
|
||||
|
||||
// Process results
|
||||
let total_content_size: usize = results.iter()
|
||||
.map(|p| p.content.len())
|
||||
.sum();
|
||||
|
||||
let avg_fetch_time: u128 = if !results.is_empty() {
|
||||
results.iter()
|
||||
.map(|p| p.fetch_time.as_millis())
|
||||
.sum::<u128>() / results.len() as u128
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
println!("Total content size: {} bytes", total_content_size);
|
||||
println!("Average fetch time: {}ms", avg_fetch_time);
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Scraping failed with error: {}", e);
|
||||
let results = results.lock().await;
|
||||
println!("Managed to scrape {} pages before failure", results.len());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Simulates fetching a web page
|
||||
async fn fetch_page(url: &str, idx: usize) -> anyhow::Result<WebPage> {
|
||||
let fetch_start = Instant::now();
|
||||
|
||||
// Simulate network delay (varies by page)
|
||||
let delay = 200 + (idx * 50) % 300;
|
||||
tokio::time::sleep(Duration::from_millis(delay as u64)).await;
|
||||
|
||||
// Simulate occasional failures
|
||||
if idx == 7 {
|
||||
return Err(anyhow::anyhow!("Connection timeout"));
|
||||
}
|
||||
|
||||
// Simulate fetching content
|
||||
let content = format!(
|
||||
"<!DOCTYPE html><html><body><h1>Page {}</h1><p>Content for {}</p></body></html>",
|
||||
idx, url
|
||||
);
|
||||
|
||||
Ok(WebPage {
|
||||
url: url.to_string(),
|
||||
content,
|
||||
fetch_time: fetch_start.elapsed(),
|
||||
})
|
||||
}
|
86
crates/noworkers/examples/with_limit.rs
Normal file
86
crates/noworkers/examples/with_limit.rs
Normal file
@@ -0,0 +1,86 @@
|
||||
//! Concurrency limiting with noworkers.
|
||||
//!
|
||||
//! This example shows how to limit the number of concurrent tasks
|
||||
//! to prevent resource exhaustion and control parallelism.
|
||||
|
||||
use noworkers::Workers;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::time::Instant;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
println!("Concurrency Limiting Example");
|
||||
println!("============================\n");
|
||||
|
||||
// Track concurrent task count
|
||||
let concurrent_count = Arc::new(AtomicUsize::new(0));
|
||||
let max_seen = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let mut workers = Workers::new();
|
||||
|
||||
// Limit to 3 concurrent tasks
|
||||
let limit = 3;
|
||||
workers.with_limit(limit);
|
||||
println!("Concurrency limit set to: {}\n", limit);
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
// Spawn 10 tasks, but only 3 will run at a time
|
||||
for i in 0..10 {
|
||||
let count = concurrent_count.clone();
|
||||
let max = max_seen.clone();
|
||||
|
||||
workers
|
||||
.add(move |_cancel| async move {
|
||||
// Increment concurrent count
|
||||
let current = count.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
|
||||
// Track maximum concurrency
|
||||
let mut max_val = max.load(Ordering::SeqCst);
|
||||
while current > max_val {
|
||||
match max.compare_exchange_weak(
|
||||
max_val,
|
||||
current,
|
||||
Ordering::SeqCst,
|
||||
Ordering::SeqCst,
|
||||
) {
|
||||
Ok(_) => break,
|
||||
Err(x) => max_val = x,
|
||||
}
|
||||
}
|
||||
|
||||
println!(
|
||||
"[{:>3}ms] Task {:>2} started (concurrent: {})",
|
||||
start.elapsed().as_millis(),
|
||||
i,
|
||||
current
|
||||
);
|
||||
|
||||
// Simulate work
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
|
||||
|
||||
// Decrement concurrent count
|
||||
let remaining = count.fetch_sub(1, Ordering::SeqCst) - 1;
|
||||
println!(
|
||||
"[{:>3}ms] Task {:>2} finished (remaining: {})",
|
||||
start.elapsed().as_millis(),
|
||||
i,
|
||||
remaining
|
||||
);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
workers.wait().await?;
|
||||
|
||||
println!("\n========== Summary ==========");
|
||||
println!("Total time: {}ms", start.elapsed().as_millis());
|
||||
println!("Max concurrent tasks: {}", max_seen.load(Ordering::SeqCst));
|
||||
println!("Expected batches: {} (10 tasks / 3 limit)", (10 + limit - 1) / limit);
|
||||
println!("Expected time: ~{}ms (4 batches * 200ms)", ((10 + limit - 1) / limit) * 200);
|
||||
|
||||
Ok(())
|
||||
}
|
@@ -1,8 +1,254 @@
|
||||
//! # noworkers
|
||||
//!
|
||||
//! A small, ergonomic Rust crate for spawning and supervising groups of asynchronous "workers" on Tokio.
|
||||
//! Manage concurrent tasks with optional limits, cancellation, and first-error propagation.
|
||||
//!
|
||||
//! This crate is inspired by Go's [errgroup](https://pkg.go.dev/golang.org/x/sync/errgroup) package,
|
||||
//! providing similar functionality in an idiomatic Rust way.
|
||||
//!
|
||||
//! ## Overview
|
||||
//!
|
||||
//! `noworkers` provides a simple way to manage groups of concurrent async tasks with:
|
||||
//!
|
||||
//! * **Bounded or unbounded concurrency** - Control how many tasks run simultaneously
|
||||
//! * **Automatic cancellation** - First error cancels all remaining tasks
|
||||
//! * **Flexible cancellation strategies** - External tokens or task-driven cancellation
|
||||
//! * **Zero-cost abstractions** - Minimal overhead over raw tokio tasks
|
||||
//!
|
||||
//! ## Quick Start
|
||||
//!
|
||||
//! ```rust
|
||||
//! use noworkers::Workers;
|
||||
//!
|
||||
//! #[tokio::main]
|
||||
//! async fn main() -> anyhow::Result<()> {
|
||||
//! // Create a new worker group
|
||||
//! let mut workers = Workers::new();
|
||||
//!
|
||||
//! // Limit concurrent tasks to 5
|
||||
//! workers.with_limit(5);
|
||||
//!
|
||||
//! // Spawn 10 tasks
|
||||
//! for i in 0..10 {
|
||||
//! workers.add(move |_cancel| async move {
|
||||
//! println!("Task {i} running");
|
||||
//! tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||
//! Ok(())
|
||||
//! }).await?;
|
||||
//! }
|
||||
//!
|
||||
//! // Wait for all tasks to complete
|
||||
//! workers.wait().await?;
|
||||
//! Ok(())
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! ## Core Concepts
|
||||
//!
|
||||
//! ### Worker Groups
|
||||
//!
|
||||
//! A [`Workers`] instance represents a group of related async tasks that should be
|
||||
//! managed together. All tasks in a group share:
|
||||
//!
|
||||
//! * A common concurrency limit (if set)
|
||||
//! * A cancellation token hierarchy
|
||||
//! * First-error propagation semantics
|
||||
//!
|
||||
//! ### Error Handling
|
||||
//!
|
||||
//! The first task to return an error "wins" - its error is captured and all other
|
||||
//! tasks are immediately cancelled. This provides fail-fast semantics similar to
|
||||
//! Go's errgroup.
|
||||
//!
|
||||
//! ```rust
|
||||
//! use noworkers::Workers;
|
||||
//!
|
||||
//! # async fn example() -> anyhow::Result<()> {
|
||||
//! let workers = Workers::new();
|
||||
//!
|
||||
//! // This task will fail first
|
||||
//! workers.add(|_| async {
|
||||
//! tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||
//! Err(anyhow::anyhow!("task failed"))
|
||||
//! }).await?;
|
||||
//!
|
||||
//! // These tasks will be cancelled
|
||||
//! for i in 0..5 {
|
||||
//! workers.add(move |cancel| async move {
|
||||
//! // Check for cancellation
|
||||
//! tokio::select! {
|
||||
//! _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
|
||||
//! Ok(())
|
||||
//! }
|
||||
//! _ = cancel.cancelled() => {
|
||||
//! println!("Task {i} cancelled");
|
||||
//! Ok(())
|
||||
//! }
|
||||
//! }
|
||||
//! }).await?;
|
||||
//! }
|
||||
//!
|
||||
//! // This will return the error from the first task
|
||||
//! let result = workers.wait().await;
|
||||
//! assert!(result.is_err());
|
||||
//! # Ok(())
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
//! ### Concurrency Limits
|
||||
//!
|
||||
//! You can limit how many tasks run concurrently using [`Workers::with_limit`].
|
||||
//! When the limit is reached, new tasks will wait for a slot to become available.
|
||||
//!
|
||||
//! ```rust
|
||||
//! use noworkers::Workers;
|
||||
//! use std::sync::Arc;
|
||||
//! use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
//!
|
||||
//! # async fn example() -> anyhow::Result<()> {
|
||||
//! let mut workers = Workers::new();
|
||||
//! workers.with_limit(2); // Only 2 tasks run at once
|
||||
//!
|
||||
//! let concurrent_count = Arc::new(AtomicUsize::new(0));
|
||||
//!
|
||||
//! for i in 0..10 {
|
||||
//! let count = concurrent_count.clone();
|
||||
//! workers.add(move |_| async move {
|
||||
//! let current = count.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
//! assert!(current <= 2, "Too many concurrent tasks!");
|
||||
//!
|
||||
//! tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
|
||||
//!
|
||||
//! count.fetch_sub(1, Ordering::SeqCst);
|
||||
//! Ok(())
|
||||
//! }).await?;
|
||||
//! }
|
||||
//!
|
||||
//! workers.wait().await?;
|
||||
//! # Ok(())
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
//! ### Cancellation
|
||||
//!
|
||||
//! There are two ways to trigger cancellation:
|
||||
//!
|
||||
//! 1. **External cancellation** - Use an existing `CancellationToken`
|
||||
//! 2. **Task-driven cancellation** - A dedicated task that triggers cancellation when complete
|
||||
//!
|
||||
//! ```rust
|
||||
//! use noworkers::Workers;
|
||||
//! use tokio_util::sync::CancellationToken;
|
||||
//!
|
||||
//! # async fn example() -> anyhow::Result<()> {
|
||||
//! // External cancellation
|
||||
//! let mut workers = Workers::new();
|
||||
//! let cancel = CancellationToken::new();
|
||||
//! workers.with_cancel(&cancel);
|
||||
//!
|
||||
//! // Cancel after 1 second
|
||||
//! let cancel_clone = cancel.clone();
|
||||
//! tokio::spawn(async move {
|
||||
//! tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
//! cancel_clone.cancel();
|
||||
//! });
|
||||
//!
|
||||
//! // Task-driven cancellation
|
||||
//! let mut workers2 = Workers::new();
|
||||
//! workers2.with_cancel_task(async {
|
||||
//! tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
//! });
|
||||
//! # Ok(())
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
//! ## Extensions
|
||||
//!
|
||||
//! The crate provides extension traits for common patterns:
|
||||
//!
|
||||
//! ```rust
|
||||
//! use noworkers::Workers;
|
||||
//! use noworkers::extensions::WithSysLimitCpus;
|
||||
//!
|
||||
//! # async fn example() -> anyhow::Result<()> {
|
||||
//! let mut workers = Workers::new();
|
||||
//! // Automatically limit to number of CPU cores
|
||||
//! workers.with_limit_to_system_cpus();
|
||||
//! # Ok(())
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
//! ## Examples
|
||||
//!
|
||||
//! See the `examples/` directory for more complete examples:
|
||||
//!
|
||||
//! * `basic.rs` - Simple task spawning and waiting
|
||||
//! * `with_limit.rs` - Concurrency limiting
|
||||
//! * `cancellation.rs` - Cancellation patterns
|
||||
//! * `error_handling.rs` - Error propagation
|
||||
//! * `web_scraper.rs` - Real-world web scraping example
|
||||
//! * `parallel_processing.rs` - Data processing pipeline
|
||||
|
||||
use std::{future::Future, sync::Arc};
|
||||
|
||||
use tokio::{sync::Mutex, task::JoinHandle};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
/// Extension traits for common patterns.
|
||||
///
|
||||
/// This module provides convenient extension methods for [`Workers`] through traits.
|
||||
pub mod extensions {
|
||||
use crate::Workers;
|
||||
|
||||
/// Extension trait for CPU-based concurrency limits.
|
||||
///
|
||||
/// This trait provides a convenient way to limit worker concurrency based on
|
||||
/// the system's available CPU cores.
|
||||
pub trait WithSysLimitCpus {
|
||||
/// Limits the worker group to the number of available CPU cores.
|
||||
///
|
||||
/// This is a convenience method that automatically detects the number of
|
||||
/// logical CPU cores available and sets that as the concurrency limit.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use noworkers::Workers;
|
||||
/// use noworkers::extensions::WithSysLimitCpus;
|
||||
///
|
||||
/// # async fn example() -> anyhow::Result<()> {
|
||||
/// let mut workers = Workers::new();
|
||||
/// // Automatically limit to system CPU count (e.g., 8 on an 8-core machine)
|
||||
/// workers.with_limit_to_system_cpus();
|
||||
///
|
||||
/// // Spawn many tasks, but only CPU count will run concurrently
|
||||
/// for i in 0..100 {
|
||||
/// workers.add(move |_| async move {
|
||||
/// // CPU-bound work here
|
||||
/// Ok(())
|
||||
/// }).await?;
|
||||
/// }
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the system's available parallelism cannot be determined.
|
||||
fn with_limit_to_system_cpus(&mut self) -> &mut Self;
|
||||
}
|
||||
|
||||
impl WithSysLimitCpus for Workers {
|
||||
fn with_limit_to_system_cpus(&mut self) -> &mut Self {
|
||||
self.with_limit(
|
||||
std::thread::available_parallelism()
|
||||
.expect("to be able to get system cpu info")
|
||||
.into(),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type ErrChan = Arc<
|
||||
Mutex<(
|
||||
Option<tokio::sync::oneshot::Sender<anyhow::Error>>,
|
||||
@@ -12,6 +258,50 @@ type ErrChan = Arc<
|
||||
|
||||
type JoinHandles = Arc<Mutex<Vec<JoinHandle<()>>>>;
|
||||
|
||||
/// A group of supervised async workers with optional concurrency limits.
|
||||
///
|
||||
/// `Workers` manages a collection of async tasks that run concurrently on Tokio.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// Basic usage:
|
||||
/// ```rust
|
||||
/// use noworkers::Workers;
|
||||
///
|
||||
/// # async fn example() -> anyhow::Result<()> {
|
||||
/// let workers = Workers::new();
|
||||
///
|
||||
/// // Spawn some work
|
||||
/// workers.add(|_| async {
|
||||
/// println!("Doing work!");
|
||||
/// Ok(())
|
||||
/// }).await?;
|
||||
///
|
||||
/// // Wait for completion
|
||||
/// workers.wait().await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
///
|
||||
/// With concurrency limit:
|
||||
/// ```rust
|
||||
/// use noworkers::Workers;
|
||||
///
|
||||
/// # async fn example() -> anyhow::Result<()> {
|
||||
/// let mut workers = Workers::new();
|
||||
/// workers.with_limit(3); // Max 3 concurrent tasks
|
||||
///
|
||||
/// for i in 0..10 {
|
||||
/// workers.add(move |_| async move {
|
||||
/// println!("Task {i}");
|
||||
/// Ok(())
|
||||
/// }).await?;
|
||||
/// }
|
||||
///
|
||||
/// workers.wait().await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
#[derive(Clone)]
|
||||
pub struct Workers {
|
||||
limit: WorkerLimit,
|
||||
@@ -56,6 +346,12 @@ impl WorkerLimit {
|
||||
}
|
||||
}
|
||||
|
||||
/// Guard that tracks active worker slots for concurrency limiting.
|
||||
///
|
||||
/// This guard is automatically created when a worker starts and dropped when it completes.
|
||||
/// When dropped, it signals that a worker slot is available for the next queued task.
|
||||
///
|
||||
/// This type is not directly constructible by users.
|
||||
pub struct WorkerGuard {
|
||||
limit: WorkerLimit,
|
||||
}
|
||||
@@ -78,6 +374,19 @@ impl Drop for WorkerGuard {
|
||||
}
|
||||
|
||||
impl Workers {
|
||||
/// Creates a new worker group with no concurrency limit.
|
||||
///
|
||||
/// The returned `Workers` instance can spawn unlimited concurrent tasks.
|
||||
/// Use [`with_limit`](Self::with_limit) to add a concurrency limit.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```rust
|
||||
/// use noworkers::Workers;
|
||||
///
|
||||
/// let workers = Workers::new();
|
||||
/// // Can spawn unlimited concurrent tasks
|
||||
/// ```
|
||||
pub fn new() -> Self {
|
||||
let once = tokio::sync::oneshot::channel();
|
||||
|
||||
@@ -89,13 +398,99 @@ impl Workers {
|
||||
}
|
||||
}
|
||||
|
||||
/// respects an external cancellation token, it is undefined behavior to use this with with_cancel_task, we will always only cancel a child token, i.e. never the external token
|
||||
/// Associates an external cancellation token with this worker group.
|
||||
///
|
||||
/// When the provided token is cancelled, all workers in this group will receive
|
||||
/// cancellation signals. The group creates a child token from the provided token,
|
||||
/// so cancelling the group won't affect the parent token.
|
||||
///
|
||||
/// # Important
|
||||
///
|
||||
/// Do not use both `with_cancel` and [`with_cancel_task`](Self::with_cancel_task) on the same
|
||||
/// worker group. This is undefined behavior.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```rust
|
||||
/// use noworkers::Workers;
|
||||
/// use tokio_util::sync::CancellationToken;
|
||||
///
|
||||
/// # async fn example() -> anyhow::Result<()> {
|
||||
/// let mut workers = Workers::new();
|
||||
/// let cancel = CancellationToken::new();
|
||||
///
|
||||
/// // Link workers to external cancellation
|
||||
/// workers.with_cancel(&cancel);
|
||||
///
|
||||
/// // Spawn workers that respect cancellation
|
||||
/// workers.add(|cancel| async move {
|
||||
/// tokio::select! {
|
||||
/// _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// _ = cancel.cancelled() => {
|
||||
/// println!("Cancelled!");
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// }
|
||||
/// }).await?;
|
||||
///
|
||||
/// // Cancel from external source
|
||||
/// cancel.cancel();
|
||||
/// workers.wait().await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn with_cancel(&mut self, cancel: &CancellationToken) -> &mut Self {
|
||||
self.cancellation = cancel.child_token();
|
||||
self
|
||||
}
|
||||
|
||||
/// with_cancel and with_cancel_task used together is considered undefined behavior, as we will cancel the external cancellation token on cancel_task completion
|
||||
/// Spawns a task that cancels the worker group when it completes.
|
||||
///
|
||||
/// This is useful for implementing timeouts or other cancellation conditions.
|
||||
/// When the provided future completes, all workers in the group receive
|
||||
/// cancellation signals.
|
||||
///
|
||||
/// # Important
|
||||
///
|
||||
/// Do not use both [`with_cancel`](Self::with_cancel) and `with_cancel_task` on the same
|
||||
/// worker group. This is undefined behavior.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// Timeout after 5 seconds:
|
||||
/// ```rust
|
||||
/// use noworkers::Workers;
|
||||
///
|
||||
/// # async fn example() -> anyhow::Result<()> {
|
||||
/// let mut workers = Workers::new();
|
||||
///
|
||||
/// // Cancel all workers after 5 seconds
|
||||
/// workers.with_cancel_task(async {
|
||||
/// tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
|
||||
/// });
|
||||
///
|
||||
/// // Spawn long-running workers
|
||||
/// for i in 0..10 {
|
||||
/// workers.add(move |cancel| async move {
|
||||
/// tokio::select! {
|
||||
/// _ = tokio::time::sleep(tokio::time::Duration::from_secs(60)) => {
|
||||
/// println!("Task {i} completed");
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// _ = cancel.cancelled() => {
|
||||
/// println!("Task {i} cancelled by timeout");
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// }
|
||||
/// }).await?;
|
||||
/// }
|
||||
///
|
||||
/// workers.wait().await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn with_cancel_task<T>(&mut self, f: T) -> &mut Self
|
||||
where
|
||||
T: Future<Output = ()> + Send + 'static,
|
||||
@@ -109,7 +504,43 @@ impl Workers {
|
||||
self
|
||||
}
|
||||
|
||||
/// with_limit can be dangerous if used with an external cancel, because we still queue work after the cancel, it doesn't guarantee that everyone respects said channel, and closes down in a timely manner. Work will still be queued after the cancel, it is up to the provided worker function to respect when cancel is called
|
||||
/// Sets a concurrency limit for this worker group.
|
||||
///
|
||||
/// When the limit is reached, calls to [`add`](Self::add) will wait until a slot
|
||||
/// becomes available. This provides backpressure and prevents unbounded task spawning.
|
||||
///
|
||||
/// # Important
|
||||
///
|
||||
/// When using with cancellation, workers should check their cancellation tokens.
|
||||
/// Even after cancellation, queued tasks will still be scheduled (though they should
|
||||
/// exit quickly if they respect cancellation).
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```rust
|
||||
/// use noworkers::Workers;
|
||||
/// use std::time::Instant;
|
||||
///
|
||||
/// # async fn example() -> anyhow::Result<()> {
|
||||
/// let mut workers = Workers::new();
|
||||
/// workers.with_limit(2); // Only 2 tasks run concurrently
|
||||
///
|
||||
/// let start = Instant::now();
|
||||
///
|
||||
/// // Spawn 4 tasks that each take 100ms
|
||||
/// for i in 0..4 {
|
||||
/// workers.add(move |_| async move {
|
||||
/// println!("Task {i} started at {:?}", start.elapsed());
|
||||
/// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||
/// Ok(())
|
||||
/// }).await?;
|
||||
/// }
|
||||
///
|
||||
/// workers.wait().await?;
|
||||
/// // Total time will be ~200ms (2 batches of 2 tasks)
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn with_limit(&mut self, limit: usize) -> &mut Self {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(limit);
|
||||
|
||||
@@ -120,8 +551,70 @@ impl Workers {
|
||||
self
|
||||
}
|
||||
|
||||
/// Add
|
||||
/// Note: Add is immediate, this means your future will be polled immediately, no matter if wait has been called or not. Wait is just for waiting for completion, as well as receiving errors
|
||||
/// Spawns a new worker task in this group.
|
||||
///
|
||||
/// The provided closure receives a [`CancellationToken`] that will be triggered if:
|
||||
/// - Another worker returns an error
|
||||
/// - The group's cancellation token is triggered
|
||||
/// - [`wait`](Self::wait) completes
|
||||
///
|
||||
/// # Immediate Execution
|
||||
///
|
||||
/// The task starts executing immediately when `add` is called, not when `wait` is called.
|
||||
/// The `wait` method is only for awaiting completion and collecting errors.
|
||||
///
|
||||
/// # Backpressure
|
||||
///
|
||||
/// If a concurrency limit is set via [`with_limit`](Self::with_limit), this method
|
||||
/// will wait until a slot is available before spawning the task.
|
||||
///
|
||||
/// # Error Propagation
|
||||
///
|
||||
/// If the worker returns an error:
|
||||
/// 1. All other workers receive cancellation signals
|
||||
/// 2. The error is stored to be returned by [`wait`](Self::wait)
|
||||
/// 3. Subsequent errors are ignored (first error wins)
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// Basic task:
|
||||
/// ```rust
|
||||
/// use noworkers::Workers;
|
||||
///
|
||||
/// # async fn example() -> anyhow::Result<()> {
|
||||
/// let workers = Workers::new();
|
||||
///
|
||||
/// workers.add(|_cancel| async {
|
||||
/// println!("Doing work");
|
||||
/// Ok(())
|
||||
/// }).await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
///
|
||||
/// Task with cancellation handling:
|
||||
/// ```rust
|
||||
/// use noworkers::Workers;
|
||||
///
|
||||
/// # async fn example() -> anyhow::Result<()> {
|
||||
/// let workers = Workers::new();
|
||||
///
|
||||
/// workers.add(|cancel| async move {
|
||||
/// tokio::select! {
|
||||
/// result = do_work() => {
|
||||
/// result
|
||||
/// }
|
||||
/// _ = cancel.cancelled() => {
|
||||
/// println!("Cancelled");
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// }
|
||||
/// }).await?;
|
||||
///
|
||||
/// # async fn do_work() -> anyhow::Result<()> { Ok(()) }
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub async fn add<T, TFut>(&self, f: T) -> anyhow::Result<()>
|
||||
where
|
||||
T: FnOnce(CancellationToken) -> TFut + Send + 'static,
|
||||
@@ -153,6 +646,61 @@ impl Workers {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Waits for all workers to complete and returns the first error (if any).
|
||||
///
|
||||
/// This method:
|
||||
/// 1. Waits for all spawned tasks to complete
|
||||
/// 2. Cancels any remaining tasks if one fails
|
||||
/// 3. Returns the first error encountered, or `Ok(())` if all succeed
|
||||
///
|
||||
/// # Consumption
|
||||
///
|
||||
/// This method consumes the `Workers` instance. After calling `wait`,
|
||||
/// you cannot add more tasks to this group.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// Success case:
|
||||
/// ```rust
|
||||
/// use noworkers::Workers;
|
||||
///
|
||||
/// # async fn example() -> anyhow::Result<()> {
|
||||
/// let workers = Workers::new();
|
||||
///
|
||||
/// for i in 0..5 {
|
||||
/// workers.add(move |_| async move {
|
||||
/// println!("Task {i}");
|
||||
/// Ok(())
|
||||
/// }).await?;
|
||||
/// }
|
||||
///
|
||||
/// // Blocks until all complete
|
||||
/// workers.wait().await?;
|
||||
/// println!("All done!");
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
///
|
||||
/// Error case:
|
||||
/// ```rust
|
||||
/// use noworkers::Workers;
|
||||
///
|
||||
/// # async fn example() {
|
||||
/// let workers = Workers::new();
|
||||
///
|
||||
/// workers.add(|_| async {
|
||||
/// Err(anyhow::anyhow!("Task failed"))
|
||||
/// }).await.unwrap();
|
||||
///
|
||||
/// workers.add(|_| async {
|
||||
/// Ok(()) // This might be cancelled
|
||||
/// }).await.unwrap();
|
||||
///
|
||||
/// // Returns the error from the first task
|
||||
/// let result = workers.wait().await;
|
||||
/// assert!(result.is_err());
|
||||
/// # }
|
||||
/// ```
|
||||
pub async fn wait(self) -> anyhow::Result<()> {
|
||||
{
|
||||
let mut handles = self.handles.lock().await;
|
||||
|
@@ -1,6 +1,6 @@
|
||||
# yaml-language-server: $schema=https://git.front.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
|
||||
# yaml-language-server: $schema=https://git.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
|
||||
|
||||
base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
|
||||
base: "git@git.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
|
||||
|
||||
vars:
|
||||
service: "noworkers"
|
||||
@@ -12,6 +12,6 @@ please:
|
||||
repository: "noworkers"
|
||||
branch: main
|
||||
settings:
|
||||
api_url: "https://git.front.kjuulh.io"
|
||||
api_url: "https://git.kjuulh.io"
|
||||
actions:
|
||||
rust:
|
||||
|
3
renovate.json
Normal file
3
renovate.json
Normal file
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
|
||||
}
|
Reference in New Issue
Block a user