use anyhow::Context; use component::churn_tasks::process::HostProcess; use futures::StreamExt; use std::sync::Arc; use tokio::io::AsyncWriteExt; use tokio::sync::Mutex; use wasmtime::component::*; use wasmtime::{Config, Engine, Store}; use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView}; use super::config::AgentConfig; wasmtime::component::bindgen!({ path: "wit/world.wit", //world: "churn", async: true, with: { "component:churn-tasks/process/process": CustomProcess, "component:churn-tasks/http/client": http::HttpClient } }); mod http; pub struct CustomProcess { agent_config: AgentConfig, } impl CustomProcess { pub fn new(agent_config: AgentConfig) -> Self { Self { agent_config } } pub fn run(&self, args: Vec) -> String { tracing::info!("calling function"); match args.split_first() { Some((item, rest)) => { let mut cmd = std::process::Command::new(item); match cmd.args(rest).output() { Ok(output) => std::str::from_utf8(&output.stdout) .expect("to be able to parse utf8") .to_string(), Err(e) => { tracing::error!("command failed with output: {e}"); e.to_string() } } } None => { tracing::warn!("failed to call function because it is empty"); panic!("failed to call function because it is empty") } } } pub fn get_label(&self, label_key: &str) -> Option { self.agent_config.labels.get(label_key).cloned() } } #[derive(Clone)] pub struct PluginStore { inner: Arc>, } impl PluginStore { pub fn new(config: AgentConfig) -> anyhow::Result { Ok(Self { inner: Arc::new(Mutex::new(InnerPluginStore::new(config)?)), }) } pub async fn id(&self, plugin: &str) -> anyhow::Result { let mut inner = self.inner.lock().await; inner.id(plugin).await } pub async fn execute(&self, plugin: &str) -> anyhow::Result<()> { let mut inner = self.inner.lock().await; // FIXME: hack to avoid memory leak issues from instantiating plugins *inner = InnerPluginStore::new(inner.agent_config.clone())?; inner.execute(plugin).await } } pub struct InnerPluginStore { store: wasmtime::Store, linker: wasmtime::component::Linker, engine: wasmtime::Engine, agent_config: AgentConfig, } impl InnerPluginStore { pub fn new(agent_config: AgentConfig) -> anyhow::Result { let mut config = Config::default(); config.wasm_component_model(true); config.async_support(true); let engine = Engine::new(&config)?; let mut linker: wasmtime::component::Linker = Linker::new(&engine); // Add the command world (aka WASI CLI) to the linker wasmtime_wasi::add_to_linker_async(&mut linker).context("Failed to link command world")?; component::churn_tasks::process::add_to_linker( &mut linker, |state: &mut ServerWasiView| state, )?; component::churn_tasks::http::add_to_linker(&mut linker, |state: &mut ServerWasiView| { state })?; let wasi_view = ServerWasiView::new(agent_config.clone()); let store = Store::new(&engine, wasi_view); Ok(Self { store, linker, engine, agent_config, }) } pub async fn id(&mut self, plugin: &str) -> anyhow::Result { let plugin = self.ensure_plugin(plugin).await?; plugin .interface0 .call_id(&mut self.store) .await .context("Failed to call add function") } pub async fn execute(&mut self, plugin: &str) -> anyhow::Result<()> { let plugin = self.ensure_plugin(plugin).await?; self.store.gc_async().await; if plugin .interface0 .call_should_run(&mut self.store) .await .context("Failed to call should run")? { tracing::info!("job was marked as required to run"); return plugin .interface0 .call_execute(&mut self.store) .await .context("Failed to call add function"); } Ok(()) } async fn ensure_plugin(&mut self, plugin: &str) -> anyhow::Result { let cache = dirs::cache_dir() .ok_or(anyhow::anyhow!("failed to find cache dir"))? .join("io.kjuulh.churn"); let (plugin_name, plugin_version) = plugin.split_once("@").unwrap_or((plugin, "latest")); let plugin_path = cache .join("plugins") .join(plugin_name) .join(plugin_version) .join(format!("{plugin_name}.wasm")); let no_cache: bool = std::env::var("CHURN_NO_CACHE") .unwrap_or("false".into()) .parse()?; if !plugin_path.exists() || no_cache { tracing::info!( plugin_name = plugin_name, plugin_version = plugin_version, "downloading plugin" ); if let Some(parent) = plugin_path.parent() { tokio::fs::create_dir_all(parent).await?; } let req = reqwest::get(format!("https://api-minio.front.kjuulh.io/churn-registry/{plugin_name}/{plugin_version}/{plugin_name}.wasm")).await.context("failed to get plugin from registry")?; let mut stream = req.bytes_stream(); tracing::info!( plugin_name = plugin_name, plugin_path = plugin_path.display().to_string(), "writing plugin to file" ); let mut file = tokio::fs::File::create(&plugin_path).await?; while let Some(chunk) = stream.next().await { let chunk = chunk?; file.write_all(&chunk).await?; } file.flush().await?; } let component = Component::from_file(&self.engine, plugin_path).context("Component file not found")?; tracing::debug!( plugin_name = plugin_name, plugin_version = plugin_version, "instantiating plugin" ); let instance = Churn::instantiate_async(&mut self.store, &component, &self.linker) .await .context("Failed to instantiate the example world") .unwrap(); Ok(instance) } } struct ServerWasiView { table: ResourceTable, ctx: WasiCtx, processes: ResourceTable, clients: ResourceTable, agent_config: AgentConfig, } impl ServerWasiView { fn new(agent_config: AgentConfig) -> Self { let table = ResourceTable::new(); let ctx = WasiCtxBuilder::new() .inherit_stdio() .inherit_stdout() .inherit_env() .inherit_stderr() .inherit_network() .preopened_dir("/", "/", DirPerms::all(), FilePerms::all()) .expect("to be able to open root") .build(); Self { table, ctx, processes: ResourceTable::default(), clients: ResourceTable::default(), agent_config, } } } impl WasiView for ServerWasiView { fn table(&mut self) -> &mut ResourceTable { &mut self.table } fn ctx(&mut self) -> &mut WasiCtx { &mut self.ctx } } impl component::churn_tasks::process::Host for ServerWasiView {} #[async_trait::async_trait] impl HostProcess for ServerWasiView { async fn new( &mut self, ) -> wasmtime::component::Resource { self.processes .push(CustomProcess::new(self.agent_config.clone())) .unwrap() } async fn run_process( &mut self, self_: wasmtime::component::Resource, inputs: wasmtime::component::__internal::Vec, ) -> String { let process = self.processes.get(&self_).unwrap(); process.run(inputs) } async fn get_variable( &mut self, self_: wasmtime::component::Resource, key: wasmtime::component::__internal::String, ) -> String { let process = self.processes.get(&self_).unwrap(); process.get_label(&key).unwrap() } async fn drop( &mut self, rep: wasmtime::component::Resource, ) -> wasmtime::Result<()> { self.processes.delete(rep)?; Ok(()) } } impl component::churn_tasks::http::Host for ServerWasiView {} #[async_trait::async_trait] impl component::churn_tasks::http::HostClient for ServerWasiView { async fn new(&mut self) -> wasmtime::component::Resource { self.clients.push(http::HttpClient::new()).unwrap() } async fn get( &mut self, self_: wasmtime::component::Resource, url: wasmtime::component::__internal::String, ) -> Vec { let process = self.clients.get(&self_).unwrap(); process .get(&url) .await .expect("to be able to make http call") } async fn drop( &mut self, rep: wasmtime::component::Resource, ) -> wasmtime::Result<()> { self.clients.delete(rep)?; Ok(()) } }