feat: add basic process manager
Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
12
crates/noprocess/Cargo.toml
Normal file
12
crates/noprocess/Cargo.toml
Normal file
@@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "noprocess"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
thiserror = "2.0.17"
|
||||
tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tracing.workspace = true
|
||||
uuid = { version = "1.19.0", features = ["v4", "v7"] }
|
||||
373
crates/noprocess/src/lib.rs
Normal file
373
crates/noprocess/src/lib.rs
Normal file
@@ -0,0 +1,373 @@
|
||||
use std::{collections::BTreeMap, fmt::Display, pin::Pin, sync::Arc, time::Duration};
|
||||
|
||||
use tokio::{
|
||||
sync::{Mutex, mpsc, oneshot, watch},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
pub struct ProcessManager {
|
||||
inner: Arc<Mutex<InnerProcessManager>>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct InnerProcessManager {
|
||||
handles: BTreeMap<HandleID, Process>,
|
||||
}
|
||||
|
||||
impl InnerProcessManager {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
handles: BTreeMap::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_process(&mut self, process: Process) -> &mut Self {
|
||||
self.handles.insert(process.handle_id.clone(), process);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn start_process(&mut self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
|
||||
let Some(handle) = self.handles.get_mut(handle_id) else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
handle.start().await?;
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub async fn restart_process(&mut self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
|
||||
let Some(handle) = self.handles.get_mut(handle_id) else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
handle.restart().await?;
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub async fn stop_process(&mut self, handle_id: &HandleID) -> anyhow::Result<Option<()>> {
|
||||
let Some(handle) = self.handles.get_mut(handle_id) else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
handle.stop().await?;
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Process {
|
||||
handle_id: HandleID,
|
||||
command_tx: mpsc::Sender<ProcessCommand>,
|
||||
state: watch::Receiver<ProcessState>,
|
||||
task: tokio::task::JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl Process {
|
||||
pub fn new(process: impl IntoProcess) -> Self {
|
||||
Self::new_with_handle(HandleID::new_random(), process)
|
||||
}
|
||||
|
||||
pub fn new_with_handle(handle_id: impl Into<HandleID>, process: impl IntoProcess) -> Self {
|
||||
let handle_id = handle_id.into();
|
||||
let process = process.into_process();
|
||||
let shutdown_config = ShutdownConfig::default();
|
||||
let (command_tx, command_rx) = mpsc::channel(1);
|
||||
let (event_tx, event_rx) = watch::channel(ProcessState::Pending);
|
||||
|
||||
let handle = tokio::spawn(Self::run_loop(
|
||||
handle_id.clone(),
|
||||
process,
|
||||
command_rx,
|
||||
shutdown_config,
|
||||
event_tx,
|
||||
));
|
||||
|
||||
Self {
|
||||
handle_id,
|
||||
state: event_rx,
|
||||
command_tx,
|
||||
|
||||
task: handle,
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_loop(
|
||||
handle_id: HandleID,
|
||||
process: SharedProcess,
|
||||
mut commands: mpsc::Receiver<ProcessCommand>,
|
||||
shutdown_config: ShutdownConfig,
|
||||
state: watch::Sender<ProcessState>,
|
||||
) {
|
||||
let mut current_task: Option<RunningTask> = None;
|
||||
|
||||
loop {
|
||||
match current_task.take() {
|
||||
Some(mut task) => {
|
||||
tokio::select! {
|
||||
biased;
|
||||
Some(cmd) = commands.recv() => {
|
||||
current_task = Self::handle_command_while_running(
|
||||
cmd,
|
||||
task,
|
||||
&state,
|
||||
&shutdown_config
|
||||
).await;
|
||||
}
|
||||
|
||||
result = &mut task.handle => {
|
||||
Self::handle_task_completion(result, process.clone(), &state, &shutdown_config, &mut current_task).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
None => match commands.recv().await {
|
||||
Some(ProcessCommand::Start) => {
|
||||
if matches!(
|
||||
*state.borrow(),
|
||||
ProcessState::Pending | ProcessState::Stopped | ProcessState::Errored
|
||||
) {
|
||||
current_task = Some(RunningTask::spawn(process.clone()));
|
||||
let _ = state.send(ProcessState::Running);
|
||||
}
|
||||
}
|
||||
Some(ProcessCommand::Stop { response } | ProcessCommand::Kill { response }) => {
|
||||
let _ = response.send(());
|
||||
}
|
||||
None => break,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(&mut self) -> anyhow::Result<()> {
|
||||
self.command_tx
|
||||
.send(ProcessCommand::Start)
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("runner terminated"))
|
||||
}
|
||||
|
||||
pub async fn stop(&mut self) -> anyhow::Result<()> {
|
||||
let (resp_tx, resp_rx) = oneshot::channel();
|
||||
|
||||
self.command_tx
|
||||
.send(ProcessCommand::Stop { response: resp_tx })
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("runner terminated"))?;
|
||||
|
||||
resp_rx
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("runner terminated"))
|
||||
}
|
||||
|
||||
pub async fn kill(&mut self) -> anyhow::Result<()> {
|
||||
let (resp_tx, resp_rx) = oneshot::channel();
|
||||
|
||||
self.command_tx
|
||||
.send(ProcessCommand::Kill { response: resp_tx })
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("runner terminated"))?;
|
||||
|
||||
resp_rx
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("runner terminated"))
|
||||
}
|
||||
|
||||
pub async fn restart(&mut self) -> anyhow::Result<()> {
|
||||
self.stop().await?;
|
||||
self.start().await
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for(&self, desired: ProcessState) {
|
||||
let mut rx = self.state.clone();
|
||||
while *rx.borrow_and_update() != desired {
|
||||
if rx.changed().await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_id(&self) -> &HandleID {
|
||||
&self.handle_id
|
||||
}
|
||||
|
||||
async fn handle_command_while_running(
|
||||
cmd: ProcessCommand,
|
||||
task: RunningTask,
|
||||
state: &watch::Sender<ProcessState>,
|
||||
config: &ShutdownConfig,
|
||||
) -> Option<RunningTask> {
|
||||
match cmd {
|
||||
ProcessCommand::Start => Some(task),
|
||||
ProcessCommand::Stop { response } => {
|
||||
task.shutdown_gracefully(config.graceful_timeout).await;
|
||||
let _ = state.send(ProcessState::Stopped);
|
||||
let _ = response.send(());
|
||||
None
|
||||
}
|
||||
ProcessCommand::Kill { response } => {
|
||||
task.handle.abort();
|
||||
let _ = task.handle.await;
|
||||
let _ = state.send(ProcessState::Stopped);
|
||||
let _ = response.send(());
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_task_completion(
|
||||
result: Result<anyhow::Result<()>, tokio::task::JoinError>,
|
||||
process: SharedProcess,
|
||||
state: &watch::Sender<ProcessState>,
|
||||
config: &ShutdownConfig,
|
||||
current_task: &mut Option<RunningTask>,
|
||||
) {
|
||||
match result {
|
||||
Ok(Ok(())) => {
|
||||
if config.restart_on_success {
|
||||
tokio::time::sleep(config.restart_delay).await;
|
||||
*current_task = Some(RunningTask::spawn(process));
|
||||
} else {
|
||||
let _ = state.send(ProcessState::Stopped);
|
||||
}
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
tracing::error!("process error: {e:#}");
|
||||
let _ = state.send(ProcessState::Errored);
|
||||
}
|
||||
Err(e) if e.is_cancelled() => {
|
||||
// Aborted - state already set by kill handler
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("process panicked: {e}");
|
||||
let _ = state.send(ProcessState::Errored);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Process {
|
||||
fn drop(&mut self) {
|
||||
self.task.abort();
|
||||
}
|
||||
}
|
||||
|
||||
struct RunningTask {
|
||||
handle: JoinHandle<anyhow::Result<()>>,
|
||||
cancellation: CancellationToken,
|
||||
}
|
||||
|
||||
impl RunningTask {
|
||||
pub fn spawn(process: SharedProcess) -> Self {
|
||||
let cancellation = CancellationToken::new();
|
||||
let handle = tokio::spawn({
|
||||
let cancellation = cancellation.child_token();
|
||||
async move { process.process.call_async(cancellation).await }
|
||||
});
|
||||
|
||||
Self {
|
||||
handle,
|
||||
cancellation,
|
||||
}
|
||||
}
|
||||
|
||||
async fn shutdown_gracefully(mut self, timeout: Duration) {
|
||||
self.cancellation.cancel();
|
||||
match tokio::time::timeout(timeout, &mut self.handle).await {
|
||||
Ok(_) => {
|
||||
tracing::debug!("process stopped")
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::error!("graceful shutdown timed out, process will be dropped");
|
||||
self.handle.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum ProcessCommand {
|
||||
Start,
|
||||
Stop { response: oneshot::Sender<()> },
|
||||
Kill { response: oneshot::Sender<()> },
|
||||
}
|
||||
|
||||
pub struct ShutdownConfig {
|
||||
pub graceful_timeout: Duration,
|
||||
pub restart_on_success: bool,
|
||||
pub restart_delay: Duration,
|
||||
}
|
||||
|
||||
impl Default for ShutdownConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
graceful_timeout: Duration::from_secs(5),
|
||||
restart_on_success: true,
|
||||
restart_delay: Duration::ZERO,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq)]
|
||||
pub(crate) enum ProcessState {
|
||||
Pending,
|
||||
Running,
|
||||
Errored,
|
||||
Stopped,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Ord, Eq, PartialEq, PartialOrd)]
|
||||
pub struct HandleID(String);
|
||||
impl HandleID {
|
||||
fn new_random() -> Self {
|
||||
Self(uuid::Uuid::now_v7().to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for HandleID {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(async_fn_in_trait)]
|
||||
pub trait ProcessHandler: Send + Sync + 'static {
|
||||
fn call(
|
||||
&self,
|
||||
cancellation: CancellationToken,
|
||||
) -> impl Future<Output = anyhow::Result<()>> + Send;
|
||||
}
|
||||
|
||||
trait AsyncProcess: Send + Sync {
|
||||
fn call_async(
|
||||
&self,
|
||||
cancellation: CancellationToken,
|
||||
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
|
||||
}
|
||||
|
||||
impl<E: ProcessHandler> AsyncProcess for E {
|
||||
fn call_async(
|
||||
&self,
|
||||
cancellation: CancellationToken,
|
||||
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
|
||||
Box::pin(self.call(cancellation))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SharedProcess {
|
||||
process: Arc<dyn AsyncProcess + Send + Sync + 'static>,
|
||||
}
|
||||
|
||||
pub trait IntoProcess {
|
||||
fn into_process(self) -> SharedProcess;
|
||||
}
|
||||
|
||||
impl<E: ProcessHandler> IntoProcess for E {
|
||||
fn into_process(self) -> SharedProcess {
|
||||
SharedProcess {
|
||||
process: Arc::new(self),
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user