1 Commits

Author SHA1 Message Date
dcb8b8cad2 Add renovate.json
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2025-07-02 00:53:28 +00:00
15 changed files with 12 additions and 1742 deletions

View File

@@ -6,17 +6,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.1.0] - 2025-08-08
### Added
- add docs and examples
## [0.0.4] - 2025-08-08
### Other
- Configure Renovate (#3)
Add renovate.json
## [0.0.1] - 2025-07-01
### Added

2
Cargo.lock generated
View File

@@ -88,7 +88,7 @@ dependencies = [
[[package]]
name = "noworkers"
version = "0.0.4"
version = "0.0.1"
dependencies = [
"anyhow",
"tokio",

View File

@@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2"
[workspace.package]
version = "0.1.0"
version = "0.0.1"
license = "MIT"
[workspace.dependencies]

View File

@@ -110,6 +110,6 @@ See [LICENSE-MIT](LICENSE-MIT) and [LICENSE-APACHE](LICENSE-APACHE) for details.
## Contribute
Simply create an issue here or pr https://github.com/kjuulh/noworkers.git, development happens publicly at: https://git.kjuulh.io/kjuulh/noworkers.
Simply create an issue here or pr https://github.com/kjuulh/noworkers.git, development happens publicly at: https://git.front.kjuulh.io/kjuulh/noworkers.

View File

@@ -4,7 +4,7 @@ edition = "2024"
readme = "../../README.md"
version.workspace = true
license.workspace = true
repository = "https://git.kjuulh.io/kjuulh/noworkers"
repository = "https://git.front.kjuulh.io/kjuulh/noworkers"
authors = ["kjuulh <contact@kasperhermansen.com>"]
description = "A small asynchronous worker pool that manages thread pool limiting, cancellation and error propagation, inspired by golang's errgroup (requires tokio)"
@@ -13,6 +13,3 @@ anyhow.workspace = true
tokio.workspace = true
tokio-util = "0.7.15"
tracing.workspace = true
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }

View File

@@ -1,39 +0,0 @@
//! Basic usage of the noworkers library.
//!
//! This example demonstrates the simplest way to use noworkers to spawn
//! and manage concurrent async tasks.
use noworkers::Workers;
/// Entry point of the basic example: spawns five tasks on a fresh worker
/// group and waits until every one of them has finished.
///
/// Returns an error if adding a task or waiting on the group fails.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Basic noworkers example");
println!("=======================\n");
// Create a new worker group
let workers = Workers::new();
// Spawn several async tasks
for i in 0..5 {
workers
.add(move |_cancel| async move {
println!("Task {} starting...", i);
// Simulate some async work
// Task `i` sleeps 100 * (i + 1) ms, so higher indices sleep longer.
tokio::time::sleep(tokio::time::Duration::from_millis(100 * (i as u64 + 1))).await;
println!("Task {} completed!", i);
Ok(())
})
.await?;
}
println!("\nAll tasks spawned, waiting for completion...\n");
// Wait for all tasks to complete
workers.wait().await?;
println!("\nAll tasks completed successfully!");
Ok(())
}

View File

@@ -1,157 +0,0 @@
//! Cancellation patterns with noworkers.
//!
//! This example demonstrates different ways to cancel running tasks:
//! - External cancellation token
//! - Task-driven cancellation (timeout)
//! - Error-triggered cancellation
use noworkers::Workers;
use tokio_util::sync::CancellationToken;
use std::time::Duration;
/// Entry point of the cancellation example: runs the three demos in order.
///
/// Errors from the first two demos abort `main` via `?`; the result of the
/// third demo is deliberately discarded with `let _ =`.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Cancellation Patterns Example");
println!("=============================\n");
// Example 1: External cancellation token
example_external_cancellation().await?;
// Example 2: Task-driven cancellation (timeout)
example_timeout_cancellation().await?;
// Example 3: Error-triggered cancellation
let _ = example_error_cancellation().await;
Ok(())
}
/// Demo 1: cancels a worker group through an externally owned
/// `CancellationToken`.
///
/// Five tasks each race a 10-second sleep against the cancellation signal
/// they receive; a separately spawned task cancels the token after 500 ms.
/// Both branches of each task return `Ok(())`.
async fn example_external_cancellation() -> anyhow::Result<()> {
println!("1. External Cancellation Token");
println!("------------------------------");
let mut workers = Workers::new();
let cancel_token = CancellationToken::new();
// Link workers to external cancellation token
workers.with_cancel(&cancel_token);
// Spawn tasks that respect cancellation
for i in 0..5 {
workers
.add(move |cancel| async move {
println!(" Task {} started", i);
// Whichever future completes first decides the message that is printed.
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(10)) => {
println!(" Task {} completed normally", i);
Ok(())
}
_ = cancel.cancelled() => {
println!(" Task {} cancelled!", i);
Ok(())
}
}
})
.await?;
}
// Cancel after 500ms
// The clone is moved into the spawned task; the original token stays in this scope.
let cancel_clone = cancel_token.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(500)).await;
println!(" >>> Triggering external cancellation");
cancel_clone.cancel();
});
workers.wait().await?;
println!();
Ok(())
}
/// Demo 2: cancels a worker group from a dedicated cancel task.
///
/// The future passed to `with_cancel_task` sleeps for one second and prints a
/// message; three tasks each race a 60-second sleep against the cancellation
/// signal. Both branches of each task return `Ok(())`.
async fn example_timeout_cancellation() -> anyhow::Result<()> {
println!("2. Task-Driven Cancellation (Timeout)");
println!("-------------------------------------");
let mut workers = Workers::new();
// Set up a timeout task that cancels after 1 second
workers.with_cancel_task(async {
tokio::time::sleep(Duration::from_secs(1)).await;
println!(" >>> Timeout reached, cancelling all tasks");
});
// Spawn long-running tasks
for i in 0..3 {
workers
.add(move |cancel| async move {
println!(" Long task {} started", i);
// The 60-second branch is only there to show it loses against the cancel signal.
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(60)) => {
println!(" Long task {} completed (shouldn't happen)", i);
Ok(())
}
_ = cancel.cancelled() => {
println!(" Long task {} cancelled by timeout", i);
Ok(())
}
}
})
.await?;
}
workers.wait().await?;
println!();
Ok(())
}
/// Demo 3: shows a failing task cancelling its siblings.
///
/// One task sleeps 200 ms and returns an error; three further tasks race a
/// 10-second sleep against the cancellation signal. The result of `wait()` is
/// only printed, so this function returns `Ok(())` unless `add` itself fails.
async fn example_error_cancellation() -> anyhow::Result<()> {
println!("3. Error-Triggered Cancellation");
println!("-------------------------------");
let workers = Workers::new();
// Task that will fail and trigger cancellation
workers
.add(|_| async {
println!(" Failing task started");
tokio::time::sleep(Duration::from_millis(200)).await;
println!(" >>> Task failing with error!");
Err(anyhow::anyhow!("Intentional failure"))
})
.await?;
// Tasks that will be cancelled due to the error
for i in 0..3 {
workers
.add(move |cancel| async move {
println!(" Normal task {} started", i);
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(10)) => {
println!(" Normal task {} completed (shouldn't happen)", i);
Ok(())
}
_ = cancel.cancelled() => {
println!(" Normal task {} cancelled due to error", i);
Ok(())
}
}
})
.await?;
}
// This will return the error from the failing task
let result = workers.wait().await;
// The error is reported here rather than propagated with `?`.
match result {
Ok(_) => println!(" Unexpected success"),
Err(e) => println!(" Expected error caught: {}", e),
}
println!();
Ok(())
}

View File

@@ -1,159 +0,0 @@
//! Error handling and propagation with noworkers.
//!
//! This example demonstrates how noworkers handles errors with
//! first-error-wins semantics, similar to Go's errgroup.
use noworkers::Workers;
use std::time::Duration;
/// Entry point of the error-handling example: runs the three demos in order.
///
/// The results of the first two demos are discarded with `let _ =`; only an
/// error from the third demo is propagated.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Error Handling Example");
println!("=====================\n");
// Example 1: First error wins
let _ = example_first_error_wins().await;
// Example 2: Mixed success and failure
let _ = example_mixed_results().await;
// Example 3: All tasks succeed
example_all_succeed().await?;
Ok(())
}
/// Demo 1: three tasks fail after 300 ms (A), 100 ms (B) and 200 ms (C).
///
/// The result of `wait()` is printed rather than propagated, so the function
/// returns `Ok(())` unless `add` itself fails.
async fn example_first_error_wins() -> anyhow::Result<()> {
println!("1. First Error Wins");
println!("-------------------");
let workers = Workers::new();
// Spawn multiple tasks that will fail at different times
workers
.add(|_| async {
tokio::time::sleep(Duration::from_millis(300)).await;
println!(" Task A: Failing after 300ms");
Err(anyhow::anyhow!("Error from task A"))
})
.await?;
// Task B has the shortest delay of the three.
workers
.add(|_| async {
tokio::time::sleep(Duration::from_millis(100)).await;
println!(" Task B: Failing after 100ms (this should win)");
Err(anyhow::anyhow!("Error from task B"))
})
.await?;
workers
.add(|_| async {
tokio::time::sleep(Duration::from_millis(200)).await;
println!(" Task C: Failing after 200ms");
Err(anyhow::anyhow!("Error from task C"))
})
.await?;
let result = workers.wait().await;
match result {
Ok(_) => println!(" Unexpected success"),
Err(e) => println!(" First error received: {}", e),
}
println!();
Ok(())
}
/// Demo 2: mixes quick successful tasks, one failing task and slow tasks.
///
/// Tasks 0..3 race a 50 ms sleep against cancellation, one task fails after
/// 150 ms, and tasks 3..6 race a 1-second sleep against cancellation. The
/// result of `wait()` is printed rather than propagated.
async fn example_mixed_results() -> anyhow::Result<()> {
println!("2. Mixed Success and Failure");
println!("----------------------------");
let workers = Workers::new();
// Some tasks succeed
for i in 0..3 {
workers
.add(move |cancel| async move {
println!(" Success task {} started", i);
// Check for cancellation
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(50)) => {
println!(" Success task {} completed", i);
Ok(())
}
_ = cancel.cancelled() => {
println!(" Success task {} cancelled", i);
Ok(())
}
}
})
.await?;
}
// One task fails
workers
.add(|_| async {
tokio::time::sleep(Duration::from_millis(150)).await;
println!(" >>> Failure task triggering error");
Err(anyhow::anyhow!("Intentional failure in mixed group"))
})
.await?;
// More successful tasks (that might get cancelled)
for i in 3..6 {
workers
.add(move |cancel| async move {
println!(" Late task {} started", i);
// The 1-second sleep outlasts the 150 ms failure above.
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => {
println!(" Late task {} completed (unlikely)", i);
Ok(())
}
_ = cancel.cancelled() => {
println!(" Late task {} cancelled", i);
Ok(())
}
}
})
.await?;
}
let result = workers.wait().await;
match result {
Ok(_) => println!(" Unexpected success"),
Err(e) => println!(" Error propagated: {}", e),
}
println!();
Ok(())
}
/// Demo 3: five tasks that all return `Ok(())`.
///
/// Task `i` sleeps 100 * (i + 1) ms. Any error from `add` or `wait` is
/// propagated to the caller.
async fn example_all_succeed() -> anyhow::Result<()> {
println!("3. All Tasks Succeed");
println!("--------------------");
let workers = Workers::new();
// All tasks complete successfully
for i in 0..5 {
workers
.add(move |_| async move {
println!(" Task {} processing...", i);
tokio::time::sleep(Duration::from_millis(100 * (i as u64 + 1))).await;
println!(" Task {} done", i);
Ok(())
})
.await?;
}
// Should complete without error
workers.wait().await?;
println!(" All tasks completed successfully!");
println!();
Ok(())
}

View File

@@ -1,143 +0,0 @@
//! File processing example using noworkers.
//!
//! This example demonstrates processing multiple files concurrently,
//! such as reading, transforming, and writing files in parallel.
use noworkers::Workers;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Entry point of the file-processing example.
///
/// Queues eight simulated file jobs on a worker group limited to four
/// concurrent tasks, collects per-file statistics behind a shared async
/// mutex, and prints a summary once every job has finished.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("File Processing Example");
println!("======================\n");
// Simulate a list of files to process
let files = vec![
"data/input1.txt",
"data/input2.txt",
"data/input3.txt",
"data/input4.txt",
"data/input5.txt",
"data/input6.txt",
"data/input7.txt",
"data/input8.txt",
];
println!("Processing {} files concurrently...\n", files.len());
let mut workers = Workers::new();
// Limit concurrent file operations to avoid file descriptor exhaustion
workers.with_limit(4);
// Track processing statistics
let stats = Arc::new(tokio::sync::Mutex::new(ProcessingStats::default()));
let start = Instant::now();
for file_path in files {
// Each task gets its own handle to the shared stats and an owned copy of the path.
let stats = stats.clone();
let path = file_path.to_string();
workers
.add(move |cancel| async move {
tokio::select! {
result = process_file(&path) => {
let mut s = stats.lock().await;
// A per-file failure is only counted; the task still returns Ok(()).
match result {
Ok(bytes_processed) => {
println!("✓ Processed {} ({} bytes)", path, bytes_processed);
s.files_processed += 1;
s.bytes_processed += bytes_processed;
}
Err(e) => {
println!("✗ Failed to process {}: {}", path, e);
s.files_failed += 1;
}
}
Ok(())
}
_ = cancel.cancelled() => {
println!("⚠ Cancelled processing of {}", path);
Ok(())
}
}
})
.await?;
}
// Wait for all file processing to complete
workers.wait().await?;
let stats = stats.lock().await;
let elapsed = start.elapsed();
println!("\n========== Summary ==========");
println!("Total time: {}ms", elapsed.as_millis());
println!("Files processed: {}", stats.files_processed);
println!("Files failed: {}", stats.files_failed);
println!("Total bytes: {}", stats.bytes_processed);
// Throughput is bytes converted to MB (10^6 bytes) divided by elapsed seconds.
println!(
"Throughput: {:.2} MB/s",
(stats.bytes_processed as f64 / 1_000_000.0) / elapsed.as_secs_f64()
);
Ok(())
}
/// Running totals for a batch of file jobs; every counter starts at zero via
/// the derived `Default`.
#[derive(Default)]
struct ProcessingStats {
// Number of files that were processed successfully.
files_processed: usize,
// Number of files whose processing returned an error.
files_failed: usize,
// Sum of the byte counts reported for successfully processed files.
bytes_processed: usize,
}
/// Simulates processing a file: read it, transform the bytes, write the result.
///
/// The output path is `file_path` with its extension replaced by `processed`.
///
/// # Errors
///
/// Propagates any error returned by the read, transform or write step.
///
/// Returns the number of bytes produced by the transform step.
async fn process_file(file_path: &str) -> anyhow::Result<usize> {
    // Simulate reading file
    let content = simulate_read_file(file_path).await?;

    // Simulate processing (e.g., compression, encryption, transformation)
    let processed = simulate_transform(content).await?;

    // Record the size up front so the buffer can be moved into the writer
    // instead of being cloned just to read its length afterwards.
    let processed_len = processed.len();

    // Simulate writing output
    let output_path = PathBuf::from(file_path).with_extension("processed");
    simulate_write_file(&output_path, processed).await?;

    Ok(processed_len)
}
/// Pretends to read `path`: waits 50 ms, then hands back a zero-filled buffer.
///
/// The buffer length depends only on the length of the path string and is
/// always a multiple of 1000 between 1000 and 10000 bytes.
async fn simulate_read_file(path: &str) -> anyhow::Result<Vec<u8>> {
    // Stand-in for the latency of a real read.
    let io_delay = Duration::from_millis(50);
    tokio::time::sleep(io_delay).await;

    // Derive a pseudo file size from the path length.
    let byte_count = (path.len() * 1000) % 10000 + 1000;
    Ok(vec![0u8; byte_count])
}
/// Pretends to transform `data`: waits 100 ms, then returns a buffer of `1`
/// bytes that is half as long as the input (rounded down).
async fn simulate_transform(data: Vec<u8>) -> anyhow::Result<Vec<u8>> {
    // Stand-in for CPU-heavy work.
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Model "compression" by halving the size.
    let halved_len = data.len() / 2;
    let mut output = Vec::with_capacity(halved_len);
    output.resize(halved_len, 1u8);
    Ok(output)
}
async fn simulate_write_file(_path: &PathBuf, data: Vec<u8>) -> anyhow::Result<()> {
// Simulate file I/O delay
tokio::time::sleep(Duration::from_millis(30)).await;
// In a real implementation, you would write to disk here
let _ = data; // Use the data to avoid warning
Ok(())
}

View File

@@ -1,237 +0,0 @@
//! Parallel data processing pipeline using noworkers.
//!
//! This example shows how to process large datasets in parallel
//! with controlled concurrency and progress tracking.
use noworkers::Workers;
use noworkers::extensions::WithSysLimitCpus;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
/// Entry point of the parallel-processing example.
///
/// Builds a dataset of the integers 1 through 100 and runs the three demos on
/// it in order; any error aborts `main`.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Parallel Processing Pipeline Example");
println!("====================================\n");
// Simulate a large dataset
let dataset: Vec<i32> = (1..=100).collect();
let total_items = dataset.len();
println!("Processing {} items in parallel...\n", total_items);
// Example 1: Simple parallel map
example_parallel_map(dataset.clone()).await?;
// Example 2: Pipeline with multiple stages
example_pipeline(dataset.clone()).await?;
// Example 3: Batch processing
// The last demo takes ownership of the dataset, so no clone is needed.
example_batch_processing(dataset).await?;
Ok(())
}
/// Demo 1: runs `expensive_computation` on every item as its own task.
///
/// Results are pushed into a shared vector behind an async mutex, an atomic
/// counter drives a progress line every ten items, and the sum of all results
/// is printed at the end.
async fn example_parallel_map(dataset: Vec<i32>) -> anyhow::Result<()> {
println!("1. Simple Parallel Map");
println!("----------------------");
let mut workers = Workers::new();
// Use system CPU count for optimal parallelism
workers.with_limit_to_system_cpus();
let results = Arc::new(tokio::sync::Mutex::new(Vec::new()));
let processed = Arc::new(AtomicUsize::new(0));
let total = dataset.len();
let start = Instant::now();
for item in dataset {
let results = results.clone();
let processed = processed.clone();
workers
.add(move |_cancel| async move {
// Simulate CPU-intensive processing
let result = expensive_computation(item).await?;
// Store result
let mut res = results.lock().await;
res.push(result);
// Update progress
// fetch_add returns the previous value, hence the + 1.
let count = processed.fetch_add(1, Ordering::SeqCst) + 1;
if count % 10 == 0 {
println!(" Processed {}/{} items", count, total);
}
Ok(())
})
.await?;
}
workers.wait().await?;
let results = results.lock().await;
let sum: i32 = results.iter().sum();
println!(" Completed in {}ms", start.elapsed().as_millis());
println!(" Sum of results: {}\n", sum);
Ok(())
}
/// Demo 2: a two-stage pipeline built from two separate worker groups.
///
/// Stage 1 (limit 4) doubles every item. Stage 2 (limit 2) starts only after
/// stage 1 has been awaited; it splits the stage-1 output into chunks of ten
/// and sums the even values in each chunk. The grand total is printed.
async fn example_pipeline(dataset: Vec<i32>) -> anyhow::Result<()> {
println!("2. Multi-Stage Pipeline");
println!("-----------------------");
// Stage 1: Transform
let stage1_output = Arc::new(tokio::sync::Mutex::new(Vec::new()));
let mut stage1 = Workers::new();
stage1.with_limit(4);
println!(" Stage 1: Transforming data...");
for item in dataset {
let output = stage1_output.clone();
stage1
.add(move |_| async move {
let transformed = item * 2;
tokio::time::sleep(Duration::from_millis(10)).await;
let mut out = output.lock().await;
out.push(transformed);
Ok(())
})
.await?;
}
stage1.wait().await?;
// Stage 2: Filter and aggregate
let stage2_output = Arc::new(tokio::sync::Mutex::new(Vec::new()));
let mut stage2 = Workers::new();
stage2.with_limit(2);
println!(" Stage 2: Filtering and aggregating...");
// Snapshot the stage-1 results so the lock is not held while stage 2 runs.
let stage1_data = stage1_output.lock().await.clone();
for chunk in stage1_data.chunks(10) {
// Each task needs an owned copy of its chunk because the closure is `move`.
let chunk = chunk.to_vec();
let output = stage2_output.clone();
stage2
.add(move |_| async move {
// Filter even numbers and sum
let filtered_sum: i32 = chunk.iter()
.filter(|&&x| x % 2 == 0)
.sum();
tokio::time::sleep(Duration::from_millis(20)).await;
let mut out = output.lock().await;
out.push(filtered_sum);
Ok(())
})
.await?;
}
stage2.wait().await?;
let final_results = stage2_output.lock().await;
let total: i32 = final_results.iter().sum();
println!(" Pipeline result: {}\n", total);
Ok(())
}
/// Demo 3: processes the dataset in batches of 20 with at most three batch
/// tasks running at once.
///
/// Each task races `process_batch` against the cancellation signal. A failed
/// batch returns its error from the task; a cancelled batch returns
/// `Ok(())`. The total number of processed items is printed at the end.
async fn example_batch_processing(dataset: Vec<i32>) -> anyhow::Result<()> {
println!("3. Batch Processing");
println!("-------------------");
let batch_size = 20;
let batches: Vec<Vec<i32>> = dataset
.chunks(batch_size)
.map(|chunk| chunk.to_vec())
.collect();
println!(" Processing {} batches of {} items each", batches.len(), batch_size);
let mut workers = Workers::new();
workers.with_limit(3); // Process 3 batches concurrently
let results = Arc::new(tokio::sync::Mutex::new(Vec::new()));
let start = Instant::now();
for (batch_idx, batch) in batches.into_iter().enumerate() {
let results = results.clone();
workers
.add(move |cancel| async move {
println!(" Batch {} started", batch_idx);
// Process batch with cancellation support
tokio::select! {
batch_result = process_batch(batch, batch_idx) => {
match batch_result {
Ok(result) => {
let mut res = results.lock().await;
res.push(result);
println!(" Batch {} completed", batch_idx);
Ok(())
}
Err(e) => {
println!(" Batch {} failed: {}", batch_idx, e);
// Unlike the cancelled branch, a failure is returned from the task.
Err(e)
}
}
}
_ = cancel.cancelled() => {
println!(" Batch {} cancelled", batch_idx);
Ok(())
}
}
})
.await?;
}
workers.wait().await?;
let results = results.lock().await;
let total_processed: usize = results.iter().sum();
println!(" Processed {} items in {}ms\n", total_processed, start.elapsed().as_millis());
Ok(())
}
/// Stand-in for a costly computation: waits 5 ms, then returns
/// `n * n + n / 2 - 1` (integer division).
async fn expensive_computation(n: i32) -> anyhow::Result<i32> {
    // Pretend the work takes a while.
    tokio::time::sleep(Duration::from_millis(5)).await;

    let squared = n * n;
    let halved = n / 2;
    Ok(squared + halved - 1)
}
/// Stand-in for handling one batch: waits 100 ms and reports how many items
/// the batch contained. The batch index is accepted but not used.
async fn process_batch(batch: Vec<i32>, _batch_idx: usize) -> anyhow::Result<usize> {
    // Pretend the batch takes a while to handle.
    let simulated_work = Duration::from_millis(100);
    tokio::time::sleep(simulated_work).await;

    // A real implementation might write to a database, call an API, or
    // transform the items and save them to files. Here every item simply
    // counts as processed.
    Ok(batch.len())
}

View File

@@ -1,186 +0,0 @@
//! Retry with exponential backoff example using noworkers.
//!
//! This example shows how to implement retry logic with exponential
//! backoff for handling transient failures in distributed systems.
use noworkers::Workers;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
/// Entry point of the retry example.
///
/// Calls five simulated endpoints with at most three tasks running at once.
/// Each task makes up to `max_retries + 1` attempts, waiting between attempts
/// with a backoff that starts at 100 ms and is capped at 5 s. The outcome of
/// `wait()` and the total number of requests are printed as a summary.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Retry with Exponential Backoff Example");
println!("======================================\n");
// Simulate multiple API endpoints to call
let endpoints = vec![
"https://api.example.com/endpoint1",
"https://api.example.com/endpoint2",
"https://api.example.com/endpoint3",
"https://api.example.com/endpoint4",
"https://api.example.com/endpoint5",
];
let mut workers = Workers::new();
// Limit concurrent API calls
workers.with_limit(3);
// Global request counter for simulation
let request_counter = Arc::new(AtomicUsize::new(0));
println!("Making API calls with retry logic...\n");
for endpoint in endpoints {
let counter = request_counter.clone();
let url = endpoint.to_string();
workers
.add(move |cancel| async move {
// Retry configuration
let max_retries = 3;
let initial_backoff = Duration::from_millis(100);
let max_backoff = Duration::from_secs(5);
let mut attempt = 0;
let mut backoff = initial_backoff;
loop {
attempt += 1;
// Check for cancellation before each attempt
if cancel.is_cancelled() {
println!("{} - Cancelled before attempt {}", url, attempt);
return Ok(());
}
println!("{} - Attempt {}/{}", url, attempt, max_retries + 1);
match make_api_call(&url, &counter).await {
Ok(response) => {
println!("{} - Success: {}", url, response);
return Ok(());
}
// This arm is checked before the retryability test below, so the
// final attempt always ends with the "Max retries exceeded" error.
Err(e) if attempt > max_retries => {
println!("{} - Failed after {} attempts: {}", url, attempt, e);
return Err(anyhow::anyhow!("Max retries exceeded for {}: {}", url, e));
}
Err(e) => {
println!("{} - Attempt {} failed: {}", url, attempt, e);
// Check if error is retryable
if !is_retryable_error(&e) {
println!("{} - Non-retryable error: {}", url, e);
return Err(e);
}
// Wait with exponential backoff
println!(" Waiting {}ms before retry...", backoff.as_millis());
// The wait itself is cancellable: cancellation during backoff ends the task with Ok(()).
tokio::select! {
_ = tokio::time::sleep(backoff) => {
// Calculate next backoff with jitter
backoff = calculate_next_backoff(backoff, max_backoff);
}
_ = cancel.cancelled() => {
println!("{} - Cancelled during backoff", url);
return Ok(());
}
}
}
}
}
})
.await?;
}
// Wait for all API calls to complete
let result = workers.wait().await;
println!("\n========== Summary ==========");
// A failure is only reported; main still returns Ok(()).
match result {
Ok(_) => println!("All API calls completed successfully!"),
Err(e) => println!("Some API calls failed: {}", e),
}
let total_requests = request_counter.load(Ordering::SeqCst);
println!("Total requests made: {}", total_requests);
Ok(())
}
/// Simulates one API request against `url`.
///
/// Takes the next sequence number from `counter`, waits 50 ms, and then picks
/// an outcome from the sequence number modulo 7:
///
/// * `0` fails with "Request timeout",
/// * `1` and `2` fail with "503 Service Unavailable",
/// * `3` fails with "429 Too Many Requests", but only for URLs containing
///   `endpoint3`,
/// * everything else succeeds with a response string naming the URL and the
///   sequence number.
async fn make_api_call(url: &str, counter: &Arc<AtomicUsize>) -> anyhow::Result<String> {
    let sequence = counter.fetch_add(1, Ordering::SeqCst);

    // Stand-in for network latency.
    tokio::time::sleep(Duration::from_millis(50)).await;

    let slot = sequence % 7;
    if slot == 0 {
        // Simulated timeout.
        return Err(anyhow::anyhow!("Request timeout"));
    }
    if slot == 1 || slot == 2 {
        // Simulated temporary server error.
        return Err(anyhow::anyhow!("503 Service Unavailable"));
    }
    if slot == 3 && url.contains("endpoint3") {
        // Simulated rate limiting for one specific endpoint.
        return Err(anyhow::anyhow!("429 Too Many Requests"));
    }

    Ok(format!("Response from {} (request #{})", url, sequence))
}
/// Reports whether `error` looks transient and is worth retrying.
///
/// The decision is a case-sensitive substring match of the error's display
/// text against a fixed list of markers.
fn is_retryable_error(error: &anyhow::Error) -> bool {
    // Substrings that mark an error as retryable.
    const RETRYABLE_MARKERS: [&str; 5] = [
        "timeout",
        "503",
        "429",
        "Service Unavailable",
        "Too Many Requests",
    ];

    let message = error.to_string();
    RETRYABLE_MARKERS
        .iter()
        .any(|&marker| message.contains(marker))
}
/// Calculates the next backoff duration with jitter.
///
/// The current backoff is doubled (saturating instead of panicking on
/// overflow) and capped at `max`. A jitter of roughly ±10% is then applied,
/// and the result is never shorter than one millisecond.
///
/// When the capped backoff is below 10 ms the 10% jitter range rounds down to
/// zero. In that case no jitter is applied; the previous implementation
/// panicked there with a remainder-by-zero.
fn calculate_next_backoff(current: Duration, max: Duration) -> Duration {
    // Double the backoff and cap it at the maximum.
    let next = current.saturating_mul(2).min(max);
    let next_millis = next.as_millis();

    // Jitter spans ±10% of the capped backoff.
    let jitter_range = (next_millis / 10) as u64;
    if jitter_range == 0 {
        // Too small for any jitter; `% 0` below would panic.
        return Duration::from_millis((next_millis as u64).max(1));
    }

    // Draw from [0, 2 * range) and shift to [-range, range).
    let jitter = (rand::random::<u64>() % (jitter_range * 2)) as i64 - jitter_range as i64;
    let final_millis = (next_millis as i64 + jitter).max(1) as u64;
    Duration::from_millis(final_millis)
}

// Simple random number generator for jitter
mod rand {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Shared generator state; advanced by one on every call.
    static SEED: AtomicU64 = AtomicU64::new(12345);

    /// Returns the next value of a simple linear congruential generator,
    /// converted into `T`. Deterministic; for demonstration only.
    pub fn random<T>() -> T
    where
        T: From<u64>,
    {
        // Simple LCG for demonstration
        let old = SEED.fetch_add(1, Ordering::Relaxed);
        let new = (old.wrapping_mul(1103515245).wrapping_add(12345)) >> 16;
        T::from(new)
    }
}

View File

@@ -1,161 +0,0 @@
//! Web scraper example using noworkers.
//!
//! This example demonstrates a real-world use case: scraping multiple
//! URLs concurrently with rate limiting and error handling.
use noworkers::Workers;
use std::time::{Duration, Instant};
/// Simulated web page data
struct WebPage {
// Address the page was fetched from.
url: String,
// Body text of the page.
content: String,
// How long the fetch took.
fetch_time: Duration,
}
/// Entry point of the web-scraper example.
///
/// Queues ten simulated page fetches on a worker group limited to three
/// concurrent tasks, with a cancel task that fires after ten seconds.
/// Fetched pages are collected behind a shared async mutex and summarised
/// once `wait()` returns.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("Web Scraper Example");
println!("==================\n");
let urls = vec![
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3",
"https://example.com/page4",
"https://example.com/page5",
"https://example.com/page6",
"https://example.com/page7",
"https://example.com/page8",
"https://example.com/page9",
"https://example.com/page10",
];
println!("Scraping {} URLs with rate limiting...\n", urls.len());
let mut workers = Workers::new();
// Limit concurrent requests to avoid overwhelming the server
workers.with_limit(3);
// Add timeout for the entire scraping operation
workers.with_cancel_task(async {
tokio::time::sleep(Duration::from_secs(10)).await;
println!("⏰ Timeout reached - cancelling remaining requests");
});
let start = Instant::now();
let results = std::sync::Arc::new(tokio::sync::Mutex::new(Vec::new()));
for (idx, url) in urls.into_iter().enumerate() {
let url = url.to_string();
let results = results.clone();
workers
.add(move |cancel| async move {
// Simulate rate limiting with a small delay
tokio::time::sleep(Duration::from_millis(100)).await;
// The fetch races against the cancellation signal.
tokio::select! {
result = fetch_page(&url, idx) => {
match result {
Ok(page) => {
println!(
"✓ [{:>3}ms] Fetched {} ({}ms)",
start.elapsed().as_millis(),
page.url,
page.fetch_time.as_millis()
);
let mut res = results.lock().await;
res.push(page);
Ok(())
}
Err(e) => {
println!(
"✗ [{:>3}ms] Failed to fetch {}: {}",
start.elapsed().as_millis(),
url,
e
);
// In a real scraper, you might want to continue
// despite individual failures
Ok(()) // Continue scraping other pages
// Or propagate the error to stop all scraping:
// Err(e)
}
}
}
_ = cancel.cancelled() => {
println!("⚠ Cancelled fetch for {}", url);
Ok(())
}
}
})
.await?;
}
// Wait for all scraping to complete
let scrape_result = workers.wait().await;
println!("\n========== Results ==========");
match scrape_result {
Ok(_) => {
let results = results.lock().await;
println!("Successfully scraped {} pages", results.len());
println!("Total time: {}ms", start.elapsed().as_millis());
// Process results
let total_content_size: usize = results.iter()
.map(|p| p.content.len())
.sum();
// Guard against dividing by zero when nothing was fetched.
let avg_fetch_time: u128 = if !results.is_empty() {
results.iter()
.map(|p| p.fetch_time.as_millis())
.sum::<u128>() / results.len() as u128
} else {
0
};
println!("Total content size: {} bytes", total_content_size);
println!("Average fetch time: {}ms", avg_fetch_time);
}
Err(e) => {
println!("Scraping failed with error: {}", e);
let results = results.lock().await;
println!("Managed to scrape {} pages before failure", results.len());
}
}
Ok(())
}
/// Simulates fetching the page at `url`.
///
/// Waits `200 + (idx * 50) % 300` milliseconds to imitate network latency.
/// Index `7` always fails with "Connection timeout"; every other index yields
/// a small HTML document that mentions the index and the URL, together with
/// the time the simulated fetch took.
async fn fetch_page(url: &str, idx: usize) -> anyhow::Result<WebPage> {
    let started_at = Instant::now();

    // Latency varies with the page index.
    let latency_ms = 200 + (idx * 50) % 300;
    tokio::time::sleep(Duration::from_millis(latency_ms as u64)).await;

    match idx {
        // One page is hard-wired to fail.
        7 => Err(anyhow::anyhow!("Connection timeout")),
        _ => {
            let body = format!(
                "<!DOCTYPE html><html><body><h1>Page {}</h1><p>Content for {}</p></body></html>",
                idx, url
            );
            Ok(WebPage {
                url: url.to_string(),
                content: body,
                fetch_time: started_at.elapsed(),
            })
        }
    }
}
View File

@@ -1,86 +0,0 @@
//! Concurrency limiting with noworkers.
//!
//! This example shows how to limit the number of concurrent tasks
//! to prevent resource exhaustion and control parallelism.
use noworkers::Workers;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
/// Entry point of the concurrency-limiting example.
///
/// Spawns ten tasks on a worker group limited to three concurrent tasks.
/// Each task bumps a shared counter while it runs, records the highest
/// concurrency observed, sleeps for 200 ms and then decrements the counter.
/// A summary compares the observed maximum with the expected batch count.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    println!("Concurrency Limiting Example");
    println!("============================\n");

    // Track concurrent task count
    let concurrent_count = Arc::new(AtomicUsize::new(0));
    let max_seen = Arc::new(AtomicUsize::new(0));

    let mut workers = Workers::new();

    // Limit to 3 concurrent tasks
    let limit = 3;
    workers.with_limit(limit);
    println!("Concurrency limit set to: {}\n", limit);

    // Number of tasks to spawn; the summary below is derived from it.
    let task_count = 10;

    let start = Instant::now();

    // Spawn the tasks; only `limit` of them will run at a time
    for i in 0..task_count {
        let count = concurrent_count.clone();
        let max = max_seen.clone();
        workers
            .add(move |_cancel| async move {
                // Increment concurrent count (fetch_add returns the previous value)
                let current = count.fetch_add(1, Ordering::SeqCst) + 1;

                // Track maximum concurrency. `fetch_max` is the atomic
                // equivalent of a compare-exchange retry loop.
                max.fetch_max(current, Ordering::SeqCst);

                println!(
                    "[{:>3}ms] Task {:>2} started (concurrent: {})",
                    start.elapsed().as_millis(),
                    i,
                    current
                );

                // Simulate work
                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

                // Decrement concurrent count
                let remaining = count.fetch_sub(1, Ordering::SeqCst) - 1;
                println!(
                    "[{:>3}ms] Task {:>2} finished (remaining: {})",
                    start.elapsed().as_millis(),
                    i,
                    remaining
                );
                Ok(())
            })
            .await?;
    }

    workers.wait().await?;

    // Ceiling division: how many rounds of `limit` tasks are needed.
    let expected_batches = (task_count + limit - 1) / limit;

    println!("\n========== Summary ==========");
    println!("Total time: {}ms", start.elapsed().as_millis());
    println!("Max concurrent tasks: {}", max_seen.load(Ordering::SeqCst));
    println!(
        "Expected batches: {} ({} tasks / {} limit)",
        expected_batches, task_count, limit
    );
    println!(
        "Expected time: ~{}ms ({} batches * 200ms)",
        expected_batches * 200,
        expected_batches
    );
    Ok(())
}

View File

@@ -1,254 +1,8 @@
//! # noworkers
//!
//! A small, ergonomic Rust crate for spawning and supervising groups of asynchronous "workers" on Tokio.
//! Manage concurrent tasks with optional limits, cancellation, and first-error propagation.
//!
//! This crate is inspired by Go's [errgroup](https://pkg.go.dev/golang.org/x/sync/errgroup) package,
//! providing similar functionality in an idiomatic Rust way.
//!
//! ## Overview
//!
//! `noworkers` provides a simple way to manage groups of concurrent async tasks with:
//!
//! * **Bounded or unbounded concurrency** - Control how many tasks run simultaneously
//! * **Automatic cancellation** - First error cancels all remaining tasks
//! * **Flexible cancellation strategies** - External tokens or task-driven cancellation
//! * **Zero-cost abstractions** - Minimal overhead over raw tokio tasks
//!
//! ## Quick Start
//!
//! ```rust
//! use noworkers::Workers;
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//! // Create a new worker group
//! let mut workers = Workers::new();
//!
//! // Limit concurrent tasks to 5
//! workers.with_limit(5);
//!
//! // Spawn 10 tasks
//! for i in 0..10 {
//! workers.add(move |_cancel| async move {
//! println!("Task {i} running");
//! tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
//! Ok(())
//! }).await?;
//! }
//!
//! // Wait for all tasks to complete
//! workers.wait().await?;
//! Ok(())
//! }
//! ```
//!
//! ## Core Concepts
//!
//! ### Worker Groups
//!
//! A [`Workers`] instance represents a group of related async tasks that should be
//! managed together. All tasks in a group share:
//!
//! * A common concurrency limit (if set)
//! * A cancellation token hierarchy
//! * First-error propagation semantics
//!
//! ### Error Handling
//!
//! The first task to return an error "wins" - its error is captured and all other
//! tasks are immediately cancelled. This provides fail-fast semantics similar to
//! Go's errgroup.
//!
//! ```rust
//! use noworkers::Workers;
//!
//! # async fn example() -> anyhow::Result<()> {
//! let workers = Workers::new();
//!
//! // This task will fail first
//! workers.add(|_| async {
//! tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
//! Err(anyhow::anyhow!("task failed"))
//! }).await?;
//!
//! // These tasks will be cancelled
//! for i in 0..5 {
//! workers.add(move |cancel| async move {
//! // Check for cancellation
//! tokio::select! {
//! _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
//! Ok(())
//! }
//! _ = cancel.cancelled() => {
//! println!("Task {i} cancelled");
//! Ok(())
//! }
//! }
//! }).await?;
//! }
//!
//! // This will return the error from the first task
//! let result = workers.wait().await;
//! assert!(result.is_err());
//! # Ok(())
//! # }
//! ```
//!
//! ### Concurrency Limits
//!
//! You can limit how many tasks run concurrently using [`Workers::with_limit`].
//! When the limit is reached, new tasks will wait for a slot to become available.
//!
//! ```rust
//! use noworkers::Workers;
//! use std::sync::Arc;
//! use std::sync::atomic::{AtomicUsize, Ordering};
//!
//! # async fn example() -> anyhow::Result<()> {
//! let mut workers = Workers::new();
//! workers.with_limit(2); // Only 2 tasks run at once
//!
//! let concurrent_count = Arc::new(AtomicUsize::new(0));
//!
//! for i in 0..10 {
//! let count = concurrent_count.clone();
//! workers.add(move |_| async move {
//! let current = count.fetch_add(1, Ordering::SeqCst) + 1;
//! assert!(current <= 2, "Too many concurrent tasks!");
//!
//! tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
//!
//! count.fetch_sub(1, Ordering::SeqCst);
//! Ok(())
//! }).await?;
//! }
//!
//! workers.wait().await?;
//! # Ok(())
//! # }
//! ```
//!
//! ### Cancellation
//!
//! There are two ways to trigger cancellation:
//!
//! 1. **External cancellation** - Use an existing `CancellationToken`
//! 2. **Task-driven cancellation** - A dedicated task that triggers cancellation when complete
//!
//! ```rust
//! use noworkers::Workers;
//! use tokio_util::sync::CancellationToken;
//!
//! # async fn example() -> anyhow::Result<()> {
//! // External cancellation
//! let mut workers = Workers::new();
//! let cancel = CancellationToken::new();
//! workers.with_cancel(&cancel);
//!
//! // Cancel after 1 second
//! let cancel_clone = cancel.clone();
//! tokio::spawn(async move {
//! tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
//! cancel_clone.cancel();
//! });
//!
//! // Task-driven cancellation
//! let mut workers2 = Workers::new();
//! workers2.with_cancel_task(async {
//! tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
//! });
//! # Ok(())
//! # }
//! ```
//!
//! ## Extensions
//!
//! The crate provides extension traits for common patterns:
//!
//! ```rust
//! use noworkers::Workers;
//! use noworkers::extensions::WithSysLimitCpus;
//!
//! # async fn example() -> anyhow::Result<()> {
//! let mut workers = Workers::new();
//! // Automatically limit to number of CPU cores
//! workers.with_limit_to_system_cpus();
//! # Ok(())
//! # }
//! ```
//!
//! ## Examples
//!
//! See the `examples/` directory for more complete examples:
//!
//! * `basic.rs` - Simple task spawning and waiting
//! * `with_limit.rs` - Concurrency limiting
//! * `cancellation.rs` - Cancellation patterns
//! * `error_handling.rs` - Error propagation
//! * `web_scraper.rs` - Real-world web scraping example
//! * `parallel_processing.rs` - Data processing pipeline
use std::{future::Future, sync::Arc};
use tokio::{sync::Mutex, task::JoinHandle};
use tokio_util::sync::CancellationToken;
/// Opt-in extension traits that add convenience methods to [`Workers`].
///
/// Bring the trait you need into scope to make its methods available.
pub mod extensions {
    use crate::Workers;

    /// Caps a worker group's concurrency at the parallelism reported by the system.
    pub trait WithSysLimitCpus {
        /// Sets the concurrency limit to the value reported by
        /// [`std::thread::available_parallelism`].
        ///
        /// This is shorthand for calling `with_limit` with the detected value, so
        /// the same backpressure rules apply.
        ///
        /// # Example
        ///
        /// ```rust
        /// use noworkers::Workers;
        /// use noworkers::extensions::WithSysLimitCpus;
        ///
        /// # async fn example() -> anyhow::Result<()> {
        /// let mut workers = Workers::new();
        /// // Limit concurrency to what the system reports (e.g. 8 on an 8-core machine)
        /// workers.with_limit_to_system_cpus();
        ///
        /// // Many tasks may be added, but only that many run at the same time
        /// for _ in 0..100 {
        ///     workers.add(|_| async {
        ///         // CPU-bound work here
        ///         Ok(())
        ///     }).await?;
        /// }
        /// # Ok(())
        /// # }
        /// ```
        ///
        /// # Panics
        ///
        /// Panics if [`std::thread::available_parallelism`] returns an error.
        fn with_limit_to_system_cpus(&mut self) -> &mut Self;
    }

    impl WithSysLimitCpus for Workers {
        fn with_limit_to_system_cpus(&mut self) -> &mut Self {
            // Resolve the parallelism first so a failure panics before any limit is applied.
            let parallelism = std::thread::available_parallelism()
                .expect("to be able to get system cpu info");

            self.with_limit(parallelism.get())
        }
    }
}
type ErrChan = Arc<
Mutex<(
Option<tokio::sync::oneshot::Sender<anyhow::Error>>,
@@ -258,50 +12,6 @@ type ErrChan = Arc<
/// Reference-counted, async-mutex-guarded list of Tokio [`JoinHandle`]s for tasks that return `()`.
type JoinHandles = Arc<Mutex<Vec<JoinHandle<()>>>>;
/// A group of supervised async workers with optional concurrency limits.
///
/// `Workers` manages a collection of async tasks that run concurrently on Tokio.
///
/// # Examples
///
/// Basic usage:
/// ```rust
/// use noworkers::Workers;
///
/// # async fn example() -> anyhow::Result<()> {
/// let workers = Workers::new();
///
/// // Spawn some work
/// workers.add(|_| async {
/// println!("Doing work!");
/// Ok(())
/// }).await?;
///
/// // Wait for completion
/// workers.wait().await?;
/// # Ok(())
/// # }
/// ```
///
/// With concurrency limit:
/// ```rust
/// use noworkers::Workers;
///
/// # async fn example() -> anyhow::Result<()> {
/// let mut workers = Workers::new();
/// workers.with_limit(3); // Max 3 concurrent tasks
///
/// for i in 0..10 {
/// workers.add(move |_| async move {
/// println!("Task {i}");
/// Ok(())
/// }).await?;
/// }
///
/// workers.wait().await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct Workers {
limit: WorkerLimit,
@@ -346,12 +56,6 @@ impl WorkerLimit {
}
}
/// Guard that tracks active worker slots for concurrency limiting.
///
/// This guard is automatically created when a worker starts and dropped when it completes.
/// When dropped, it signals that a worker slot is available for the next queued task.
///
/// The only field is private, so code outside this module cannot construct the guard directly.
pub struct WorkerGuard {
// Limit state this guard was issued for.
// NOTE(review): presumably used by the `Drop` impl to release the slot — confirm against that impl.
limit: WorkerLimit,
}
@@ -374,19 +78,6 @@ impl Drop for WorkerGuard {
}
impl Workers {
/// Creates a new worker group with no concurrency limit.
///
/// The returned `Workers` instance can spawn unlimited concurrent tasks.
/// Use [`with_limit`](Self::with_limit) to add a concurrency limit.
///
/// # Examples
///
/// ```rust
/// use noworkers::Workers;
///
/// let workers = Workers::new();
/// // Can spawn unlimited concurrent tasks
/// ```
pub fn new() -> Self {
let once = tokio::sync::oneshot::channel();
@@ -398,99 +89,13 @@ impl Workers {
}
}
/// Ties this worker group to an externally owned cancellation token.
///
/// The group stores a child of `cancel` (see [`CancellationToken::child_token`]),
/// replacing whatever token it held before. Cancelling `cancel` therefore also
/// cancels the group's token, and workers can observe that through the token they
/// receive. Only the child token is ever cancelled by the group; the external
/// token itself is never cancelled from here.
///
/// # Important
///
/// Do not combine `with_cancel` with [`with_cancel_task`](Self::with_cancel_task) on
/// the same worker group. Using both together is considered undefined behavior.
///
/// # Examples
///
/// ```rust
/// use noworkers::Workers;
/// use tokio_util::sync::CancellationToken;
///
/// # async fn example() -> anyhow::Result<()> {
/// let mut workers = Workers::new();
/// let external = CancellationToken::new();
///
/// // Link the group to the external token
/// workers.with_cancel(&external);
///
/// // Spawn a worker that reacts to cancellation
/// workers.add(|token| async move {
///     tokio::select! {
///         _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
///             Ok(())
///         }
///         _ = token.cancelled() => {
///             println!("Cancelled!");
///             Ok(())
///         }
///     }
/// }).await?;
///
/// // Cancel from the outside
/// external.cancel();
/// workers.wait().await?;
/// # Ok(())
/// # }
/// ```
pub fn with_cancel(&mut self, cancel: &CancellationToken) -> &mut Self {
    // Derive the child first, then swap it in as the group's token.
    let child = cancel.child_token();
    self.cancellation = child;

    self
}
/// Spawns a task that cancels the worker group when it completes.
///
/// This is useful for implementing timeouts or other cancellation conditions.
/// When the provided future completes, all workers in the group receive
/// cancellation signals.
///
/// # Important
///
/// Do not use both [`with_cancel`](Self::with_cancel) and `with_cancel_task` on the same
/// worker group. This is undefined behavior.
///
/// # Examples
///
/// Timeout after 5 seconds:
/// ```rust
/// use noworkers::Workers;
///
/// # async fn example() -> anyhow::Result<()> {
/// let mut workers = Workers::new();
///
/// // Cancel all workers after 5 seconds
/// workers.with_cancel_task(async {
/// tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
/// });
///
/// // Spawn long-running workers
/// for i in 0..10 {
/// workers.add(move |cancel| async move {
/// tokio::select! {
/// _ = tokio::time::sleep(tokio::time::Duration::from_secs(60)) => {
/// println!("Task {i} completed");
/// Ok(())
/// }
/// _ = cancel.cancelled() => {
/// println!("Task {i} cancelled by timeout");
/// Ok(())
/// }
/// }
/// }).await?;
/// }
///
/// workers.wait().await?;
/// # Ok(())
/// # }
/// ```
/// Using `with_cancel` and `with_cancel_task` together is considered undefined behavior, because the external cancellation token is cancelled when the cancel task completes.
pub fn with_cancel_task<T>(&mut self, f: T) -> &mut Self
where
T: Future<Output = ()> + Send + 'static,
@@ -504,43 +109,7 @@ impl Workers {
self
}
/// Sets a concurrency limit for this worker group.
///
/// When the limit is reached, calls to [`add`](Self::add) will wait until a slot
/// becomes available. This provides backpressure and prevents unbounded task spawning.
///
/// # Important
///
/// When using with cancellation, workers should check their cancellation tokens.
/// Even after cancellation, queued tasks will still be scheduled (though they should
/// exit quickly if they respect cancellation).
///
/// # Examples
///
/// ```rust
/// use noworkers::Workers;
/// use std::time::Instant;
///
/// # async fn example() -> anyhow::Result<()> {
/// let mut workers = Workers::new();
/// workers.with_limit(2); // Only 2 tasks run concurrently
///
/// let start = Instant::now();
///
/// // Spawn 4 tasks that each take 100ms
/// for i in 0..4 {
/// workers.add(move |_| async move {
/// println!("Task {i} started at {:?}", start.elapsed());
/// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
/// Ok(())
/// }).await?;
/// }
///
/// workers.wait().await?;
/// // Total time will be ~200ms (2 batches of 2 tasks)
/// # Ok(())
/// # }
/// ```
/// `with_limit` can be dangerous when combined with an external cancel: work is still queued after the cancel, and nothing guarantees that every worker respects the cancellation and shuts down in a timely manner. It is up to the provided worker function to react when cancel is called.
pub fn with_limit(&mut self, limit: usize) -> &mut Self {
let (tx, rx) = tokio::sync::mpsc::channel(limit);
@@ -551,70 +120,8 @@ impl Workers {
self
}
/// Spawns a new worker task in this group.
///
/// The provided closure receives a [`CancellationToken`] that will be triggered if:
/// - Another worker returns an error
/// - The group's cancellation token is triggered
/// - [`wait`](Self::wait) completes
///
/// # Immediate Execution
///
/// The task starts executing immediately when `add` is called, not when `wait` is called.
/// The `wait` method is only for awaiting completion and collecting errors.
///
/// # Backpressure
///
/// If a concurrency limit is set via [`with_limit`](Self::with_limit), this method
/// will wait until a slot is available before spawning the task.
///
/// # Error Propagation
///
/// If the worker returns an error:
/// 1. All other workers receive cancellation signals
/// 2. The error is stored to be returned by [`wait`](Self::wait)
/// 3. Subsequent errors are ignored (first error wins)
///
/// # Examples
///
/// Basic task:
/// ```rust
/// use noworkers::Workers;
///
/// # async fn example() -> anyhow::Result<()> {
/// let workers = Workers::new();
///
/// workers.add(|_cancel| async {
/// println!("Doing work");
/// Ok(())
/// }).await?;
/// # Ok(())
/// # }
/// ```
///
/// Task with cancellation handling:
/// ```rust
/// use noworkers::Workers;
///
/// # async fn example() -> anyhow::Result<()> {
/// let workers = Workers::new();
///
/// workers.add(|cancel| async move {
/// tokio::select! {
/// result = do_work() => {
/// result
/// }
/// _ = cancel.cancelled() => {
/// println!("Cancelled");
/// Ok(())
/// }
/// }
/// }).await?;
///
/// # async fn do_work() -> anyhow::Result<()> { Ok(()) }
/// # Ok(())
/// # }
/// ```
/// Add
/// Note: `add` is immediate. Your future is polled right away, whether or not `wait` has been called. `wait` only waits for completion and returns any error.
pub async fn add<T, TFut>(&self, f: T) -> anyhow::Result<()>
where
T: FnOnce(CancellationToken) -> TFut + Send + 'static,
@@ -646,61 +153,6 @@ impl Workers {
Ok(())
}
/// Waits for all workers to complete and returns the first error (if any).
///
/// This method:
/// 1. Waits for all spawned tasks to complete
/// 2. Cancels any remaining tasks if one fails
/// 3. Returns the first error encountered, or `Ok(())` if all succeed
///
/// # Consumption
///
/// This method consumes the `Workers` instance. After calling `wait`,
/// you cannot add more tasks to this group.
///
/// # Examples
///
/// Success case:
/// ```rust
/// use noworkers::Workers;
///
/// # async fn example() -> anyhow::Result<()> {
/// let workers = Workers::new();
///
/// for i in 0..5 {
/// workers.add(move |_| async move {
/// println!("Task {i}");
/// Ok(())
/// }).await?;
/// }
///
/// // Blocks until all complete
/// workers.wait().await?;
/// println!("All done!");
/// # Ok(())
/// # }
/// ```
///
/// Error case:
/// ```rust
/// use noworkers::Workers;
///
/// # async fn example() {
/// let workers = Workers::new();
///
/// workers.add(|_| async {
/// Err(anyhow::anyhow!("Task failed"))
/// }).await.unwrap();
///
/// workers.add(|_| async {
/// Ok(()) // This might be cancelled
/// }).await.unwrap();
///
/// // Returns the error from the first task
/// let result = workers.wait().await;
/// assert!(result.is_err());
/// # }
/// ```
pub async fn wait(self) -> anyhow::Result<()> {
{
let mut handles = self.handles.lock().await;

View File

@@ -1,6 +1,6 @@
# yaml-language-server: $schema=https://git.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
# yaml-language-server: $schema=https://git.front.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json
base: "git@git.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git"
vars:
service: "noworkers"
@@ -12,6 +12,6 @@ please:
repository: "noworkers"
branch: main
settings:
api_url: "https://git.kjuulh.io"
api_url: "https://git.front.kjuulh.io"
actions:
rust: