Files
churn-v2/crates/churn/src/agent/queue.rs
kjuulh 6647bb89be
All checks were successful
continuous-integration/drone/push Build is passing
feat: add queue
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:24:50 +01:00

68 lines
1.6 KiB
Rust

use std::sync::Arc;
use notmad::{Component, MadError};
use tokio::sync::Mutex;
use super::{models::Commands, scheduler::Scheduler};
/// In-process command queue for the agent.
///
/// Both channel halves sit behind `Arc`, so cloning an `AgentQueue` yields
/// another handle to the same underlying channel rather than a new queue.
#[derive(Clone)]
pub struct AgentQueue {
    /// Sending half of the channel; `publish` pushes commands through it.
    sender: Arc<tokio::sync::mpsc::Sender<Commands>>,

    /// Receiving half of the channel. Guarded by an async mutex because
    /// receiving requires mutable access while the queue itself is only ever
    /// borrowed immutably (`&self`) by `run`.
    receiver: Arc<Mutex<tokio::sync::mpsc::Receiver<Commands>>>,

    /// Scheduler that `handler` forwards every received command to.
    scheduler: Scheduler,
}
impl AgentQueue {
pub fn new(scheduler: Scheduler) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(5);
Self {
sender: Arc::new(tx),
receiver: Arc::new(Mutex::new(rx)),
scheduler,
}
}
pub async fn handler(&self, command: Commands) -> anyhow::Result<()> {
tracing::debug!("handling task");
self.scheduler.handle(command).await?;
Ok(())
}
pub async fn publish(&self, command: Commands) -> anyhow::Result<()> {
tracing::debug!("publishing task: {}", command.to_string());
self.sender.send(command).await?;
Ok(())
}
}
#[async_trait::async_trait]
impl Component for AgentQueue {
async fn run(
&self,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), notmad::MadError> {
loop {
let mut recv = self.receiver.lock().await;
tokio::select! {
res = recv.recv() => {
if let Some(res) = res {
self.handler(res).await.map_err(MadError::Inner)?;
}
}
_ = cancellation_token.cancelled() => {
break
}
}
}
Ok(())
}
}