329 lines
9.7 KiB
Rust
329 lines
9.7 KiB
Rust
use anyhow::Context;
|
|
use component::churn_tasks::process::HostProcess;
|
|
use futures::StreamExt;
|
|
use std::sync::Arc;
|
|
use tokio::io::AsyncWriteExt;
|
|
use tokio::sync::Mutex;
|
|
use wasmtime::component::*;
|
|
use wasmtime::{Config, Engine, Store};
|
|
use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView};
|
|
|
|
use super::config::AgentConfig;
|
|
|
|
// Generates the host-side bindings (the `component::churn_tasks::*` modules and
// the `Churn` world type used below) from the WIT definition.
wasmtime::component::bindgen!({
    path: "wit/world.wit",
    // `world: "churn"` is intentionally left unset.
    // NOTE(review): presumably the WIT file declares a single world so it is
    // selected automatically — confirm before adding a second world.
    // Host traits are generated as async, matching the async store/linker setup.
    async: true,
    // Back the WIT resources with concrete host types instead of generated ones.
    with: {
        "component:churn-tasks/http/client": http::HttpClient,
        "component:churn-tasks/process/process": CustomProcess,
    }
});
|
|
|
|
mod http;
|
|
|
|
pub struct CustomProcess {
|
|
agent_config: AgentConfig,
|
|
}
|
|
impl CustomProcess {
|
|
pub fn new(agent_config: AgentConfig) -> Self {
|
|
Self { agent_config }
|
|
}
|
|
|
|
pub fn run(&self, args: Vec<String>) -> String {
|
|
tracing::info!("calling function");
|
|
|
|
match args.split_first() {
|
|
Some((item, rest)) => {
|
|
let mut cmd = std::process::Command::new(item);
|
|
match cmd.args(rest).output() {
|
|
Ok(output) => std::str::from_utf8(&output.stdout)
|
|
.expect("to be able to parse utf8")
|
|
.to_string(),
|
|
Err(e) => {
|
|
tracing::error!("command failed with output: {e}");
|
|
e.to_string()
|
|
}
|
|
}
|
|
}
|
|
None => {
|
|
tracing::warn!("failed to call function because it is empty");
|
|
panic!("failed to call function because it is empty")
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn get_label(&self, label_key: &str) -> Option<String> {
|
|
self.agent_config.labels.get(label_key).cloned()
|
|
}
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct PluginStore {
|
|
inner: Arc<Mutex<InnerPluginStore>>,
|
|
}
|
|
|
|
impl PluginStore {
|
|
pub fn new(config: AgentConfig) -> anyhow::Result<Self> {
|
|
Ok(Self {
|
|
inner: Arc::new(Mutex::new(InnerPluginStore::new(config)?)),
|
|
})
|
|
}
|
|
|
|
pub async fn id(&self, plugin: &str) -> anyhow::Result<String> {
|
|
let mut inner = self.inner.lock().await;
|
|
inner.id(plugin).await
|
|
}
|
|
|
|
pub async fn execute(&self, plugin: &str) -> anyhow::Result<()> {
|
|
let mut inner = self.inner.lock().await;
|
|
|
|
// FIXME: hack to avoid memory leak issues from instantiating plugins
|
|
*inner = InnerPluginStore::new(inner.agent_config.clone())?;
|
|
|
|
inner.execute(plugin).await
|
|
}
|
|
}
|
|
|
|
pub struct InnerPluginStore {
|
|
store: wasmtime::Store<ServerWasiView>,
|
|
linker: wasmtime::component::Linker<ServerWasiView>,
|
|
engine: wasmtime::Engine,
|
|
agent_config: AgentConfig,
|
|
}
|
|
|
|
impl InnerPluginStore {
|
|
pub fn new(agent_config: AgentConfig) -> anyhow::Result<Self> {
|
|
let mut config = Config::default();
|
|
config.wasm_component_model(true);
|
|
config.async_support(true);
|
|
let engine = Engine::new(&config)?;
|
|
let mut linker: wasmtime::component::Linker<ServerWasiView> = Linker::new(&engine);
|
|
|
|
// Add the command world (aka WASI CLI) to the linker
|
|
wasmtime_wasi::add_to_linker_async(&mut linker).context("Failed to link command world")?;
|
|
|
|
component::churn_tasks::process::add_to_linker(
|
|
&mut linker,
|
|
|state: &mut ServerWasiView| state,
|
|
)?;
|
|
|
|
component::churn_tasks::http::add_to_linker(&mut linker, |state: &mut ServerWasiView| {
|
|
state
|
|
})?;
|
|
|
|
let wasi_view = ServerWasiView::new(agent_config.clone());
|
|
let store = Store::new(&engine, wasi_view);
|
|
|
|
Ok(Self {
|
|
store,
|
|
linker,
|
|
engine,
|
|
agent_config,
|
|
})
|
|
}
|
|
|
|
pub async fn id(&mut self, plugin: &str) -> anyhow::Result<String> {
|
|
let plugin = self.ensure_plugin(plugin).await?;
|
|
|
|
plugin
|
|
.interface0
|
|
.call_id(&mut self.store)
|
|
.await
|
|
.context("Failed to call add function")
|
|
}
|
|
|
|
pub async fn execute(&mut self, plugin: &str) -> anyhow::Result<()> {
|
|
let plugin = self.ensure_plugin(plugin).await?;
|
|
|
|
self.store.gc_async().await;
|
|
|
|
if plugin
|
|
.interface0
|
|
.call_should_run(&mut self.store)
|
|
.await
|
|
.context("Failed to call should run")?
|
|
{
|
|
tracing::info!("job was marked as required to run");
|
|
return plugin
|
|
.interface0
|
|
.call_execute(&mut self.store)
|
|
.await
|
|
.context("Failed to call add function");
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn ensure_plugin(&mut self, plugin: &str) -> anyhow::Result<Churn> {
|
|
let cache = dirs::cache_dir()
|
|
.ok_or(anyhow::anyhow!("failed to find cache dir"))?
|
|
.join("io.kjuulh.churn");
|
|
|
|
let (plugin_name, plugin_version) = plugin.split_once("@").unwrap_or((plugin, "latest"));
|
|
|
|
let plugin_path = cache
|
|
.join("plugins")
|
|
.join(plugin_name)
|
|
.join(plugin_version)
|
|
.join(format!("{plugin_name}.wasm"));
|
|
|
|
let no_cache: bool = std::env::var("CHURN_NO_CACHE")
|
|
.unwrap_or("false".into())
|
|
.parse()?;
|
|
|
|
if !plugin_path.exists() || no_cache {
|
|
tracing::info!(
|
|
plugin_name = plugin_name,
|
|
plugin_version = plugin_version,
|
|
"downloading plugin"
|
|
);
|
|
if let Some(parent) = plugin_path.parent() {
|
|
tokio::fs::create_dir_all(parent).await?;
|
|
}
|
|
|
|
let req = reqwest::get(format!("https://api-minio.front.kjuulh.io/churn-registry/{plugin_name}/{plugin_version}/{plugin_name}.wasm")).await.context("failed to get plugin from registry")?;
|
|
let mut stream = req.bytes_stream();
|
|
|
|
tracing::info!(
|
|
plugin_name = plugin_name,
|
|
plugin_path = plugin_path.display().to_string(),
|
|
"writing plugin to file"
|
|
);
|
|
let mut file = tokio::fs::File::create(&plugin_path).await?;
|
|
while let Some(chunk) = stream.next().await {
|
|
let chunk = chunk?;
|
|
file.write_all(&chunk).await?;
|
|
}
|
|
file.flush().await?;
|
|
}
|
|
|
|
let component =
|
|
Component::from_file(&self.engine, plugin_path).context("Component file not found")?;
|
|
|
|
tracing::debug!(
|
|
plugin_name = plugin_name,
|
|
plugin_version = plugin_version,
|
|
"instantiating plugin"
|
|
);
|
|
let instance = Churn::instantiate_async(&mut self.store, &component, &self.linker)
|
|
.await
|
|
.context("Failed to instantiate the example world")
|
|
.unwrap();
|
|
|
|
Ok(instance)
|
|
}
|
|
}
|
|
|
|
struct ServerWasiView {
|
|
table: ResourceTable,
|
|
ctx: WasiCtx,
|
|
processes: ResourceTable,
|
|
clients: ResourceTable,
|
|
agent_config: AgentConfig,
|
|
}
|
|
|
|
impl ServerWasiView {
|
|
fn new(agent_config: AgentConfig) -> Self {
|
|
let table = ResourceTable::new();
|
|
|
|
let ctx = WasiCtxBuilder::new()
|
|
.inherit_stdio()
|
|
.inherit_stdout()
|
|
.inherit_env()
|
|
.inherit_stderr()
|
|
.inherit_network()
|
|
.preopened_dir("/", "/", DirPerms::all(), FilePerms::all())
|
|
.expect("to be able to open root")
|
|
.build();
|
|
|
|
Self {
|
|
table,
|
|
ctx,
|
|
processes: ResourceTable::default(),
|
|
clients: ResourceTable::default(),
|
|
agent_config,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl WasiView for ServerWasiView {
|
|
fn table(&mut self) -> &mut ResourceTable {
|
|
&mut self.table
|
|
}
|
|
|
|
fn ctx(&mut self) -> &mut WasiCtx {
|
|
&mut self.ctx
|
|
}
|
|
}
|
|
|
|
// Empty impl of the generated `process` interface trait: nothing needs to be
// provided here; the resource's methods are implemented in the `HostProcess`
// impl for `ServerWasiView`.
impl component::churn_tasks::process::Host for ServerWasiView {}
|
|
|
|
#[async_trait::async_trait]
|
|
impl HostProcess for ServerWasiView {
|
|
async fn new(
|
|
&mut self,
|
|
) -> wasmtime::component::Resource<component::churn_tasks::process::Process> {
|
|
self.processes
|
|
.push(CustomProcess::new(self.agent_config.clone()))
|
|
.unwrap()
|
|
}
|
|
|
|
async fn run_process(
|
|
&mut self,
|
|
self_: wasmtime::component::Resource<component::churn_tasks::process::Process>,
|
|
inputs: wasmtime::component::__internal::Vec<String>,
|
|
) -> String {
|
|
let process = self.processes.get(&self_).unwrap();
|
|
process.run(inputs)
|
|
}
|
|
|
|
async fn get_variable(
|
|
&mut self,
|
|
self_: wasmtime::component::Resource<component::churn_tasks::process::Process>,
|
|
key: wasmtime::component::__internal::String,
|
|
) -> String {
|
|
let process = self.processes.get(&self_).unwrap();
|
|
process.get_label(&key).unwrap()
|
|
}
|
|
|
|
async fn drop(
|
|
&mut self,
|
|
rep: wasmtime::component::Resource<component::churn_tasks::process::Process>,
|
|
) -> wasmtime::Result<()> {
|
|
self.processes.delete(rep)?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
// Empty impl of the generated `http` interface trait: nothing needs to be
// provided here; the resource's methods are implemented in the `HostClient`
// impl for `ServerWasiView`.
impl component::churn_tasks::http::Host for ServerWasiView {}
|
|
|
|
#[async_trait::async_trait]
|
|
impl component::churn_tasks::http::HostClient for ServerWasiView {
|
|
async fn new(&mut self) -> wasmtime::component::Resource<component::churn_tasks::http::Client> {
|
|
self.clients.push(http::HttpClient::new()).unwrap()
|
|
}
|
|
|
|
async fn get(
|
|
&mut self,
|
|
self_: wasmtime::component::Resource<component::churn_tasks::http::Client>,
|
|
url: wasmtime::component::__internal::String,
|
|
) -> Vec<u8> {
|
|
let process = self.clients.get(&self_).unwrap();
|
|
process
|
|
.get(&url)
|
|
.await
|
|
.expect("to be able to make http call")
|
|
}
|
|
|
|
async fn drop(
|
|
&mut self,
|
|
rep: wasmtime::component::Resource<component::churn_tasks::http::Client>,
|
|
) -> wasmtime::Result<()> {
|
|
self.clients.delete(rep)?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|