Compare commits
48 Commits
1dea5f29ac
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 850f74f667 | |||
| 76b21aab07 | |||
| 7d7e760e5e | |||
| 947303c5e0 | |||
| 06a9dd10e1 | |||
| 55befef95b | |||
|
53cc689dc4
|
|||
|
1c20383de6
|
|||
|
53c15a653f
|
|||
|
9c5cb6667e
|
|||
|
b0c40196b6
|
|||
|
a28a5ca6ee
|
|||
|
ea6bfc9c04
|
|||
|
844f8519d5
|
|||
|
1508fbb2bf
|
|||
|
ef6ae3f2b1
|
|||
|
8923c60d9e
|
|||
| efec76d28c | |||
| 03e23c7d9d | |||
| 83294306a4 | |||
| ceaad75057 | |||
| b86fa54671 | |||
| aecdace4ee | |||
| c98761622d | |||
| 825f612aea | |||
| c12b02ba92 | |||
| c5e5307682 | |||
| 5fb59ad992 | |||
| e21663c8bd | |||
| 39a01778b2 | |||
| f1cdf3ae20 | |||
|
355587234e
|
|||
|
21a13f3444
|
|||
|
5e1b585a2d
|
|||
|
94025a02ce
|
|||
|
db4cc98643
|
|||
|
2387a70778
|
|||
|
d6fdda0e4e
|
|||
|
974e1ee0d6
|
|||
|
717b052a88
|
|||
|
9b52376e7a
|
|||
|
7b222af1dd
|
|||
|
150c7c3c98
|
|||
|
8b064c2169
|
|||
|
879737eedd
|
|||
|
818cc6c671
|
|||
|
e759239243
|
|||
|
b37674987e
|
6
.env
6
.env
@@ -2,4 +2,8 @@ EXTERNAL_HOST=http://localhost:3000
|
|||||||
PROCESS_HOST=http://localhost:7900
|
PROCESS_HOST=http://localhost:7900
|
||||||
SERVICE_HOST=127.0.0.1:3000
|
SERVICE_HOST=127.0.0.1:3000
|
||||||
DISCOVERY_HOST=http://127.0.0.1:3000
|
DISCOVERY_HOST=http://127.0.0.1:3000
|
||||||
RUST_LOG=h2=warn,debug
|
|
||||||
|
#EXTERNAL_HOST=http://localhost:3000
|
||||||
|
#PROCESS_HOST=http://localhost:7900
|
||||||
|
#SERVICE_HOST=127.0.0.1:3000
|
||||||
|
#DISCOVERY_HOST=https://churn.prod.kjuulh.app
|
||||||
|
|||||||
165
CHANGELOG.md
Normal file
165
CHANGELOG.md
Normal file
@@ -0,0 +1,165 @@
|
|||||||
|
# Changelog
|
||||||
|
All notable changes to this project will be documented in this file.
|
||||||
|
|
||||||
|
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||||
|
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||||
|
|
||||||
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [0.1.0] - 2025-01-11
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- add cuddle please
|
||||||
|
- enable churn update service
|
||||||
|
- add updater to install script
|
||||||
|
- add updater to install script
|
||||||
|
- add http client
|
||||||
|
- run more often
|
||||||
|
- enable checking if it should actually run
|
||||||
|
- enable having get variable from local setup
|
||||||
|
- inherit output as well
|
||||||
|
- allow process from external code
|
||||||
|
- add inherit
|
||||||
|
- add default no labels
|
||||||
|
- warn all targets
|
||||||
|
- update with web assembly components
|
||||||
|
- add labels to config
|
||||||
|
- add abstraction around task
|
||||||
|
- enable webpki roots
|
||||||
|
- add short connect timeout
|
||||||
|
- more error logging
|
||||||
|
- stop the service if running
|
||||||
|
- setup stream logging
|
||||||
|
- update script with warn
|
||||||
|
- disable force again
|
||||||
|
- make curl silent"
|
||||||
|
- force update
|
||||||
|
- use public prod
|
||||||
|
- run as root
|
||||||
|
- agent is already setup
|
||||||
|
- allow errors
|
||||||
|
- some more debugging
|
||||||
|
- some more debugging
|
||||||
|
- stderr to stdout as well
|
||||||
|
- this should work
|
||||||
|
- when config has already been setup
|
||||||
|
- add agent start as well
|
||||||
|
- update with agent setup
|
||||||
|
- add install script
|
||||||
|
- add comments
|
||||||
|
- use actual internal
|
||||||
|
- reqwest as native build
|
||||||
|
- use internal
|
||||||
|
- add external service host
|
||||||
|
- add grpc host
|
||||||
|
- add external vars
|
||||||
|
- add grpc and env
|
||||||
|
- add queue
|
||||||
|
- add common queue
|
||||||
|
- add discovery
|
||||||
|
- add tonic
|
||||||
|
- added tonic
|
||||||
|
- added longer timer
|
||||||
|
- fix error message
|
||||||
|
- add agent
|
||||||
|
- add churn v2
|
||||||
|
- initial v2 commit
|
||||||
|
- reset
|
||||||
|
- update
|
||||||
|
- update
|
||||||
|
- update stuff
|
||||||
|
- update
|
||||||
|
- with drone
|
||||||
|
- with agent db
|
||||||
|
- with sled db and capnp
|
||||||
|
- with sled db
|
||||||
|
- with basic changelog
|
||||||
|
- with basic package
|
||||||
|
- with publish
|
||||||
|
- with monitoring
|
||||||
|
- with monitor
|
||||||
|
- with extra churning repl thingy
|
||||||
|
- with enroll
|
||||||
|
- add initial churn
|
||||||
|
- add simple health check
|
||||||
|
|
||||||
|
### Docs
|
||||||
|
- update readme
|
||||||
|
next up is differentiating the different agents, such that we can execute commands from the cli to for example update dependencies on all machines, restart machines etc.
|
||||||
|
- add installation docs
|
||||||
|
- add notes
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
- use actual names for files
|
||||||
|
- *(deps)* update rust crate serde to v1.0.217
|
||||||
|
- *(deps)* update rust crate serde_json to v1.0.134
|
||||||
|
- *(deps)* update all dependencies to v28
|
||||||
|
- *(deps)* update rust crate nodrift to 0.3.0
|
||||||
|
- *(deps)* update rust crate serde to v1.0.216
|
||||||
|
- *(deps)* update tokio-prost monorepo to v0.13.4
|
||||||
|
- *(deps)* update rust crate tokio-util to v0.7.13
|
||||||
|
- *(deps)* update rust crate bytes to v1.9.0
|
||||||
|
- *(deps)* update rust crate tower-http to 0.6.0
|
||||||
|
- *(deps)* update all dependencies
|
||||||
|
- *(deps)* update rust crate capnp to 0.19.5
|
||||||
|
- *(deps)* update rust crate capnp to 0.19.4
|
||||||
|
|
||||||
|
### Other
|
||||||
|
- update final repo
|
||||||
|
- update lock"
|
||||||
|
- update default schedule
|
||||||
|
- *(deps)* update rust crate anyhow to v1.0.95
|
||||||
|
- *(deps)* update rust crate clap to v4.5.23
|
||||||
|
- *(deps)* update all dependencies
|
||||||
|
- *(deps)* update rust crate tracing-subscriber to v0.3.19
|
||||||
|
- *(deps)* update rust crate tracing to v0.1.41
|
||||||
|
- *(deps)* update rust crate serde to v1.0.215
|
||||||
|
- *(deps)* update rust crate serde to v1.0.214
|
||||||
|
- *(deps)* update rust crate serde to v1.0.213
|
||||||
|
- *(deps)* update rust crate serde to v1.0.210
|
||||||
|
- *(deps)* update rust crate serde to v1.0.209
|
||||||
|
- *(deps)* update rust crate serde_json to v1.0.126
|
||||||
|
- *(deps)* update all dependencies
|
||||||
|
- *(deps)* update rust crate serde to v1.0.208
|
||||||
|
- *(deps)* update all dependencies
|
||||||
|
- *(deps)* update rust crate serde to v1.0.203
|
||||||
|
- *(deps)* update rust crate anyhow to 1.0.86
|
||||||
|
- *(deps)* update rust crate anyhow to 1.0.85
|
||||||
|
- *(deps)* update rust crate anyhow to 1.0.84
|
||||||
|
- *(deps)* update rust crate itertools to 0.13.0
|
||||||
|
- *(deps)* update rust crate anyhow to 1.0.83
|
||||||
|
- *(deps)* update rust crate reqwest to 0.12.4
|
||||||
|
- *(deps)* update rust crate chrono to 0.4.38
|
||||||
|
- *(deps)* update rust crate anyhow to 1.0.82
|
||||||
|
- Merge pull request 'chore(release): v0.1.0' (#4) from cuddle-please/release into main
|
||||||
|
|
||||||
|
Reviewed-on: https://git.front.kjuulh.io/kjuulh/churn/pulls/4
|
||||||
|
|
||||||
|
- *(release)* 0.1.0
|
||||||
|
- *(test)* test commit
|
||||||
|
- *(test)* test commit
|
||||||
|
- *(test)* test commit
|
||||||
|
- *(test)* test commit
|
||||||
|
- Merge pull request 'chore(deps): update all dependencies' (#2) from renovate/all into main
|
||||||
|
|
||||||
|
Reviewed-on: https://git.front.kjuulh.io/kjuulh/churn/pulls/2
|
||||||
|
|
||||||
|
- *(deps)* update all dependencies
|
||||||
|
- change to byte slice
|
||||||
|
- fmt
|
||||||
|
- fmt
|
||||||
|
- Add renovate.json
|
||||||
|
|
||||||
|
- Release churn-server v0.1.0
|
||||||
|
|
||||||
|
- Release churn-agent v0.1.0
|
||||||
|
|
||||||
|
- Release churn v0.1.0
|
||||||
|
|
||||||
|
- Release churn v0.1.0
|
||||||
|
|
||||||
|
- Release churn-domain v0.1.0, churn v0.1.0
|
||||||
|
|
||||||
|
- with changelog
|
||||||
|
- Release churn-domain v0.1.0, churn v0.1.0
|
||||||
|
|
||||||
1819
Cargo.lock
generated
1819
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -3,7 +3,6 @@ members = ["crates/*"]
|
|||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
churn = { path = "crates/churn" }
|
|
||||||
|
|
||||||
anyhow = { version = "1" }
|
anyhow = { version = "1" }
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
@@ -12,3 +11,6 @@ tracing-subscriber = { version = "0.3.18" }
|
|||||||
clap = { version = "4", features = ["derive", "env"] }
|
clap = { version = "4", features = ["derive", "env"] }
|
||||||
dotenv = { version = "0.15" }
|
dotenv = { version = "0.15" }
|
||||||
axum = { version = "0.7" }
|
axum = { version = "0.7" }
|
||||||
|
|
||||||
|
[workspace.package]
|
||||||
|
version = "0.1.0"
|
||||||
|
|||||||
26
README.md
26
README.md
@@ -1 +1,27 @@
|
|||||||
# churn
|
# churn
|
||||||
|
|
||||||
|
## Installation
|
||||||
|
|
||||||
|
To install churn, you need first of all a server and agents.
|
||||||
|
|
||||||
|
Servers can be run via. docker.
|
||||||
|
|
||||||
|
```shell
|
||||||
|
docker run docker.io/kjuulh/churn-v2:latest
|
||||||
|
```
|
||||||
|
|
||||||
|
To install an agent run the following script
|
||||||
|
|
||||||
|
```shell
|
||||||
|
curl https://git.front.kjuulh.io/kjuulh/churn-v2/raw/branch/main/install.sh | bash
|
||||||
|
```
|
||||||
|
|
||||||
|
configure `~/.local/share/io.kjuulh.churn-agent/churn-agent.toml` use an editor of choice. Churn agent will generate a randomish name for the specific agent, consider giving it something more semantically meaningful to you
|
||||||
|
|
||||||
|
## CLI (TBD)
|
||||||
|
|
||||||
|
Using the churn cli allows sending specific commands to a set of agents
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
```
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ tower-http = { version = "0.6.0", features = ["cors", "trace"] }
|
|||||||
notmad = "0.7.1"
|
notmad = "0.7.1"
|
||||||
tokio-util = "0.7.12"
|
tokio-util = "0.7.12"
|
||||||
async-trait = "0.1.83"
|
async-trait = "0.1.83"
|
||||||
nodrift = "0.2.0"
|
nodrift = "0.3.0"
|
||||||
rusqlite = { version = "0.32.1", features = ["bundled"] }
|
rusqlite = { version = "0.32.1", features = ["bundled"] }
|
||||||
prost-types = "0.13.3"
|
prost-types = "0.13.3"
|
||||||
prost = "0.13.3"
|
prost = "0.13.3"
|
||||||
@@ -32,5 +32,9 @@ reqwest = { version = "0.12.9", default-features = false, features = [
|
|||||||
"http2",
|
"http2",
|
||||||
"charset",
|
"charset",
|
||||||
"native-tls-vendored",
|
"native-tls-vendored",
|
||||||
|
"stream",
|
||||||
] }
|
] }
|
||||||
serde_json = "1.0.133"
|
serde_json = "1.0.133"
|
||||||
|
wasmtime = "28.0.0"
|
||||||
|
wasmtime-wasi = "28.0.0"
|
||||||
|
petname = "2.0.2"
|
||||||
|
|||||||
@@ -5,12 +5,14 @@ use refresh::AgentRefresh;
|
|||||||
pub use config::setup_config;
|
pub use config::setup_config;
|
||||||
|
|
||||||
pub mod models;
|
pub mod models;
|
||||||
|
pub(crate) mod task;
|
||||||
|
|
||||||
mod agent_state;
|
mod agent_state;
|
||||||
mod config;
|
mod config;
|
||||||
mod discovery_client;
|
mod discovery_client;
|
||||||
mod event_handler;
|
mod event_handler;
|
||||||
mod grpc_client;
|
mod grpc_client;
|
||||||
|
mod plugins;
|
||||||
mod queue;
|
mod queue;
|
||||||
mod refresh;
|
mod refresh;
|
||||||
mod scheduler;
|
mod scheduler;
|
||||||
|
|||||||
@@ -1,59 +1,24 @@
|
|||||||
use anyhow::Context;
|
use apt::AptTask;
|
||||||
|
use plugin_task::PluginTask;
|
||||||
|
|
||||||
pub struct Plan {}
|
use super::{plugins::PluginStore, task::IntoTask};
|
||||||
|
|
||||||
|
pub mod apt;
|
||||||
|
pub mod plugin_task;
|
||||||
|
|
||||||
|
pub struct Plan {
|
||||||
|
store: PluginStore,
|
||||||
|
}
|
||||||
impl Plan {
|
impl Plan {
|
||||||
pub fn new() -> Self {
|
pub fn new(store: PluginStore) -> Self {
|
||||||
Self {}
|
Self { store }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn tasks(&self) -> anyhow::Result<Vec<Task>> {
|
pub async fn tasks(&self) -> anyhow::Result<Vec<impl IntoTask>> {
|
||||||
Ok(vec![Task::new()])
|
Ok(vec![
|
||||||
}
|
AptTask::new().into_task(),
|
||||||
}
|
PluginTask::new("alloy@0.1.0", self.store.clone()).into_task(),
|
||||||
|
PluginTask::new("dev_packages@0.1.0", self.store.clone()).into_task(),
|
||||||
pub struct Task {}
|
])
|
||||||
|
|
||||||
impl Task {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn id(&self) -> String {
|
|
||||||
"apt".into()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn should_run(&self) -> anyhow::Result<bool> {
|
|
||||||
Ok(true)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn execute(&self) -> anyhow::Result<()> {
|
|
||||||
let mut cmd = tokio::process::Command::new("apt-get");
|
|
||||||
cmd.args(["update", "-q"]);
|
|
||||||
let output = cmd.output().await.context("failed to run apt update")?;
|
|
||||||
match output.status.success() {
|
|
||||||
true => tracing::info!("successfully ran apt update"),
|
|
||||||
false => {
|
|
||||||
anyhow::bail!(
|
|
||||||
"failed to run apt update: {}",
|
|
||||||
std::str::from_utf8(&output.stderr)?
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut cmd = tokio::process::Command::new("apt-get");
|
|
||||||
cmd.env("DEBIAN_FRONTEND", "noninteractive")
|
|
||||||
.args(["upgrade", "-y"]);
|
|
||||||
let output = cmd.output().await.context("failed to run apt upgrade")?;
|
|
||||||
match output.status.success() {
|
|
||||||
true => tracing::info!("successfully ran apt upgrade"),
|
|
||||||
false => {
|
|
||||||
anyhow::bail!(
|
|
||||||
"failed to run apt upgrade: {}",
|
|
||||||
std::str::from_utf8(&output.stderr)?
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
49
crates/churn/src/agent/actions/apt.rs
Normal file
49
crates/churn/src/agent/actions/apt.rs
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
use anyhow::Context;
|
||||||
|
|
||||||
|
use crate::agent::task::Task;
|
||||||
|
|
||||||
|
pub struct AptTask {}
|
||||||
|
|
||||||
|
impl AptTask {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl Task for AptTask {
|
||||||
|
async fn id(&self) -> anyhow::Result<String> {
|
||||||
|
Ok("apt".into())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn execute(&self) -> anyhow::Result<()> {
|
||||||
|
let mut cmd = tokio::process::Command::new("apt-get");
|
||||||
|
cmd.args(["update", "-q"]);
|
||||||
|
let output = cmd.output().await.context("failed to run apt update")?;
|
||||||
|
match output.status.success() {
|
||||||
|
true => tracing::info!("successfully ran apt update"),
|
||||||
|
false => {
|
||||||
|
anyhow::bail!(
|
||||||
|
"failed to run apt update: {}",
|
||||||
|
std::str::from_utf8(&output.stderr)?
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut cmd = tokio::process::Command::new("apt-get");
|
||||||
|
cmd.env("DEBIAN_FRONTEND", "noninteractive")
|
||||||
|
.args(["upgrade", "-y"]);
|
||||||
|
let output = cmd.output().await.context("failed to run apt upgrade")?;
|
||||||
|
match output.status.success() {
|
||||||
|
true => tracing::info!("successfully ran apt upgrade"),
|
||||||
|
false => {
|
||||||
|
anyhow::bail!(
|
||||||
|
"failed to run apt upgrade: {}",
|
||||||
|
std::str::from_utf8(&output.stderr)?
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
30
crates/churn/src/agent/actions/plugin_task.rs
Normal file
30
crates/churn/src/agent/actions/plugin_task.rs
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
use crate::agent::{plugins::PluginStore, task::Task};
|
||||||
|
|
||||||
|
pub struct PluginTask {
|
||||||
|
plugin: String,
|
||||||
|
store: PluginStore,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PluginTask {
|
||||||
|
pub fn new(plugin: impl Into<String>, store: PluginStore) -> Self {
|
||||||
|
Self {
|
||||||
|
plugin: plugin.into(),
|
||||||
|
store,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl Task for PluginTask {
|
||||||
|
async fn id(&self) -> anyhow::Result<String> {
|
||||||
|
let id = self.store.id(&self.plugin).await?;
|
||||||
|
|
||||||
|
Ok(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn execute(&self) -> anyhow::Result<()> {
|
||||||
|
self.store.execute(&self.plugin).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,7 +4,8 @@ use crate::api::Discovery;
|
|||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
config::AgentConfig, discovery_client::DiscoveryClient, grpc_client::GrpcClient,
|
config::AgentConfig, discovery_client::DiscoveryClient, grpc_client::GrpcClient,
|
||||||
handlers::scheduled_tasks::ScheduledTasks, queue::AgentQueue, scheduler::Scheduler,
|
handlers::scheduled_tasks::ScheduledTasks, plugins::PluginStore, queue::AgentQueue,
|
||||||
|
scheduler::Scheduler,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@@ -35,6 +36,7 @@ pub struct State {
|
|||||||
pub config: AgentConfig,
|
pub config: AgentConfig,
|
||||||
pub discovery: Discovery,
|
pub discovery: Discovery,
|
||||||
pub queue: AgentQueue,
|
pub queue: AgentQueue,
|
||||||
|
pub plugin_store: PluginStore,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
@@ -42,7 +44,8 @@ impl State {
|
|||||||
let config = AgentConfig::new().await?;
|
let config = AgentConfig::new().await?;
|
||||||
let discovery = DiscoveryClient::new(&config.discovery).discover().await?;
|
let discovery = DiscoveryClient::new(&config.discovery).discover().await?;
|
||||||
let grpc = GrpcClient::new(&discovery.process_host);
|
let grpc = GrpcClient::new(&discovery.process_host);
|
||||||
let scheduled_tasks = ScheduledTasks::new();
|
let plugin_store = PluginStore::new(config.clone())?;
|
||||||
|
let scheduled_tasks = ScheduledTasks::new(plugin_store.clone());
|
||||||
let scheduler = Scheduler::new(scheduled_tasks);
|
let scheduler = Scheduler::new(scheduled_tasks);
|
||||||
let queue = AgentQueue::new(scheduler);
|
let queue = AgentQueue::new(scheduler);
|
||||||
|
|
||||||
@@ -51,6 +54,7 @@ impl State {
|
|||||||
config,
|
config,
|
||||||
discovery,
|
discovery,
|
||||||
queue,
|
queue,
|
||||||
|
plugin_store,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
@@ -6,6 +8,8 @@ use uuid::Uuid;
|
|||||||
pub struct AgentConfig {
|
pub struct AgentConfig {
|
||||||
pub agent_id: String,
|
pub agent_id: String,
|
||||||
pub discovery: String,
|
pub discovery: String,
|
||||||
|
|
||||||
|
pub labels: BTreeMap<String, String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AgentConfig {
|
impl AgentConfig {
|
||||||
@@ -15,6 +19,7 @@ impl AgentConfig {
|
|||||||
Ok(Self {
|
Ok(Self {
|
||||||
agent_id: config.agent_id,
|
agent_id: config.agent_id,
|
||||||
discovery: config.discovery,
|
discovery: config.discovery,
|
||||||
|
labels: config.labels.unwrap_or_default(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -23,6 +28,8 @@ impl AgentConfig {
|
|||||||
struct ConfigFile {
|
struct ConfigFile {
|
||||||
agent_id: String,
|
agent_id: String,
|
||||||
discovery: String,
|
discovery: String,
|
||||||
|
|
||||||
|
labels: Option<BTreeMap<String, String>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConfigFile {
|
impl ConfigFile {
|
||||||
@@ -43,10 +50,15 @@ impl ConfigFile {
|
|||||||
toml::from_str(&contents).context("failed to parse the contents of the churn agent config")
|
toml::from_str(&contents).context("failed to parse the contents of the churn agent config")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn write_default(discovery: impl Into<String>, force: bool) -> anyhow::Result<Self> {
|
pub async fn write_default(
|
||||||
|
discovery: impl Into<String>,
|
||||||
|
force: bool,
|
||||||
|
labels: impl Into<BTreeMap<String, String>>,
|
||||||
|
) -> anyhow::Result<Self> {
|
||||||
let s = Self {
|
let s = Self {
|
||||||
agent_id: Uuid::new_v4().to_string(),
|
agent_id: Uuid::new_v4().to_string(),
|
||||||
discovery: discovery.into(),
|
discovery: discovery.into(),
|
||||||
|
labels: Some(labels.into()),
|
||||||
};
|
};
|
||||||
|
|
||||||
let directory = dirs::data_dir()
|
let directory = dirs::data_dir()
|
||||||
@@ -73,8 +85,12 @@ impl ConfigFile {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn setup_config(discovery: impl Into<String>, force: bool) -> anyhow::Result<()> {
|
pub async fn setup_config(
|
||||||
ConfigFile::write_default(discovery, force).await?;
|
discovery: impl Into<String>,
|
||||||
|
force: bool,
|
||||||
|
labels: impl Into<BTreeMap<String, String>>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
ConfigFile::write_default(discovery, force, labels).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -61,25 +61,33 @@ impl GrpcClient {
|
|||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut client = self.client().await?;
|
let mut client = self.client().await?;
|
||||||
|
|
||||||
|
tracing::debug!("creating stream for listening to events on: {}", namespace);
|
||||||
let resp = client
|
let resp = client
|
||||||
.listen_events(ListenEventsRequest {
|
.listen_events(ListenEventsRequest {
|
||||||
namespace: namespace.into(),
|
namespace: namespace.into(),
|
||||||
id: id.map(|i| i.into()),
|
id: id.map(|i| i.into()),
|
||||||
})
|
})
|
||||||
.await?;
|
.await
|
||||||
|
.inspect_err(|e| tracing::warn!("failed to establish a connection: {}", e))?;
|
||||||
|
|
||||||
|
tracing::debug!("setup stream: {}", namespace);
|
||||||
let mut inner = resp.into_inner();
|
let mut inner = resp.into_inner();
|
||||||
while let Ok(Some(message)) = inner.message().await {
|
while let Ok(Some(message)) = inner.message().await {
|
||||||
exec.execute(message).await?;
|
tracing::debug!("received message: {}", namespace);
|
||||||
|
exec.execute(message)
|
||||||
|
.await
|
||||||
|
.inspect_err(|e| tracing::warn!("failed to handle message: {}", e))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn client(&self) -> anyhow::Result<ChurnClient<tonic::transport::Channel>> {
|
async fn client(&self) -> anyhow::Result<ChurnClient<tonic::transport::Channel>> {
|
||||||
|
tracing::debug!("setting up client");
|
||||||
let channel = if self.host.starts_with("https") {
|
let channel = if self.host.starts_with("https") {
|
||||||
Channel::from_shared(self.host.to_owned())?
|
Channel::from_shared(self.host.to_owned())?
|
||||||
.tls_config(ClientTlsConfig::new().with_native_roots())?
|
.tls_config(ClientTlsConfig::new().with_native_roots())?
|
||||||
|
.connect_timeout(std::time::Duration::from_secs(5))
|
||||||
.connect()
|
.connect()
|
||||||
.await?
|
.await?
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -1,12 +1,18 @@
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use crate::agent::actions::Plan;
|
use crate::agent::{
|
||||||
|
actions::Plan,
|
||||||
|
plugins::PluginStore,
|
||||||
|
task::{ConcreteTask, IntoTask},
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct ScheduledTasks {}
|
pub struct ScheduledTasks {
|
||||||
|
store: PluginStore,
|
||||||
|
}
|
||||||
impl ScheduledTasks {
|
impl ScheduledTasks {
|
||||||
pub fn new() -> Self {
|
pub fn new(store: PluginStore) -> Self {
|
||||||
Self {}
|
Self { store }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle(
|
pub async fn handle(
|
||||||
@@ -16,16 +22,22 @@ impl ScheduledTasks {
|
|||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
tracing::info!("scheduling: {}", task);
|
tracing::info!("scheduling: {}", task);
|
||||||
|
|
||||||
let plan = Plan::new();
|
let plan = Plan::new(self.store.clone());
|
||||||
let tasks = plan.tasks().await?;
|
let tasks: Vec<ConcreteTask> = plan
|
||||||
|
.tasks()
|
||||||
|
.await?
|
||||||
|
.into_iter()
|
||||||
|
.map(|i| i.into_task())
|
||||||
|
.collect();
|
||||||
|
|
||||||
for task in tasks {
|
for task in tasks {
|
||||||
|
let id = task.id().await?;
|
||||||
if !task.should_run().await? {
|
if !task.should_run().await? {
|
||||||
tracing::debug!(task = task.id(), "skipping run");
|
tracing::debug!(task = id, "skipping run");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!(task = task.id(), "executing task");
|
tracing::info!(task = id, "executing task");
|
||||||
task.execute().await?;
|
task.execute().await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
328
crates/churn/src/agent/plugins.rs
Normal file
328
crates/churn/src/agent/plugins.rs
Normal file
@@ -0,0 +1,328 @@
|
|||||||
|
use anyhow::Context;
|
||||||
|
use component::churn_tasks::process::HostProcess;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use wasmtime::component::*;
|
||||||
|
use wasmtime::{Config, Engine, Store};
|
||||||
|
use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView};
|
||||||
|
|
||||||
|
use super::config::AgentConfig;
|
||||||
|
|
||||||
|
wasmtime::component::bindgen!({
|
||||||
|
path: "wit/world.wit",
|
||||||
|
//world: "churn",
|
||||||
|
async: true,
|
||||||
|
with: {
|
||||||
|
"component:churn-tasks/process/process": CustomProcess,
|
||||||
|
"component:churn-tasks/http/client": http::HttpClient
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
mod http;
|
||||||
|
|
||||||
|
pub struct CustomProcess {
|
||||||
|
agent_config: AgentConfig,
|
||||||
|
}
|
||||||
|
impl CustomProcess {
|
||||||
|
pub fn new(agent_config: AgentConfig) -> Self {
|
||||||
|
Self { agent_config }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn run(&self, args: Vec<String>) -> String {
|
||||||
|
tracing::info!("calling function");
|
||||||
|
|
||||||
|
match args.split_first() {
|
||||||
|
Some((item, rest)) => {
|
||||||
|
let mut cmd = std::process::Command::new(item);
|
||||||
|
match cmd.args(rest).output() {
|
||||||
|
Ok(output) => std::str::from_utf8(&output.stdout)
|
||||||
|
.expect("to be able to parse utf8")
|
||||||
|
.to_string(),
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("command failed with output: {e}");
|
||||||
|
e.to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
tracing::warn!("failed to call function because it is empty");
|
||||||
|
panic!("failed to call function because it is empty")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_label(&self, label_key: &str) -> Option<String> {
|
||||||
|
self.agent_config.labels.get(label_key).cloned()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct PluginStore {
|
||||||
|
inner: Arc<Mutex<InnerPluginStore>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PluginStore {
|
||||||
|
pub fn new(config: AgentConfig) -> anyhow::Result<Self> {
|
||||||
|
Ok(Self {
|
||||||
|
inner: Arc::new(Mutex::new(InnerPluginStore::new(config)?)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn id(&self, plugin: &str) -> anyhow::Result<String> {
|
||||||
|
let mut inner = self.inner.lock().await;
|
||||||
|
inner.id(plugin).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self, plugin: &str) -> anyhow::Result<()> {
|
||||||
|
let mut inner = self.inner.lock().await;
|
||||||
|
|
||||||
|
// FIXME: hack to avoid memory leak issues from instantiating plugins
|
||||||
|
*inner = InnerPluginStore::new(inner.agent_config.clone())?;
|
||||||
|
|
||||||
|
inner.execute(plugin).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct InnerPluginStore {
|
||||||
|
store: wasmtime::Store<ServerWasiView>,
|
||||||
|
linker: wasmtime::component::Linker<ServerWasiView>,
|
||||||
|
engine: wasmtime::Engine,
|
||||||
|
agent_config: AgentConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InnerPluginStore {
|
||||||
|
pub fn new(agent_config: AgentConfig) -> anyhow::Result<Self> {
|
||||||
|
let mut config = Config::default();
|
||||||
|
config.wasm_component_model(true);
|
||||||
|
config.async_support(true);
|
||||||
|
let engine = Engine::new(&config)?;
|
||||||
|
let mut linker: wasmtime::component::Linker<ServerWasiView> = Linker::new(&engine);
|
||||||
|
|
||||||
|
// Add the command world (aka WASI CLI) to the linker
|
||||||
|
wasmtime_wasi::add_to_linker_async(&mut linker).context("Failed to link command world")?;
|
||||||
|
|
||||||
|
component::churn_tasks::process::add_to_linker(
|
||||||
|
&mut linker,
|
||||||
|
|state: &mut ServerWasiView| state,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
component::churn_tasks::http::add_to_linker(&mut linker, |state: &mut ServerWasiView| {
|
||||||
|
state
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let wasi_view = ServerWasiView::new(agent_config.clone());
|
||||||
|
let store = Store::new(&engine, wasi_view);
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
store,
|
||||||
|
linker,
|
||||||
|
engine,
|
||||||
|
agent_config,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn id(&mut self, plugin: &str) -> anyhow::Result<String> {
|
||||||
|
let plugin = self.ensure_plugin(plugin).await?;
|
||||||
|
|
||||||
|
plugin
|
||||||
|
.interface0
|
||||||
|
.call_id(&mut self.store)
|
||||||
|
.await
|
||||||
|
.context("Failed to call add function")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&mut self, plugin: &str) -> anyhow::Result<()> {
|
||||||
|
let plugin = self.ensure_plugin(plugin).await?;
|
||||||
|
|
||||||
|
self.store.gc_async().await;
|
||||||
|
|
||||||
|
if plugin
|
||||||
|
.interface0
|
||||||
|
.call_should_run(&mut self.store)
|
||||||
|
.await
|
||||||
|
.context("Failed to call should run")?
|
||||||
|
{
|
||||||
|
tracing::info!("job was marked as required to run");
|
||||||
|
return plugin
|
||||||
|
.interface0
|
||||||
|
.call_execute(&mut self.store)
|
||||||
|
.await
|
||||||
|
.context("Failed to call add function");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn ensure_plugin(&mut self, plugin: &str) -> anyhow::Result<Churn> {
|
||||||
|
let cache = dirs::cache_dir()
|
||||||
|
.ok_or(anyhow::anyhow!("failed to find cache dir"))?
|
||||||
|
.join("io.kjuulh.churn");
|
||||||
|
|
||||||
|
let (plugin_name, plugin_version) = plugin.split_once("@").unwrap_or((plugin, "latest"));
|
||||||
|
|
||||||
|
let plugin_path = cache
|
||||||
|
.join("plugins")
|
||||||
|
.join(plugin_name)
|
||||||
|
.join(plugin_version)
|
||||||
|
.join(format!("{plugin_name}.wasm"));
|
||||||
|
|
||||||
|
let no_cache: bool = std::env::var("CHURN_NO_CACHE")
|
||||||
|
.unwrap_or("false".into())
|
||||||
|
.parse()?;
|
||||||
|
|
||||||
|
if !plugin_path.exists() || no_cache {
|
||||||
|
tracing::info!(
|
||||||
|
plugin_name = plugin_name,
|
||||||
|
plugin_version = plugin_version,
|
||||||
|
"downloading plugin"
|
||||||
|
);
|
||||||
|
if let Some(parent) = plugin_path.parent() {
|
||||||
|
tokio::fs::create_dir_all(parent).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let req = reqwest::get(format!("https://api-minio.front.kjuulh.io/churn-registry/{plugin_name}/{plugin_version}/{plugin_name}.wasm")).await.context("failed to get plugin from registry")?;
|
||||||
|
let mut stream = req.bytes_stream();
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
plugin_name = plugin_name,
|
||||||
|
plugin_path = plugin_path.display().to_string(),
|
||||||
|
"writing plugin to file"
|
||||||
|
);
|
||||||
|
let mut file = tokio::fs::File::create(&plugin_path).await?;
|
||||||
|
while let Some(chunk) = stream.next().await {
|
||||||
|
let chunk = chunk?;
|
||||||
|
file.write_all(&chunk).await?;
|
||||||
|
}
|
||||||
|
file.flush().await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let component =
|
||||||
|
Component::from_file(&self.engine, plugin_path).context("Component file not found")?;
|
||||||
|
|
||||||
|
tracing::debug!(
|
||||||
|
plugin_name = plugin_name,
|
||||||
|
plugin_version = plugin_version,
|
||||||
|
"instantiating plugin"
|
||||||
|
);
|
||||||
|
let instance = Churn::instantiate_async(&mut self.store, &component, &self.linker)
|
||||||
|
.await
|
||||||
|
.context("Failed to instantiate the example world")
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
Ok(instance)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ServerWasiView {
|
||||||
|
table: ResourceTable,
|
||||||
|
ctx: WasiCtx,
|
||||||
|
processes: ResourceTable,
|
||||||
|
clients: ResourceTable,
|
||||||
|
agent_config: AgentConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ServerWasiView {
|
||||||
|
fn new(agent_config: AgentConfig) -> Self {
|
||||||
|
let table = ResourceTable::new();
|
||||||
|
|
||||||
|
let ctx = WasiCtxBuilder::new()
|
||||||
|
.inherit_stdio()
|
||||||
|
.inherit_stdout()
|
||||||
|
.inherit_env()
|
||||||
|
.inherit_stderr()
|
||||||
|
.inherit_network()
|
||||||
|
.preopened_dir("/", "/", DirPerms::all(), FilePerms::all())
|
||||||
|
.expect("to be able to open root")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Self {
|
||||||
|
table,
|
||||||
|
ctx,
|
||||||
|
processes: ResourceTable::default(),
|
||||||
|
clients: ResourceTable::default(),
|
||||||
|
agent_config,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WasiView for ServerWasiView {
|
||||||
|
fn table(&mut self) -> &mut ResourceTable {
|
||||||
|
&mut self.table
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ctx(&mut self) -> &mut WasiCtx {
|
||||||
|
&mut self.ctx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl component::churn_tasks::process::Host for ServerWasiView {}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl HostProcess for ServerWasiView {
|
||||||
|
async fn new(
|
||||||
|
&mut self,
|
||||||
|
) -> wasmtime::component::Resource<component::churn_tasks::process::Process> {
|
||||||
|
self.processes
|
||||||
|
.push(CustomProcess::new(self.agent_config.clone()))
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_process(
|
||||||
|
&mut self,
|
||||||
|
self_: wasmtime::component::Resource<component::churn_tasks::process::Process>,
|
||||||
|
inputs: wasmtime::component::__internal::Vec<String>,
|
||||||
|
) -> String {
|
||||||
|
let process = self.processes.get(&self_).unwrap();
|
||||||
|
process.run(inputs)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_variable(
|
||||||
|
&mut self,
|
||||||
|
self_: wasmtime::component::Resource<component::churn_tasks::process::Process>,
|
||||||
|
key: wasmtime::component::__internal::String,
|
||||||
|
) -> String {
|
||||||
|
let process = self.processes.get(&self_).unwrap();
|
||||||
|
process.get_label(&key).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn drop(
|
||||||
|
&mut self,
|
||||||
|
rep: wasmtime::component::Resource<component::churn_tasks::process::Process>,
|
||||||
|
) -> wasmtime::Result<()> {
|
||||||
|
self.processes.delete(rep)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl component::churn_tasks::http::Host for ServerWasiView {}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl component::churn_tasks::http::HostClient for ServerWasiView {
|
||||||
|
async fn new(&mut self) -> wasmtime::component::Resource<component::churn_tasks::http::Client> {
|
||||||
|
self.clients.push(http::HttpClient::new()).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get(
|
||||||
|
&mut self,
|
||||||
|
self_: wasmtime::component::Resource<component::churn_tasks::http::Client>,
|
||||||
|
url: wasmtime::component::__internal::String,
|
||||||
|
) -> Vec<u8> {
|
||||||
|
let process = self.clients.get(&self_).unwrap();
|
||||||
|
process
|
||||||
|
.get(&url)
|
||||||
|
.await
|
||||||
|
.expect("to be able to make http call")
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn drop(
|
||||||
|
&mut self,
|
||||||
|
rep: wasmtime::component::Resource<component::churn_tasks::http::Client>,
|
||||||
|
) -> wasmtime::Result<()> {
|
||||||
|
self.clients.delete(rep)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
12
crates/churn/src/agent/plugins/http.rs
Normal file
12
crates/churn/src/agent/plugins/http.rs
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
pub struct HttpClient {}
|
||||||
|
|
||||||
|
impl HttpClient {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get(&self, url: &str) -> anyhow::Result<Vec<u8>> {
|
||||||
|
let bytes = reqwest::get(url).await?.bytes().await?;
|
||||||
|
Ok(bytes.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -30,7 +30,10 @@ impl notmad::Component for AgentRefresh {
|
|||||||
&self,
|
&self,
|
||||||
cancellation_token: tokio_util::sync::CancellationToken,
|
cancellation_token: tokio_util::sync::CancellationToken,
|
||||||
) -> Result<(), notmad::MadError> {
|
) -> Result<(), notmad::MadError> {
|
||||||
let cancel = nodrift::schedule_drifter(std::time::Duration::from_secs(60), self.clone());
|
// let cancel =
|
||||||
|
// nodrift::schedule_drifter(std::time::Duration::from_secs(60 * 10), self.clone());
|
||||||
|
let cancel =
|
||||||
|
nodrift::schedule_drifter(std::time::Duration::from_secs(60 * 5), self.clone());
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = cancel.cancelled() => {},
|
_ = cancel.cancelled() => {},
|
||||||
_ = cancellation_token.cancelled() => {
|
_ = cancellation_token.cancelled() => {
|
||||||
|
|||||||
45
crates/churn/src/agent/task.rs
Normal file
45
crates/churn/src/agent/task.rs
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait Task {
|
||||||
|
async fn id(&self) -> anyhow::Result<String>;
|
||||||
|
async fn should_run(&self) -> anyhow::Result<bool> {
|
||||||
|
Ok(true)
|
||||||
|
}
|
||||||
|
async fn execute(&self) -> anyhow::Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait IntoTask {
|
||||||
|
fn into_task(self) -> ConcreteTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct ConcreteTask {
|
||||||
|
inner: Arc<dyn Task + Sync + Send + 'static>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConcreteTask {
|
||||||
|
pub fn new<T: Task + Sync + Send + 'static>(t: T) -> Self {
|
||||||
|
Self { inner: Arc::new(t) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::ops::Deref for ConcreteTask {
|
||||||
|
type Target = Arc<dyn Task + Sync + Send + 'static>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.inner
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IntoTask for ConcreteTask {
|
||||||
|
fn into_task(self) -> ConcreteTask {
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Task + Sync + Send + 'static> IntoTask for T {
|
||||||
|
fn into_task(self) -> ConcreteTask {
|
||||||
|
ConcreteTask::new(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
use std::net::SocketAddr;
|
use std::{collections::BTreeMap, net::SocketAddr};
|
||||||
|
|
||||||
use clap::{Parser, Subcommand};
|
use clap::{Parser, Subcommand};
|
||||||
|
|
||||||
@@ -21,8 +21,24 @@ pub async fn execute() -> anyhow::Result<()> {
|
|||||||
agent::execute().await?;
|
agent::execute().await?;
|
||||||
tracing::info!("shut down agent");
|
tracing::info!("shut down agent");
|
||||||
}
|
}
|
||||||
AgentCommands::Setup { force, discovery } => {
|
AgentCommands::Setup {
|
||||||
agent::setup_config(discovery, force).await?;
|
force,
|
||||||
|
discovery,
|
||||||
|
labels,
|
||||||
|
} => {
|
||||||
|
let mut setup_labels = BTreeMap::new();
|
||||||
|
for (k, v) in labels {
|
||||||
|
setup_labels.insert(k, v);
|
||||||
|
}
|
||||||
|
|
||||||
|
if !setup_labels.contains_key("node_name") {
|
||||||
|
setup_labels.insert(
|
||||||
|
"node_name".into(),
|
||||||
|
petname::petname(2, "-").expect("to be able to generate a valid petname"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
agent::setup_config(discovery, force, setup_labels).await?;
|
||||||
tracing::info!("wrote default agent config");
|
tracing::info!("wrote default agent config");
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@@ -65,5 +81,15 @@ enum AgentCommands {
|
|||||||
|
|
||||||
#[arg(env = "DISCOVERY_HOST", long = "discovery")]
|
#[arg(env = "DISCOVERY_HOST", long = "discovery")]
|
||||||
discovery: String,
|
discovery: String,
|
||||||
|
|
||||||
|
#[arg(long = "label", short = 'l', value_parser = parse_key_val, action = clap::ArgAction::Append)]
|
||||||
|
labels: Vec<(String, String)>,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn parse_key_val(s: &str) -> Result<(String, String), String> {
|
||||||
|
let (key, value) = s
|
||||||
|
.split_once("=")
|
||||||
|
.ok_or_else(|| format!("invalid key=value: no `=` found in `{s}`"))?;
|
||||||
|
Ok((key.to_string(), value.to_string()))
|
||||||
|
}
|
||||||
|
|||||||
@@ -65,7 +65,7 @@ impl crate::grpc::churn_server::Churn for GrpcServer {
|
|||||||
) -> std::result::Result<tonic::Response<Self::ListenEventsStream>, tonic::Status> {
|
) -> std::result::Result<tonic::Response<Self::ListenEventsStream>, tonic::Status> {
|
||||||
let (tx, rx) = tokio::sync::mpsc::channel(128);
|
let (tx, rx) = tokio::sync::mpsc::channel(128);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
|
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60 * 10));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
|
|||||||
28
crates/churn/wit/world.wit
Normal file
28
crates/churn/wit/world.wit
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
package component:churn-tasks@0.1.0;
|
||||||
|
|
||||||
|
interface process {
|
||||||
|
resource process {
|
||||||
|
constructor();
|
||||||
|
run-process: func(inputs: list<string>) -> string;
|
||||||
|
get-variable: func(key: string) -> string;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface http {
|
||||||
|
resource client {
|
||||||
|
constructor();
|
||||||
|
get: func(url: string) -> list<u8>;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface task {
|
||||||
|
id: func() -> string;
|
||||||
|
should-run: func() -> bool;
|
||||||
|
execute: func();
|
||||||
|
}
|
||||||
|
|
||||||
|
world churn {
|
||||||
|
export task;
|
||||||
|
import process;
|
||||||
|
import http;
|
||||||
|
}
|
||||||
10
cuddle.yaml
10
cuddle.yaml
@@ -14,6 +14,16 @@ vars:
|
|||||||
- internal: "true"
|
- internal: "true"
|
||||||
- internal_grpc: "true"
|
- internal_grpc: "true"
|
||||||
|
|
||||||
|
please:
|
||||||
|
project:
|
||||||
|
owner: kjuulh
|
||||||
|
repository: churn-v2
|
||||||
|
branch: main
|
||||||
|
settings:
|
||||||
|
api_url: https://git.front.kjuulh.io
|
||||||
|
actions:
|
||||||
|
rust:
|
||||||
|
|
||||||
cuddle/clusters:
|
cuddle/clusters:
|
||||||
dev:
|
dev:
|
||||||
env:
|
env:
|
||||||
|
|||||||
53
install.sh
53
install.sh
@@ -8,15 +8,23 @@ APP_VERSION="latest" # or specify a version
|
|||||||
S3_BUCKET="rust-artifacts"
|
S3_BUCKET="rust-artifacts"
|
||||||
BINARY_NAME="churn"
|
BINARY_NAME="churn"
|
||||||
SERVICE_NAME="${APP_NAME}.service"
|
SERVICE_NAME="${APP_NAME}.service"
|
||||||
|
SERVICE_UPDATE_NAME="${APP_NAME}-update.service"
|
||||||
|
TIMER_UPDATE_NAME="${APP_NAME}-update.timer"
|
||||||
INSTALL_DIR="/usr/local/bin"
|
INSTALL_DIR="/usr/local/bin"
|
||||||
CONFIG_DIR="/etc/${APP_NAME}"
|
CONFIG_DIR="/etc/${APP_NAME}"
|
||||||
CHURN_DISCOVERY="https://churn.internal.prod.kjuulh.app"
|
CHURN_DISCOVERY="https://churn.prod.kjuulh.app"
|
||||||
|
LOG="/var/log/churn-install.log"
|
||||||
|
|
||||||
# Colors for output
|
# Colors for output
|
||||||
RED='\033[0;31m'
|
RED='\033[0;31m'
|
||||||
GREEN='\033[0;32m'
|
GREEN='\033[0;32m'
|
||||||
NC='\033[0m' # No Color
|
NC='\033[0m' # No Color
|
||||||
|
|
||||||
|
|
||||||
|
exec > >(tee -i ${LOG})
|
||||||
|
exec 2>&1
|
||||||
|
echo "Starting churn install $(date)"
|
||||||
|
|
||||||
# Check if running as root
|
# Check if running as root
|
||||||
if [ "$EUID" -ne 0 ]; then
|
if [ "$EUID" -ne 0 ]; then
|
||||||
echo -e "${RED}Please run as root${NC}"
|
echo -e "${RED}Please run as root${NC}"
|
||||||
@@ -28,9 +36,14 @@ echo "Creating directories..."
|
|||||||
mkdir -p "${INSTALL_DIR}"
|
mkdir -p "${INSTALL_DIR}"
|
||||||
mkdir -p "${CONFIG_DIR}"
|
mkdir -p "${CONFIG_DIR}"
|
||||||
|
|
||||||
|
if systemctl is-active --quiet churn.service; then
|
||||||
|
echo "Stopping existing churn service..."
|
||||||
|
systemctl stop churn.service
|
||||||
|
fi
|
||||||
|
|
||||||
# Download binary from S3
|
# Download binary from S3
|
||||||
echo "Downloading binary..."
|
echo "Downloading binary..."
|
||||||
curl -L "https://api-minio.front.kjuulh.io/${S3_BUCKET}/releases/${APP_NAME}/${APP_VERSION}/${BINARY_NAME}" -o "${INSTALL_DIR}/${BINARY_NAME}"
|
curl -L -s "https://api-minio.front.kjuulh.io/${S3_BUCKET}/releases/${APP_NAME}/${APP_VERSION}/${BINARY_NAME}" -o "${INSTALL_DIR}/${BINARY_NAME}"
|
||||||
|
|
||||||
# Make binary executable
|
# Make binary executable
|
||||||
chmod +x "${INSTALL_DIR}/${BINARY_NAME}"
|
chmod +x "${INSTALL_DIR}/${BINARY_NAME}"
|
||||||
@@ -64,18 +77,52 @@ Group=root
|
|||||||
ExecStart=${INSTALL_DIR}/${BINARY_NAME} agent start
|
ExecStart=${INSTALL_DIR}/${BINARY_NAME} agent start
|
||||||
Restart=always
|
Restart=always
|
||||||
RestartSec=10
|
RestartSec=10
|
||||||
Environment=RUST_LOG=info
|
Environment=RUST_LOG=h2=warn,hyper=warn,churn=debug,warn
|
||||||
|
|
||||||
[Install]
|
[Install]
|
||||||
WantedBy=multi-user.target
|
WantedBy=multi-user.target
|
||||||
EOF
|
EOF
|
||||||
|
|
||||||
|
echo "Creating churn update service..."
|
||||||
|
cat > "/etc/systemd/system/${SERVICE_UPDATE_NAME}" <<EOF
|
||||||
|
[Unit]
|
||||||
|
Description=Daily Churn Update Service
|
||||||
|
After=network-online.target
|
||||||
|
Wants=network-online.target
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=oneshot
|
||||||
|
ExecStart=/bin/bash -c 'curl -s https://git.front.kjuulh.io/kjuulh/churn-v2/raw/branch/main/install.sh | bash'
|
||||||
|
User=root
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
||||||
|
EOF
|
||||||
|
|
||||||
|
cat > "/etc/systemd/system/${TIMER_UPDATE_NAME}" <<EOF
|
||||||
|
[Unit]
|
||||||
|
Description=Run Churn Update Daily
|
||||||
|
|
||||||
|
[Timer]
|
||||||
|
OnCalendar=daily
|
||||||
|
Persistent=true
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=timers.target
|
||||||
|
EOF
|
||||||
|
|
||||||
# Reload systemd and enable service
|
# Reload systemd and enable service
|
||||||
echo "Configuring systemd service..."
|
echo "Configuring systemd service..."
|
||||||
systemctl daemon-reload
|
systemctl daemon-reload
|
||||||
|
|
||||||
systemctl enable "${SERVICE_NAME}"
|
systemctl enable "${SERVICE_NAME}"
|
||||||
systemctl start "${SERVICE_NAME}"
|
systemctl start "${SERVICE_NAME}"
|
||||||
|
|
||||||
|
systemctl enable "${SERVICE_UPDATE_NAME}"
|
||||||
|
|
||||||
|
systemctl enable "${TIMER_UPDATE_NAME}"
|
||||||
|
systemctl start "${TIMER_UPDATE_NAME}"
|
||||||
|
|
||||||
# Check service status
|
# Check service status
|
||||||
if systemctl is-active --quiet "${SERVICE_NAME}"; then
|
if systemctl is-active --quiet "${SERVICE_NAME}"; then
|
||||||
echo -e "${GREEN}Installation successful! ${APP_NAME} is running.${NC}"
|
echo -e "${GREEN}Installation successful! ${APP_NAME} is running.${NC}"
|
||||||
|
|||||||
Reference in New Issue
Block a user