refactor: into more reasonable project structure

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2026-01-07 14:38:17 +01:00
parent a5f6dfa972
commit 1fc3127b84
8 changed files with 571 additions and 449 deletions

176
Cargo.lock generated
View File

@@ -20,12 +20,6 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bumpalo"
version = "3.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510"
[[package]]
name = "bytes"
version = "1.11.0"
@@ -52,24 +46,13 @@ checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
[[package]]
name = "getrandom"
version = "0.3.4"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd"
checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592"
dependencies = [
"cfg-if",
"libc",
"r-efi",
"wasip2",
]
[[package]]
name = "js-sys"
version = "0.3.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "464a3709c7f55f1f721e5389aa6ea4e3bc6aba669353300af094b29ffbdde1d8"
dependencies = [
"once_cell",
"wasm-bindgen",
"wasi",
]
[[package]]
@@ -116,11 +99,10 @@ name = "noprocess"
version = "0.1.0"
dependencies = [
"anyhow",
"thiserror",
"rand",
"tokio",
"tokio-util",
"tracing",
"uuid",
]
[[package]]
@@ -167,6 +149,15 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "ppv-lite86"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9"
dependencies = [
"zerocopy",
]
[[package]]
name = "proc-macro2"
version = "1.0.105"
@@ -186,10 +177,34 @@ dependencies = [
]
[[package]]
name = "r-efi"
version = "5.3.0"
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]]
name = "redox_syscall"
@@ -200,12 +215,6 @@ dependencies = [
"bitflags",
]
[[package]]
name = "rustversion"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "scopeguard"
version = "1.1.0"
@@ -269,26 +278,6 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "thiserror"
version = "2.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "2.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.9"
@@ -404,17 +393,6 @@ version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0"
[[package]]
name = "uuid"
version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a"
dependencies = [
"getrandom",
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "valuable"
version = "0.1.1"
@@ -427,60 +405,6 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasip2"
version = "1.0.1+wasi-0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7"
dependencies = [
"wit-bindgen",
]
[[package]]
name = "wasm-bindgen"
version = "0.2.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d759f433fa64a2d763d1340820e46e111a7a5ab75f993d1852d70b03dbb80fd"
dependencies = [
"cfg-if",
"once_cell",
"rustversion",
"wasm-bindgen-macro",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48cb0d2638f8baedbc542ed444afc0644a29166f1595371af4fecf8ce1e7eeb3"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cefb59d5cd5f92d9dcf80e4683949f15ca4b511f4ac0a6e14d4e1ac60c6ecd40"
dependencies = [
"bumpalo",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbc538057e648b67f72a982e708d485b2efa771e1ac05fec311f9f63e5800db4"
dependencies = [
"unicode-ident",
]
[[package]]
name = "windows-link"
version = "0.2.1"
@@ -628,7 +552,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"
[[package]]
name = "wit-bindgen"
version = "0.46.0"
name = "zerocopy"
version = "0.8.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"
checksum = "1fabae64378cb18147bb18bca364e63bdbe72a0ffe4adf0addfec8aa166b2c56"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9c2d862265a8bb4471d87e033e730f536e2a285cc7cb05dbce09a2a97075f90"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

View File

@@ -6,6 +6,7 @@ resolver = "2"
noprocess = { path = "crates/noprocess" }
anyhow = { version = "1.0.71" }
rand = "0.8"
tokio = { version = "1", features = ["full"] }
tracing = { version = "0.1", features = ["log"] }
tokio-util = "0.7.18"

View File

@@ -5,8 +5,7 @@ edition = "2024"
[dependencies]
anyhow.workspace = true
thiserror = "2.0.17"
rand.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
uuid = { version = "1.19.0", features = ["v4", "v7"] }

View File

@@ -0,0 +1,35 @@
use std::fmt::Display;
const ID_BYTES: usize = 6;
#[derive(Debug, Clone, Ord, Eq, PartialEq, PartialOrd)]
pub struct HandleID(String);
impl HandleID {
pub(crate) fn new_random() -> Self {
Self(generate_short_id())
}
}
impl Display for HandleID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
impl From<&str> for HandleID {
fn from(value: &str) -> Self {
Self(value.to_string())
}
}
impl From<String> for HandleID {
fn from(value: String) -> Self {
Self(value)
}
}
pub(crate) fn generate_short_id() -> String {
let bytes: [u8; ID_BYTES] = rand::random();
bytes.iter().map(|b| format!("{b:02x}")).collect()
}

View File

@@ -1,35 +1,72 @@
use std::{collections::BTreeMap, fmt::Display, pin::Pin, sync::Arc, time::Duration};
mod handle_id;
mod process;
mod process_handler;
mod shutdown_config;
use tokio::{
sync::{Mutex, mpsc, oneshot, watch},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
pub use handle_id::HandleID;
pub use process::Process;
pub use process_handler::ProcessHandler;
pub use shutdown_config::ShutdownConfig;
pub struct ProcessManager {
inner: Arc<Mutex<InnerProcessManager>>,
inner: Arc<Mutex<Inner>>,
}
#[derive(Default)]
pub struct InnerProcessManager {
handles: BTreeMap<HandleID, Process>,
impl Default for ProcessManager {
fn default() -> Self {
Self::new()
}
}
impl InnerProcessManager {
impl ProcessManager {
pub fn new() -> Self {
debug!("creating process manager");
Self {
handles: BTreeMap::default(),
inner: Arc::new(Mutex::new(Inner::default())),
}
}
pub fn add_process(&mut self, process: Process) -> &mut Self {
self.handles.insert(process.handle_id.clone(), process);
self
pub async fn add_process(&self, process: Process) {
let handle_id = process.handle_id().clone();
info!(%handle_id, "adding process to manager");
self.inner.lock().await.add_process(process);
}
pub async fn start_process(&mut self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
pub async fn start_process(&self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
self.inner.lock().await.start_process(handle_id).await
}
pub async fn restart_process(&self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
self.inner.lock().await.restart_process(handle_id).await
}
pub async fn stop_process(&self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
self.inner.lock().await.stop_process(handle_id).await
}
pub async fn kill_process(&self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
self.inner.lock().await.kill_process(handle_id).await
}
}
#[derive(Default)]
struct Inner {
handles: BTreeMap<HandleID, Process>,
}
impl Inner {
fn add_process(&mut self, process: Process) {
self.handles.insert(process.handle_id().clone(), process);
}
async fn start_process(&mut self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
let Some(handle) = self.handles.get_mut(handle_id) else {
warn!(%handle_id, "process not found");
return Ok(None);
};
@@ -38,8 +75,9 @@ impl InnerProcessManager {
Ok(None)
}
pub async fn restart_process(&mut self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
async fn restart_process(&mut self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
let Some(handle) = self.handles.get_mut(handle_id) else {
warn!(%handle_id, "process not found");
return Ok(None);
};
@@ -48,8 +86,9 @@ impl InnerProcessManager {
Ok(None)
}
pub async fn stop_process(&mut self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
async fn stop_process(&mut self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
let Some(handle) = self.handles.get_mut(handle_id) else {
warn!(%handle_id, "process not found");
return Ok(None);
};
@@ -57,317 +96,15 @@ impl InnerProcessManager {
Ok(None)
}
}
pub struct Process {
handle_id: HandleID,
command_tx: mpsc::Sender<ProcessCommand>,
state: watch::Receiver<ProcessState>,
task: tokio::task::JoinHandle<()>,
}
async fn kill_process(&mut self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
let Some(handle) = self.handles.get_mut(handle_id) else {
warn!(%handle_id, "process not found");
return Ok(None);
};
impl Process {
pub fn new(process: impl IntoProcess) -> Self {
Self::new_with_handle(HandleID::new_random(), process)
}
handle.kill().await?;
pub fn new_with_handle(handle_id: impl Into<HandleID>, process: impl IntoProcess) -> Self {
let handle_id = handle_id.into();
let process = process.into_process();
let shutdown_config = ShutdownConfig::default();
let (command_tx, command_rx) = mpsc::channel(1);
let (event_tx, event_rx) = watch::channel(ProcessState::Pending);
let handle = tokio::spawn(Self::run_loop(
handle_id.clone(),
process,
command_rx,
shutdown_config,
event_tx,
));
Self {
handle_id,
state: event_rx,
command_tx,
task: handle,
}
}
async fn run_loop(
handle_id: HandleID,
process: SharedProcess,
mut commands: mpsc::Receiver<ProcessCommand>,
shutdown_config: ShutdownConfig,
state: watch::Sender<ProcessState>,
) {
let mut current_task: Option<RunningTask> = None;
loop {
match current_task.take() {
Some(mut task) => {
tokio::select! {
biased;
Some(cmd) = commands.recv() => {
current_task = Self::handle_command_while_running(
cmd,
task,
&state,
&shutdown_config
).await;
}
result = &mut task.handle => {
Self::handle_task_completion(result, process.clone(), &state, &shutdown_config, &mut current_task).await;
}
}
}
None => match commands.recv().await {
Some(ProcessCommand::Start) => {
if matches!(
*state.borrow(),
ProcessState::Pending | ProcessState::Stopped | ProcessState::Errored
) {
current_task = Some(RunningTask::spawn(process.clone()));
let _ = state.send(ProcessState::Running);
}
}
Some(ProcessCommand::Stop { response } | ProcessCommand::Kill { response }) => {
let _ = response.send(());
}
None => break,
},
}
}
}
pub async fn start(&mut self) -> anyhow::Result<()> {
self.command_tx
.send(ProcessCommand::Start)
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))
}
pub async fn stop(&mut self) -> anyhow::Result<()> {
let (resp_tx, resp_rx) = oneshot::channel();
self.command_tx
.send(ProcessCommand::Stop { response: resp_tx })
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))?;
resp_rx
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))
}
pub async fn kill(&mut self) -> anyhow::Result<()> {
let (resp_tx, resp_rx) = oneshot::channel();
self.command_tx
.send(ProcessCommand::Kill { response: resp_tx })
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))?;
resp_rx
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))
}
pub async fn restart(&mut self) -> anyhow::Result<()> {
self.stop().await?;
self.start().await
}
pub(crate) async fn wait_for(&self, desired: ProcessState) {
let mut rx = self.state.clone();
while *rx.borrow_and_update() != desired {
if rx.changed().await.is_err() {
break;
}
}
}
pub fn handle_id(&self) -> &HandleID {
&self.handle_id
}
async fn handle_command_while_running(
cmd: ProcessCommand,
task: RunningTask,
state: &watch::Sender<ProcessState>,
config: &ShutdownConfig,
) -> Option<RunningTask> {
match cmd {
ProcessCommand::Start => Some(task),
ProcessCommand::Stop { response } => {
task.shutdown_gracefully(config.graceful_timeout).await;
let _ = state.send(ProcessState::Stopped);
let _ = response.send(());
None
}
ProcessCommand::Kill { response } => {
task.handle.abort();
let _ = task.handle.await;
let _ = state.send(ProcessState::Stopped);
let _ = response.send(());
None
}
}
}
async fn handle_task_completion(
result: Result<anyhow::Result<()>, tokio::task::JoinError>,
process: SharedProcess,
state: &watch::Sender<ProcessState>,
config: &ShutdownConfig,
current_task: &mut Option<RunningTask>,
) {
match result {
Ok(Ok(())) => {
if config.restart_on_success {
tokio::time::sleep(config.restart_delay).await;
*current_task = Some(RunningTask::spawn(process));
} else {
let _ = state.send(ProcessState::Stopped);
}
}
Ok(Err(e)) => {
tracing::error!("process error: {e:#}");
let _ = state.send(ProcessState::Errored);
}
Err(e) if e.is_cancelled() => {
// Aborted - state already set by kill handler
}
Err(e) => {
tracing::error!("process panicked: {e}");
let _ = state.send(ProcessState::Errored);
}
}
}
}
impl Drop for Process {
fn drop(&mut self) {
self.task.abort();
}
}
struct RunningTask {
handle: JoinHandle<anyhow::Result<()>>,
cancellation: CancellationToken,
}
impl RunningTask {
pub fn spawn(process: SharedProcess) -> Self {
let cancellation = CancellationToken::new();
let handle = tokio::spawn({
let cancellation = cancellation.child_token();
async move { process.process.call_async(cancellation).await }
});
Self {
handle,
cancellation,
}
}
async fn shutdown_gracefully(mut self, timeout: Duration) {
self.cancellation.cancel();
match tokio::time::timeout(timeout, &mut self.handle).await {
Ok(_) => {
tracing::debug!("process stopped")
}
Err(_) => {
tracing::error!("graceful shutdown timed out, process will be dropped");
self.handle.abort();
}
}
}
}
enum ProcessCommand {
Start,
Stop { response: oneshot::Sender<()> },
Kill { response: oneshot::Sender<()> },
}
pub struct ShutdownConfig {
pub graceful_timeout: Duration,
pub restart_on_success: bool,
pub restart_delay: Duration,
}
impl Default for ShutdownConfig {
fn default() -> Self {
Self {
graceful_timeout: Duration::from_secs(5),
restart_on_success: true,
restart_delay: Duration::ZERO,
}
}
}
#[derive(PartialEq)]
pub(crate) enum ProcessState {
Pending,
Running,
Errored,
Stopped,
}
#[derive(Debug, Clone, Ord, Eq, PartialEq, PartialOrd)]
pub struct HandleID(String);
impl HandleID {
fn new_random() -> Self {
Self(uuid::Uuid::now_v7().to_string())
}
}
impl Display for HandleID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
#[allow(async_fn_in_trait)]
pub trait ProcessHandler: Send + Sync + 'static {
fn call(
&self,
cancellation: CancellationToken,
) -> impl Future<Output = anyhow::Result<()>> + Send;
}
trait AsyncProcess: Send + Sync {
fn call_async(
&self,
cancellation: CancellationToken,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
}
impl<E: ProcessHandler> AsyncProcess for E {
fn call_async(
&self,
cancellation: CancellationToken,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
Box::pin(self.call(cancellation))
}
}
#[derive(Clone)]
pub struct SharedProcess {
process: Arc<dyn AsyncProcess + Send + Sync + 'static>,
}
pub trait IntoProcess {
fn into_process(self) -> SharedProcess;
}
impl<E: ProcessHandler> IntoProcess for E {
fn into_process(self) -> SharedProcess {
SharedProcess {
process: Arc::new(self),
}
Ok(None)
}
}

View File

@@ -0,0 +1,351 @@
use std::time::Duration;
use tokio::{
sync::{mpsc, oneshot, watch},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, debug_span, info, trace, warn};
use crate::{
handle_id::{generate_short_id, HandleID},
process_handler::{IntoProcess, ProcessHandler, SharedProcess},
shutdown_config::ShutdownConfig,
};
/// Unique identifier for each process invocation (generated each time a process starts)
#[derive(Debug, Clone)]
struct InvocationID(String);
impl InvocationID {
fn new() -> Self {
Self(generate_short_id())
}
}
impl std::fmt::Display for InvocationID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
pub struct Process {
handle_id: HandleID,
command_tx: mpsc::Sender<ProcessCommand>,
state: watch::Receiver<ProcessState>,
task: JoinHandle<()>,
}
impl Process {
pub fn new(process: impl ProcessHandler) -> Self {
Self::new_with_handle(HandleID::new_random(), process)
}
pub fn new_with_handle(handle_id: impl Into<HandleID>, process: impl ProcessHandler) -> Self {
let handle_id = handle_id.into();
let process = process.into_process();
let shutdown_config = ShutdownConfig::default();
let (command_tx, command_rx) = mpsc::channel(1);
let (event_tx, event_rx) = watch::channel(ProcessState::Pending);
debug!(handle_id = %handle_id, "creating process");
let span = debug_span!("process_loop", handle_id = %handle_id);
let handle = tokio::spawn(
Self::run_loop(handle_id.clone(), process, command_rx, shutdown_config, event_tx)
.instrument(span),
);
Self {
handle_id,
state: event_rx,
command_tx,
task: handle,
}
}
async fn run_loop(
handle_id: HandleID,
process: SharedProcess,
mut commands: mpsc::Receiver<ProcessCommand>,
shutdown_config: ShutdownConfig,
state: watch::Sender<ProcessState>,
) {
let mut current_task: Option<RunningTask> = None;
trace!("entering run loop");
loop {
match current_task.take() {
Some(mut task) => {
tokio::select! {
biased;
Some(cmd) = commands.recv() => {
trace!(command = ?cmd, "received command while running");
current_task = Self::handle_command_while_running(
cmd,
task,
&state,
&shutdown_config
).await;
}
result = &mut task.handle => {
let invocation_id = task.invocation_id.clone();
trace!(%invocation_id, "task completed, handling result");
Self::handle_task_completion(
result,
process.clone(),
&handle_id,
&state,
&shutdown_config,
&mut current_task
).await;
}
}
}
None => match commands.recv().await {
Some(ProcessCommand::Start) => {
if matches!(
*state.borrow(),
ProcessState::Pending | ProcessState::Stopped | ProcessState::Errored
) {
let task = RunningTask::spawn(process.clone(), &handle_id);
debug!(
invocation_id = %task.invocation_id,
"process started"
);
current_task = Some(task);
let _ = state.send(ProcessState::Running);
} else {
debug!("ignoring start command, process already running");
}
}
Some(ProcessCommand::Stop { response } | ProcessCommand::Kill { response }) => {
trace!("received stop/kill command while not running");
let _ = response.send(());
}
None => {
trace!("command channel closed, exiting run loop");
break;
}
},
}
}
trace!("exiting run loop");
}
pub async fn start(&mut self) -> anyhow::Result<()> {
info!(handle_id = %self.handle_id, "starting process");
self.command_tx
.send(ProcessCommand::Start)
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))
}
pub async fn stop(&mut self) -> anyhow::Result<()> {
info!(handle_id = %self.handle_id, "stopping process");
let (resp_tx, resp_rx) = oneshot::channel();
self.command_tx
.send(ProcessCommand::Stop { response: resp_tx })
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))?;
resp_rx
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))
}
pub async fn kill(&mut self) -> anyhow::Result<()> {
info!(handle_id = %self.handle_id, "killing process");
let (resp_tx, resp_rx) = oneshot::channel();
self.command_tx
.send(ProcessCommand::Kill { response: resp_tx })
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))?;
resp_rx
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))
}
pub async fn restart(&mut self) -> anyhow::Result<()> {
info!(handle_id = %self.handle_id, "restarting process");
// Stop without the info log
let (resp_tx, resp_rx) = oneshot::channel();
self.command_tx
.send(ProcessCommand::Stop { response: resp_tx })
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))?;
resp_rx
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))?;
// Start without the info log
self.command_tx
.send(ProcessCommand::Start)
.await
.map_err(|_| anyhow::anyhow!("runner terminated"))
}
pub(crate) async fn wait_for(&self, desired: ProcessState) {
let mut rx = self.state.clone();
while *rx.borrow_and_update() != desired {
if rx.changed().await.is_err() {
break;
}
}
}
pub fn handle_id(&self) -> &HandleID {
&self.handle_id
}
async fn handle_command_while_running(
cmd: ProcessCommand,
task: RunningTask,
state: &watch::Sender<ProcessState>,
config: &ShutdownConfig,
) -> Option<RunningTask> {
let invocation_id = task.invocation_id.clone();
match cmd {
ProcessCommand::Start => {
debug!(%invocation_id, "ignoring start command, process already running");
Some(task)
}
ProcessCommand::Stop { response } => {
debug!(%invocation_id, timeout_secs = config.graceful_timeout.as_secs(), "initiating graceful shutdown");
task.shutdown_gracefully(config.graceful_timeout).await;
let _ = state.send(ProcessState::Stopped);
let _ = response.send(());
debug!(%invocation_id, "process stopped");
None
}
ProcessCommand::Kill { response } => {
debug!(%invocation_id, "aborting process");
task.handle.abort();
let _ = task.handle.await;
let _ = state.send(ProcessState::Stopped);
let _ = response.send(());
debug!(%invocation_id, "process killed");
None
}
}
}
async fn handle_task_completion(
result: Result<anyhow::Result<()>, tokio::task::JoinError>,
process: SharedProcess,
handle_id: &HandleID,
state: &watch::Sender<ProcessState>,
config: &ShutdownConfig,
current_task: &mut Option<RunningTask>,
) {
match result {
Ok(Ok(())) => {
debug!("process completed successfully");
if config.restart_on_success {
if !config.restart_delay.is_zero() {
debug!(delay_ms = config.restart_delay.as_millis(), "waiting before restart");
}
tokio::time::sleep(config.restart_delay).await;
let task = RunningTask::spawn(process, handle_id);
debug!(invocation_id = %task.invocation_id, "process auto-restarted");
*current_task = Some(task);
} else {
debug!("restart_on_success disabled, not restarting");
let _ = state.send(ProcessState::Stopped);
}
}
Ok(Err(e)) => {
warn!(error = %e, "process returned error");
let _ = state.send(ProcessState::Errored);
}
Err(e) if e.is_cancelled() => {
trace!("task was cancelled/aborted");
}
Err(e) => {
warn!(error = %e, "process panicked");
let _ = state.send(ProcessState::Errored);
}
}
}
}
impl Drop for Process {
fn drop(&mut self) {
trace!(handle_id = %self.handle_id, "dropping process, aborting task");
self.task.abort();
}
}
struct RunningTask {
handle: JoinHandle<anyhow::Result<()>>,
cancellation: CancellationToken,
invocation_id: InvocationID,
}
impl RunningTask {
pub fn spawn(process: SharedProcess, handle_id: &HandleID) -> Self {
let cancellation = CancellationToken::new();
let invocation_id = InvocationID::new();
trace!(%handle_id, invocation_id = %invocation_id, "spawning process task");
let span = debug_span!(
"process_invocation",
%handle_id,
invocation_id = %invocation_id
);
let handle = tokio::spawn({
let cancellation = cancellation.child_token();
async move { process.process.call_async(cancellation).await }.instrument(span)
});
Self {
handle,
cancellation,
invocation_id,
}
}
async fn shutdown_gracefully(mut self, timeout: Duration) {
trace!(invocation_id = %self.invocation_id, "cancelling process token");
self.cancellation.cancel();
match tokio::time::timeout(timeout, &mut self.handle).await {
Ok(_) => {
debug!(invocation_id = %self.invocation_id, "process stopped gracefully");
}
Err(_) => {
warn!(
invocation_id = %self.invocation_id,
timeout_secs = timeout.as_secs(),
"graceful shutdown timed out, aborting"
);
self.handle.abort();
}
}
}
}
#[derive(Debug)]
enum ProcessCommand {
Start,
Stop { response: oneshot::Sender<()> },
Kill { response: oneshot::Sender<()> },
}
#[derive(PartialEq)]
pub(crate) enum ProcessState {
Pending,
Running,
Errored,
Stopped,
}

View File

@@ -0,0 +1,44 @@
use std::{future::Future, pin::Pin, sync::Arc};
use tokio_util::sync::CancellationToken;
#[allow(async_fn_in_trait)]
pub trait ProcessHandler: Send + Sync + 'static {
fn call(
&self,
cancellation: CancellationToken,
) -> impl Future<Output = anyhow::Result<()>> + Send;
}
pub(crate) trait AsyncProcess: Send + Sync {
fn call_async(
&self,
cancellation: CancellationToken,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
}
impl<E: ProcessHandler> AsyncProcess for E {
fn call_async(
&self,
cancellation: CancellationToken,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
Box::pin(self.call(cancellation))
}
}
#[derive(Clone)]
pub(crate) struct SharedProcess {
pub(crate) process: Arc<dyn AsyncProcess + Send + Sync + 'static>,
}
pub(crate) trait IntoProcess {
fn into_process(self) -> SharedProcess;
}
impl<E: ProcessHandler> IntoProcess for E {
fn into_process(self) -> SharedProcess {
SharedProcess {
process: Arc::new(self),
}
}
}

View File

@@ -0,0 +1,17 @@
use std::time::Duration;
pub struct ShutdownConfig {
pub graceful_timeout: Duration,
pub restart_on_success: bool,
pub restart_delay: Duration,
}
impl Default for ShutdownConfig {
fn default() -> Self {
Self {
graceful_timeout: Duration::from_secs(5),
restart_on_success: true,
restart_delay: Duration::ZERO,
}
}
}