Compare commits
7 Commits
Author | SHA1 | Date | |
---|---|---|---|
119fb101cf | |||
177820f3c3 | |||
3fc9c3143f | |||
8b1ef0c2cb | |||
688e742f20 | |||
c6dbf5f4ea | |||
8f4d61b9e1 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -88,7 +88,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "noworkers"
|
name = "noworkers"
|
||||||
version = "0.1.0"
|
version = "0.0.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
@@ -4,6 +4,7 @@ resolver = "2"
|
|||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
version = "0.0.1"
|
version = "0.0.1"
|
||||||
|
license = "MIT"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
noworkers = { path = "crates/noworkers" }
|
noworkers = { path = "crates/noworkers" }
|
||||||
|
32
README.md
32
README.md
@@ -5,6 +5,12 @@ Manage concurrent tasks with optional limits, cancellation, and first-error prop
|
|||||||
|
|
||||||
Inpired by golang (errgroups)
|
Inpired by golang (errgroups)
|
||||||
|
|
||||||
|
## Disclaimer
|
||||||
|
|
||||||
|
The library is still new, and as such the API is subject to change, I don't expect changes to the add and wait functions, but the rest may change. I might also move to custom error types, and or removing the tokio_utils entirely to slim down the package. It shouldn't affect the user too much however.
|
||||||
|
|
||||||
|
The crate is in production, and has seen extensive use
|
||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|
||||||
- **Unlimited or bounded concurrency** via `with_limit(usize)`.
|
- **Unlimited or bounded concurrency** via `with_limit(usize)`.
|
||||||
@@ -25,33 +31,39 @@ Then in your code:
|
|||||||
|
|
||||||
```rust
|
```rust
|
||||||
use noworkers::Workers;
|
use noworkers::Workers;
|
||||||
use tokio_util::sync::CancellationToken;
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## Quick Example
|
## Quick Example
|
||||||
|
|
||||||
```rust,no_run
|
```rust
|
||||||
use noworkers::Workers;
|
use noworkers::Workers;
|
||||||
use tokio_util::sync::CancellationToken;
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
// Create a worker group with up to 5 concurrent tasks
|
// Create a worker group with up to 5 concurrent tasks
|
||||||
let mut workers = Workers::new();
|
let mut workers = Workers::new();
|
||||||
|
|
||||||
workers
|
// Limit amount of concurrent workers
|
||||||
.with_limit(5)
|
workers.with_limit(5);
|
||||||
.with_cancel(&CancellationToken::new());
|
|
||||||
|
// Adds cancellation signal
|
||||||
|
workers.with_cancel_task(async move {
|
||||||
|
// send cancellation to tasks after 60 seconds
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(60)).await
|
||||||
|
});
|
||||||
|
|
||||||
// Spawn 10 async jobs
|
// Spawn 10 async jobs
|
||||||
for i in 0..10 {
|
for i in 0..10 {
|
||||||
|
// Work is done immediatley, so this will wait in two batches of 1 seconds each (because of limit)
|
||||||
workers.add(move |cancel_token| async move {
|
workers.add(move |cancel_token| async move {
|
||||||
// Respect cancellation, or not, if you don't care about blocking forever
|
// optional tokio::select, if you use cancellation for your tasks, if not just do your work
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
// Do work, in this case just sleep
|
||||||
_ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {
|
_ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {
|
||||||
println!("Job {i} done");
|
println!("Job {i} done");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
// If we receive cancel we close
|
||||||
_ = cancel_token.cancelled() => {
|
_ = cancel_token.cancelled() => {
|
||||||
println!("Job {i} cancelled");
|
println!("Job {i} cancelled");
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -95,5 +107,9 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
Dual-licensed under **MIT** or **Apache-2.0**.
|
Dual-licensed under **MIT** or **Apache-2.0**.
|
||||||
See [LICENSE-MIT](LICENSE-MIT) and [LICENSE-APACHE](LICENSE-APACHE) for details.
|
See [LICENSE-MIT](LICENSE-MIT) and [LICENSE-APACHE](LICENSE-APACHE) for details.
|
||||||
```
|
|
||||||
|
## Contribute
|
||||||
|
|
||||||
|
Simply create an issue here or pr https://github.com/kjuulh/noworkers.git, development happens publicly at: https://git.front.kjuulh.io/kjuulh/noworkers.
|
||||||
|
|
||||||
|
|
||||||
|
@@ -3,7 +3,7 @@ name = "noworkers"
|
|||||||
edition = "2024"
|
edition = "2024"
|
||||||
readme = "../../README.md"
|
readme = "../../README.md"
|
||||||
version.workspace = true
|
version.workspace = true
|
||||||
license = "MIT or APACHE"
|
license.workspace = true
|
||||||
repository = "https://git.front.kjuulh.io/kjuulh/noworkers"
|
repository = "https://git.front.kjuulh.io/kjuulh/noworkers"
|
||||||
authors = ["kjuulh <contact@kasperhermansen.com>"]
|
authors = ["kjuulh <contact@kasperhermansen.com>"]
|
||||||
description = "A small asyncronous worker pool manages thread pool limiting, cancellation and error propogation, inspired by golangs errgroup (requires tokio)"
|
description = "A small asyncronous worker pool manages thread pool limiting, cancellation and error propogation, inspired by golangs errgroup (requires tokio)"
|
||||||
|
@@ -3,6 +3,24 @@ use std::{future::Future, sync::Arc};
|
|||||||
use tokio::{sync::Mutex, task::JoinHandle};
|
use tokio::{sync::Mutex, task::JoinHandle};
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
|
pub mod extensions {
|
||||||
|
use crate::Workers;
|
||||||
|
|
||||||
|
pub trait WithSysLimitCpus {
|
||||||
|
fn with_limit_to_system_cpus(&mut self) -> &mut Self;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WithSysLimitCpus for Workers {
|
||||||
|
fn with_limit_to_system_cpus(&mut self) -> &mut Self {
|
||||||
|
self.with_limit(
|
||||||
|
std::thread::available_parallelism()
|
||||||
|
.expect("to be able to get system cpu info")
|
||||||
|
.into(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type ErrChan = Arc<
|
type ErrChan = Arc<
|
||||||
Mutex<(
|
Mutex<(
|
||||||
Option<tokio::sync::oneshot::Sender<anyhow::Error>>,
|
Option<tokio::sync::oneshot::Sender<anyhow::Error>>,
|
||||||
|
Reference in New Issue
Block a user