Files
churn-v2/crates/churn/src/agent/event_handler.rs
kjuulh ea5adb2f93
All checks were successful
continuous-integration/drone/push Build is passing
feat: add common queue
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:08:37 +01:00

64 lines
1.6 KiB
Rust

use notmad::{Component, MadError};
use crate::agent::models::Commands;
use super::{
agent_state::AgentState, config::AgentConfig, grpc_client::GrpcClient, queue::AgentQueue,
};
/// Agent component that listens for events over gRPC and forwards each one,
/// decoded as a [`Commands`] value, onto the agent queue.
///
/// The type is `Clone` because a copy of the handler is handed to every
/// `listen_events` call as its executor.
#[derive(Clone)]
pub struct EventHandler {
    /// Agent configuration; `agent_id` is used as the id of one of the
    /// event subscriptions.
    config: AgentConfig,
    /// Client used to open the event subscriptions.
    grpc: GrpcClient,
    /// Queue that decoded commands are published to.
    queue: AgentQueue,
}
impl EventHandler {
    /// Builds an event handler from anything convertible into [`AgentState`].
    ///
    /// The config, gRPC client and queue are cloned out of the state, so the
    /// handler keeps its own copies of those values.
    pub fn new(state: impl Into<AgentState>) -> Self {
        let agent_state = Into::<AgentState>::into(state);

        let config = agent_state.config.clone();
        let grpc = agent_state.grpc.clone();
        let queue = agent_state.queue.clone();

        Self {
            config,
            grpc,
            queue,
        }
    }
}
#[async_trait::async_trait]
impl Component for EventHandler {
fn name(&self) -> Option<String> {
Some("event_handler".into())
}
async fn run(
&self,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), notmad::MadError> {
tokio::select! {
_ = cancellation_token.cancelled() => {},
res = self.grpc.listen_events("agents", None::<String>, self.clone()) => {
res.map_err(MadError::Inner)?;
},
res = self.grpc.listen_events("agents", Some(&self.config.agent_id), self.clone()) => {
res.map_err(MadError::Inner)?;
}
}
Ok(())
}
}
#[async_trait::async_trait]
impl super::grpc_client::ListenEventsExecutor for EventHandler {
    /// Handles one received event: logs its id, decodes `event.value` from
    /// JSON into a [`Commands`] value and publishes it to the queue.
    ///
    /// # Errors
    ///
    /// Returns an error if the payload is not valid JSON for [`Commands`] or
    /// if publishing to the queue fails.
    async fn execute(&self, event: crate::grpc::ListenEventsResponse) -> anyhow::Result<()> {
        tracing::info!(value = event.id, "received event");

        let command = serde_json::from_str::<Commands>(&event.value)?;
        self.queue.publish(command).await?;

        Ok(())
    }
}