feat: add ubernetes like example
Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
@@ -15,3 +15,5 @@ tracing-subscriber.workspace = true
|
||||
tracing.workspace = true
|
||||
rand.workspace = true
|
||||
uuid.workspace = true
|
||||
noprocess.workspace = true
|
||||
tokio-util = { version = "0.7", features = ["rt"] }
|
||||
|
||||
@@ -1,15 +1,17 @@
|
||||
use std::time::Duration;
|
||||
use std::{collections::HashMap, future::Future, sync::Arc, time::Duration};
|
||||
|
||||
use nocontrol::{
|
||||
Operator, OperatorState, Specification,
|
||||
manifests::{Action, Manifest, ManifestMetadata, ManifestState},
|
||||
manifests::{Action, Manifest, ManifestMetadata, ManifestState, ManifestStatusState},
|
||||
};
|
||||
use noprocess::{HandleID, Process, ProcessHandler, ProcessManager, ProcessResult, ProcessState};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::RwLock;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
// Setup logging to file
|
||||
let output_file = std::fs::File::create("target/nocontrol.log")?;
|
||||
|
||||
tracing_subscriber::fmt()
|
||||
@@ -21,41 +23,46 @@ async fn main() -> anyhow::Result<()> {
|
||||
.without_time()
|
||||
.init();
|
||||
|
||||
let operator = OperatorState::new(MyOperator {});
|
||||
let process_manager = ProcessManager::new();
|
||||
let operator = OperatorState::new(ProcessOperator::new(process_manager));
|
||||
let control_plane = nocontrol::ControlPlane::new(operator);
|
||||
|
||||
// Add initial manifest
|
||||
// Add initial process manifest - desired state is Running
|
||||
control_plane
|
||||
.add_manifest(Manifest {
|
||||
name: "initial-deployment".into(),
|
||||
name: "worker-process".into(),
|
||||
metadata: ManifestMetadata {},
|
||||
spec: Specifications::Deployment(DeploymentControllerManifest {
|
||||
name: "initial-app".into(),
|
||||
spec: Specifications::Process(ProcessManifest {
|
||||
name: "data-worker".into(),
|
||||
desired_state: DesiredProcessState::Running,
|
||||
}),
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Spawn random manifest updater
|
||||
// Spawn a task that toggles the desired state periodically
|
||||
tokio::spawn({
|
||||
let control_plane = control_plane.clone();
|
||||
async move {
|
||||
let mut running = true;
|
||||
loop {
|
||||
let rand = {
|
||||
use rand::Rng;
|
||||
let mut rng = rand::rng();
|
||||
rng.random_range(3..8)
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
|
||||
running = !running;
|
||||
let desired_state = if running {
|
||||
DesiredProcessState::Running
|
||||
} else {
|
||||
DesiredProcessState::Stopped
|
||||
};
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(rand)).await;
|
||||
|
||||
let random = uuid::Uuid::now_v7();
|
||||
tracing::info!("Toggling worker-process to {:?}", desired_state);
|
||||
|
||||
let _ = control_plane
|
||||
.add_manifest(Manifest {
|
||||
name: "initial-deployment".into(),
|
||||
name: "worker-process".into(),
|
||||
metadata: ManifestMetadata {},
|
||||
spec: Specifications::Deployment(DeploymentControllerManifest {
|
||||
name: format!("app-{}", &random.to_string()),
|
||||
spec: Specifications::Process(ProcessManifest {
|
||||
name: "data-worker".into(),
|
||||
desired_state,
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
@@ -77,44 +84,212 @@ async fn main() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// The operator that manages processes via noprocess
|
||||
#[derive(Clone)]
|
||||
pub struct MyOperator {}
|
||||
pub struct ProcessOperator {
|
||||
process_manager: ProcessManager,
|
||||
// Maps manifest name -> process handle ID in the ProcessManager
|
||||
process_ids: Arc<RwLock<HashMap<String, HandleID>>>,
|
||||
}
|
||||
|
||||
impl Operator for MyOperator {
|
||||
impl ProcessOperator {
|
||||
pub fn new(process_manager: ProcessManager) -> Self {
|
||||
Self {
|
||||
process_manager,
|
||||
process_ids: Arc::new(RwLock::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Operator for ProcessOperator {
|
||||
type Specifications = Specifications;
|
||||
|
||||
async fn reconcile(
|
||||
&self,
|
||||
desired_manifest: &mut ManifestState<Specifications>,
|
||||
manifest_state: &mut ManifestState<Specifications>,
|
||||
) -> anyhow::Result<Action> {
|
||||
match &desired_manifest.manifest.spec {
|
||||
Specifications::Deployment(spec) => {
|
||||
tracing::info!(
|
||||
"reconciliation was called for name = {}, value = {}",
|
||||
desired_manifest.manifest.name,
|
||||
spec.name
|
||||
)
|
||||
match &manifest_state.manifest.spec {
|
||||
Specifications::Process(spec) => {
|
||||
let manifest_name = &manifest_state.manifest.name;
|
||||
|
||||
// Get or create the process
|
||||
let process_id = {
|
||||
let ids = self.process_ids.read().await;
|
||||
ids.get(manifest_name).cloned()
|
||||
};
|
||||
|
||||
match spec.desired_state {
|
||||
DesiredProcessState::Running => {
|
||||
if let Some(id) = &process_id {
|
||||
// Check if process is already running
|
||||
let status = self.process_manager.get_process_state(id).await;
|
||||
match status {
|
||||
Some(ProcessState::Running) => {
|
||||
// Process is running as desired
|
||||
manifest_state.status.status = ManifestStatusState::Running;
|
||||
tracing::info!(
|
||||
"Process {} is running as desired",
|
||||
spec.name
|
||||
);
|
||||
}
|
||||
Some(ProcessState::Pending) | Some(ProcessState::Stopped) => {
|
||||
// Process is pending or stopped, start it
|
||||
tracing::info!(
|
||||
"Process {} is pending/stopped, starting",
|
||||
spec.name
|
||||
);
|
||||
self.process_manager.start_process(id).await?;
|
||||
manifest_state.status.status = ManifestStatusState::Started;
|
||||
}
|
||||
Some(ProcessState::Errored) => {
|
||||
// Process errored, update status and try to restart
|
||||
tracing::warn!(
|
||||
"Process {} errored, attempting restart",
|
||||
spec.name
|
||||
);
|
||||
manifest_state.status.status = ManifestStatusState::Errored;
|
||||
// Try to restart
|
||||
let _ = self.process_manager.restart_process(id).await;
|
||||
}
|
||||
None => {
|
||||
// Process doesn't exist in manager, recreate
|
||||
tracing::info!(
|
||||
"Process {} not found, creating new",
|
||||
spec.name
|
||||
);
|
||||
let new_id = self
|
||||
.process_manager
|
||||
.add_process(Process::new(WorkerProcess {
|
||||
name: spec.name.clone(),
|
||||
}))
|
||||
.await;
|
||||
self.process_manager.start_process(&new_id).await?;
|
||||
self.process_ids
|
||||
.write()
|
||||
.await
|
||||
.insert(manifest_name.clone(), new_id);
|
||||
manifest_state.status.status = ManifestStatusState::Started;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No process exists yet, create and start one
|
||||
tracing::info!("Creating and starting process {}", spec.name);
|
||||
let id = self
|
||||
.process_manager
|
||||
.add_process(Process::new(WorkerProcess {
|
||||
name: spec.name.clone(),
|
||||
}))
|
||||
.await;
|
||||
self.process_manager.start_process(&id).await?;
|
||||
self.process_ids
|
||||
.write()
|
||||
.await
|
||||
.insert(manifest_name.clone(), id);
|
||||
manifest_state.status.status = ManifestStatusState::Started;
|
||||
}
|
||||
}
|
||||
DesiredProcessState::Stopped => {
|
||||
if let Some(id) = &process_id {
|
||||
let status = self.process_manager.get_process_state(id).await;
|
||||
match status {
|
||||
Some(ProcessState::Running) => {
|
||||
// Process is running but should be stopped
|
||||
tracing::info!(
|
||||
"Stopping process {} as requested",
|
||||
spec.name
|
||||
);
|
||||
manifest_state.status.status = ManifestStatusState::Stopping;
|
||||
self.process_manager.stop_process(id).await?;
|
||||
}
|
||||
Some(ProcessState::Pending) | Some(ProcessState::Stopped) => {
|
||||
// Already pending/stopped as desired
|
||||
manifest_state.status.status = ManifestStatusState::Pending;
|
||||
tracing::info!(
|
||||
"Process {} is pending/stopped as desired",
|
||||
spec.name
|
||||
);
|
||||
}
|
||||
Some(ProcessState::Errored) => {
|
||||
// Errored and should be stopped - that's fine
|
||||
tracing::info!(
|
||||
"Process {} errored, desired state is stopped",
|
||||
spec.name
|
||||
);
|
||||
manifest_state.status.status = ManifestStatusState::Errored;
|
||||
}
|
||||
None => {
|
||||
// Doesn't exist, which is fine for stopped state
|
||||
manifest_state.status.status = ManifestStatusState::Pending;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No process exists, which is fine for stopped state
|
||||
manifest_state.status.status = ManifestStatusState::Pending;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Action::Requeue(std::time::Duration::from_secs(10)))
|
||||
Ok(Action::Requeue(Duration::from_secs(5)))
|
||||
}
|
||||
}
|
||||
|
||||
// A simple worker process that does periodic work
|
||||
#[derive(Clone)]
|
||||
struct WorkerProcess {
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl ProcessHandler for WorkerProcess {
|
||||
fn call(&self, cancel: CancellationToken) -> impl Future<Output = ProcessResult> + Send {
|
||||
let name = self.name.clone();
|
||||
async move {
|
||||
tracing::info!("Worker {} started", name);
|
||||
let mut iteration = 0u64;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("Worker {} received cancellation, shutting down gracefully", name);
|
||||
break;
|
||||
}
|
||||
_ = tokio::time::sleep(Duration::from_secs(2)) => {
|
||||
iteration += 1;
|
||||
tracing::info!("Worker {} iteration {}: doing work...", name, iteration);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("Worker {} stopped", name);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Specifications enum for the operator
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub enum Specifications {
|
||||
Deployment(DeploymentControllerManifest),
|
||||
Process(ProcessManifest),
|
||||
}
|
||||
|
||||
impl Specification for Specifications {
|
||||
fn kind(&self) -> &'static str {
|
||||
match self {
|
||||
Specifications::Deployment(_) => "deployment",
|
||||
Specifications::Process(_) => "process",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The manifest specification for a process
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct DeploymentControllerManifest {
|
||||
name: String,
|
||||
pub struct ProcessManifest {
|
||||
pub name: String,
|
||||
pub desired_state: DesiredProcessState,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub enum DesiredProcessState {
|
||||
Running,
|
||||
Stopped,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user