11
examples/pipeline/Cargo.toml
Normal file
11
examples/pipeline/Cargo.toml
Normal file
@@ -0,0 +1,11 @@
|
||||
[package]
|
||||
name = "pipeline"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
noprocess.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-subscriber = "0.3"
|
||||
127
examples/pipeline/src/main.rs
Normal file
127
examples/pipeline/src/main.rs
Normal file
@@ -0,0 +1,127 @@
|
||||
use std::{future::Future, sync::Arc, time::Duration};
|
||||
|
||||
use noprocess::{Process, ProcessHandler, ProcessManager, ProcessResult, ShutdownConfig};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), noprocess::Error> {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let manager = ProcessManager::new();
|
||||
|
||||
// Create a channel for the pipeline
|
||||
let (tx, rx) = mpsc::channel::<u64>(100);
|
||||
|
||||
// Producer: generates data
|
||||
let producer = Process::builder(Producer { tx })
|
||||
.handle_id("producer")
|
||||
.shutdown_config(ShutdownConfig {
|
||||
graceful_timeout: Duration::from_secs(2),
|
||||
restart_on_success: true,
|
||||
restart_delay: Duration::from_millis(100),
|
||||
})
|
||||
.build();
|
||||
|
||||
// Consumer: processes data
|
||||
let consumer = Process::builder(Consumer { rx: Arc::new(tokio::sync::Mutex::new(rx)) })
|
||||
.handle_id("consumer")
|
||||
.shutdown_config(ShutdownConfig {
|
||||
graceful_timeout: Duration::from_secs(5),
|
||||
restart_on_success: false,
|
||||
restart_delay: Duration::ZERO,
|
||||
})
|
||||
.on_error(|e| eprintln!("Consumer error: {e}"))
|
||||
.build();
|
||||
|
||||
let producer_id = manager.add_process(producer).await;
|
||||
let consumer_id = manager.add_process(consumer).await;
|
||||
|
||||
// Start consumer first, then producer
|
||||
manager.start_process(&consumer_id).await?;
|
||||
manager.start_process(&producer_id).await?;
|
||||
|
||||
// Let it run for a bit
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
|
||||
// Graceful shutdown: stop producer first, let consumer drain
|
||||
println!("\n--- Initiating graceful shutdown ---");
|
||||
manager.stop_process(&producer_id).await?;
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
manager.stop_process(&consumer_id).await?;
|
||||
|
||||
println!("Pipeline stopped cleanly");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct Producer {
|
||||
tx: mpsc::Sender<u64>,
|
||||
}
|
||||
|
||||
impl ProcessHandler for Producer {
|
||||
fn call(&self, cancel: CancellationToken) -> impl Future<Output = ProcessResult> + Send {
|
||||
let tx = self.tx.clone();
|
||||
async move {
|
||||
let mut counter = 0u64;
|
||||
let mut interval = tokio::time::interval(Duration::from_millis(200));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
println!("[producer] shutting down at count {counter}");
|
||||
break;
|
||||
}
|
||||
_ = interval.tick() => {
|
||||
if tx.send(counter).await.is_err() {
|
||||
println!("[producer] consumer gone, stopping");
|
||||
break;
|
||||
}
|
||||
println!("[producer] sent {counter}");
|
||||
counter += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Consumer {
|
||||
rx: Arc<tokio::sync::Mutex<mpsc::Receiver<u64>>>,
|
||||
}
|
||||
|
||||
impl ProcessHandler for Consumer {
|
||||
fn call(&self, cancel: CancellationToken) -> impl Future<Output = ProcessResult> + Send {
|
||||
let rx = self.rx.clone();
|
||||
async move {
|
||||
let mut rx = rx.lock().await;
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
// Drain remaining items
|
||||
let mut drained = 0;
|
||||
while rx.try_recv().is_ok() {
|
||||
drained += 1;
|
||||
}
|
||||
println!("[consumer] shutting down, drained {drained} remaining items");
|
||||
break;
|
||||
}
|
||||
msg = rx.recv() => {
|
||||
match msg {
|
||||
Some(n) => {
|
||||
// Simulate processing
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
println!("[consumer] processed {n}");
|
||||
}
|
||||
None => {
|
||||
println!("[consumer] channel closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user