feat: add compute

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2026-03-21 00:42:17 +01:00
parent 04e452ecc3
commit 7188b44624
17 changed files with 1307 additions and 3 deletions

View File

@@ -6,7 +6,7 @@ edition = "2024"
[dependencies]
forage-core = { path = "../forage-core" }
forage-db = { path = "../forage-db" }
forage-grpc = { path = "../forage-grpc" }
forage-grpc = { path = "../forage-grpc", features = ["client", "server"] }
anyhow.workspace = true
chrono.workspace = true
async-trait.workspace = true
@@ -37,3 +37,4 @@ sha2.workspace = true
notmad.workspace = true
tokio-util.workspace = true
async-nats.workspace = true
async-stream = "0.3"

View File

@@ -0,0 +1,160 @@
use std::pin::Pin;
use std::sync::Arc;
use forage_core::compute::{
ComputeError, ComputeResourceSpec, ComputeScheduler, ResourceKind, RolloutStatus,
};
use forage_grpc::forage_service_server::ForageService;
use forage_grpc::{
ApplyResourcesRequest, ApplyResourcesResponse, DeleteResourcesRequest,
DeleteResourcesResponse, RolloutEvent as ProtoRolloutEvent, WatchRolloutRequest,
};
use tokio_stream::Stream;
use tonic::{Request, Response, Status};
/// Implements the `ForageService` gRPC server trait.
///
/// Thin adapter: validates request input, converts proto to domain types,
/// delegates to the `ComputeScheduler`, and converts results back to proto.
/// NOTE(review): no authentication is performed here — confirm auth happens
/// upstream (interceptor/middleware) before exposing this service.
pub struct ForageServiceImpl {
    // Scheduler that performs the actual apply/watch/delete work.
    pub scheduler: Arc<dyn ComputeScheduler>,
}

/// Boxed, pinned event stream returned by `watch_rollout`; `Send + 'static`
/// so tonic can drive it from any worker task.
type WatchStream =
    Pin<Box<dyn Stream<Item = Result<ProtoRolloutEvent, Status>> + Send + 'static>>;
#[tonic::async_trait]
impl ForageService for ForageServiceImpl {
async fn apply_resources(
&self,
request: Request<ApplyResourcesRequest>,
) -> Result<Response<ApplyResourcesResponse>, Status> {
let req = request.into_inner();
if req.namespace.is_empty() {
return Err(Status::invalid_argument("namespace is required"));
}
if req.resources.is_empty() {
return Err(Status::invalid_argument("at least one resource is required"));
}
let resources: Vec<ComputeResourceSpec> = req
.resources
.iter()
.map(|r| {
let (kind, image, replicas, cpu, memory) = match &r.spec {
Some(forage_grpc::forage_resource::Spec::ContainerService(cs)) => {
let container = cs.container.as_ref();
let scaling = cs.scaling.as_ref();
(
ResourceKind::ContainerService,
container.map(|c| c.image.clone()),
scaling.map(|s| s.replicas).unwrap_or(1),
container
.and_then(|c| c.resources.as_ref())
.and_then(|r| r.requests.as_ref())
.map(|r| r.cpu.clone()),
container
.and_then(|c| c.resources.as_ref())
.and_then(|r| r.requests.as_ref())
.map(|r| r.memory.clone()),
)
}
Some(forage_grpc::forage_resource::Spec::Service(_)) => {
(ResourceKind::Service, None, 1, None, None)
}
Some(forage_grpc::forage_resource::Spec::Route(_)) => {
(ResourceKind::Route, None, 1, None, None)
}
Some(forage_grpc::forage_resource::Spec::CronJob(cj)) => {
let image = cj.container.as_ref().map(|c| c.image.clone());
(ResourceKind::CronJob, image, 1, None, None)
}
Some(forage_grpc::forage_resource::Spec::Job(j)) => {
let image = j.container.as_ref().map(|c| c.image.clone());
(ResourceKind::Job, image, 1, None, None)
}
None => (ResourceKind::ContainerService, None, 1, None, None),
};
ComputeResourceSpec {
name: r.name.clone(),
kind,
image,
replicas,
cpu,
memory,
}
})
.collect();
let rollout_id = self
.scheduler
.apply_resources(&req.apply_id, &req.namespace, resources, req.labels)
.await
.map_err(compute_err_to_status)?;
Ok(Response::new(ApplyResourcesResponse { rollout_id }))
}
type WatchRolloutStream = WatchStream;
async fn watch_rollout(
&self,
request: Request<WatchRolloutRequest>,
) -> Result<Response<Self::WatchRolloutStream>, Status> {
let rollout_id = request.into_inner().rollout_id;
let mut rx = self
.scheduler
.watch_rollout(&rollout_id)
.await
.map_err(compute_err_to_status)?;
let stream = async_stream::stream! {
while let Some(event) = rx.recv().await {
yield Ok(ProtoRolloutEvent {
resource_name: event.resource_name,
resource_kind: event.resource_kind,
status: domain_status_to_proto(event.status) as i32,
message: event.message,
});
}
};
Ok(Response::new(Box::pin(stream) as Self::WatchRolloutStream))
}
async fn delete_resources(
&self,
request: Request<DeleteResourcesRequest>,
) -> Result<Response<DeleteResourcesResponse>, Status> {
let req = request.into_inner();
self.scheduler
.delete_resources(&req.namespace, req.labels)
.await
.map_err(compute_err_to_status)?;
Ok(Response::new(DeleteResourcesResponse {}))
}
}
fn compute_err_to_status(e: ComputeError) -> Status {
match e {
ComputeError::NotFound(msg) => Status::not_found(msg),
ComputeError::InvalidRequest(msg) => Status::invalid_argument(msg),
ComputeError::Conflict(msg) => Status::already_exists(msg),
ComputeError::Internal(msg) => Status::internal(msg),
}
}
fn domain_status_to_proto(s: RolloutStatus) -> forage_grpc::RolloutStatus {
match s {
RolloutStatus::Pending => forage_grpc::RolloutStatus::Pending,
RolloutStatus::InProgress => forage_grpc::RolloutStatus::InProgress,
RolloutStatus::Succeeded => forage_grpc::RolloutStatus::Succeeded,
RolloutStatus::Failed => forage_grpc::RolloutStatus::Failed,
RolloutStatus::RolledBack => forage_grpc::RolloutStatus::RolledBack,
}
}

View File

@@ -1,9 +1,11 @@
mod auth;
mod compute_grpc;
mod forest_client;
mod notification_consumer;
mod notification_ingester;
mod notification_worker;
mod routes;
mod serve_grpc;
mod serve_http;
mod session_reaper;
mod state;
@@ -252,6 +254,20 @@ async fn main() -> anyhow::Result<()> {
}
}
// Compute scheduler (mock for now — simulates container lifecycle)
let compute_scheduler = Arc::new(forage_core::compute::InMemoryComputeScheduler::new());
state = state.with_compute_scheduler(compute_scheduler.clone());
let grpc_port: u16 = std::env::var("GRPC_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(4050);
let grpc_addr = SocketAddr::from(([0, 0, 0, 0], grpc_port));
mad.add(serve_grpc::ServeGrpc {
addr: grpc_addr,
scheduler: compute_scheduler,
});
// HTTP server component
mad.add(serve_http::ServeHttp { addr, state });

View File

@@ -142,6 +142,12 @@ pub fn router() -> Router<AppState> {
get(timeline_api),
)
.route("/api/orgs/{org}/timeline", get(org_timeline_api))
.route("/orgs/{org}/compute", get(compute_page))
.route(
"/orgs/{org}/compute/rollouts/{rollout_id}",
get(rollout_detail_page),
)
.route("/api/compute/regions", get(regions_api))
}
fn orgs_context(orgs: &[CachedOrg]) -> Vec<minijinja::Value> {
@@ -4989,3 +4995,202 @@ async fn get_plan_output_api(
}))
.into_response())
}
// ---------------------------------------------------------------------------
// Compute
// ---------------------------------------------------------------------------
/// Renders the compute overview page for an organization: current instances
/// plus the first 20 rollouts returned by the scheduler.
async fn compute_page(
    State(state): State<AppState>,
    session: Session,
    Path(org): Path<String>,
) -> Result<Response, Response> {
    let orgs = &session.user.orgs;
    let _cached_org = require_org_membership(&state, orgs, &org)?;

    // Best effort: a missing scheduler (or a listing error) renders an empty
    // page instead of failing the request.
    let (instances, rollouts) = match state.compute_scheduler {
        Some(ref scheduler) => {
            let namespace = &org;
            (
                scheduler.list_instances(namespace).await.unwrap_or_default(),
                scheduler.list_rollouts(namespace).await.unwrap_or_default(),
            )
        }
        None => (vec![], vec![]),
    };

    let instance_rows: Vec<minijinja::Value> = instances
        .iter()
        .map(|inst| {
            context! {
                id => inst.id,
                resource_name => inst.resource_name,
                project => inst.project,
                destination => inst.destination,
                environment => inst.environment,
                image => inst.image,
                region => inst.region,
                replicas => inst.replicas,
                cpu => inst.cpu,
                memory => inst.memory,
                status => inst.status,
            }
        })
        .collect();

    let rollout_rows: Vec<minijinja::Value> = rollouts
        .iter()
        .take(20)
        .map(|rollout| {
            let resource_rows: Vec<minijinja::Value> = rollout
                .resources
                .iter()
                .map(|res| {
                    context! {
                        name => res.name,
                        kind => res.kind.to_string(),
                        status => res.status.to_string(),
                        message => res.message,
                    }
                })
                .collect();
            context! {
                id => rollout.id,
                apply_id => rollout.apply_id,
                namespace => rollout.namespace,
                status => rollout.status.to_string(),
                resources => resource_rows,
            }
        })
        .collect();

    let projects = warn_default(
        "compute: list projects",
        state
            .platform_client
            .list_projects(&session.access_token, &org)
            .await,
    );

    let page_ctx = context! {
        title => format!("Compute - {} - Forage", org),
        description => "Managed compute instances",
        user => context! { username => session.user.username },
        csrf_token => &session.csrf_token,
        orgs => orgs_context(orgs),
        current_org => &org,
        active_tab => "compute",
        projects => projects,
        instances => instance_rows,
        rollouts => rollout_rows,
        org_name => &org,
    };
    let html = state
        .templates
        .render("pages/compute.html.jinja", page_ctx)
        .map_err(|e| internal_error(&state, "compute render", &e))?;
    Ok(Html(html).into_response())
}
/// Renders the detail page for a single rollout: per-resource status rows and
/// the rollout's label set.
async fn rollout_detail_page(
    State(state): State<AppState>,
    session: Session,
    Path((org, rollout_id)): Path<(String, String)>,
) -> Result<Response, Response> {
    let orgs = &session.user.orgs;
    let _cached_org = require_org_membership(&state, orgs, &org)?;

    let Some(scheduler) = state.compute_scheduler.as_ref() else {
        return Err(error_page(
            &state,
            StatusCode::NOT_FOUND,
            "Not available",
            "Compute is not enabled.",
        ));
    };

    // NOTE(review): every lookup failure — including internal scheduler
    // errors — is reported as "not found"; kept as-is to preserve behavior.
    let rollout = scheduler.get_rollout(&rollout_id).await.map_err(|_| {
        error_page(
            &state,
            StatusCode::NOT_FOUND,
            "Not found",
            "Rollout not found.",
        )
    })?;

    let resource_rows: Vec<minijinja::Value> = rollout
        .resources
        .iter()
        .map(|res| {
            context! {
                name => res.name,
                kind => res.kind.to_string(),
                status => res.status.to_string(),
                message => res.message,
            }
        })
        .collect();
    let label_rows: Vec<minijinja::Value> = rollout
        .labels
        .iter()
        .map(|(k, v)| context! { key => k, value => v })
        .collect();
    let rollout_ctx = context! {
        id => rollout.id,
        apply_id => rollout.apply_id,
        namespace => rollout.namespace,
        status => rollout.status.to_string(),
        resources => resource_rows,
        labels => label_rows,
    };

    let projects = warn_default(
        "rollout detail: list projects",
        state
            .platform_client
            .list_projects(&session.access_token, &org)
            .await,
    );

    let html = state
        .templates
        .render(
            "pages/rollout_detail.html.jinja",
            context! {
                title => format!("Rollout {} - Forage", rollout.apply_id),
                description => "Rollout details",
                user => context! { username => session.user.username },
                csrf_token => &session.csrf_token,
                orgs => orgs_context(orgs),
                current_org => &org,
                active_tab => "compute",
                projects => projects,
                rollout => rollout_ctx,
                org_name => &org,
            },
        )
        .map_err(|e| internal_error(&state, "rollout detail render", &e))?;
    Ok(Html(html).into_response())
}
/// JSON API: lists every known compute region and whether it is available.
async fn regions_api() -> impl IntoResponse {
    let regions = forage_core::compute::REGIONS
        .iter()
        .map(|region| {
            serde_json::json!({
                "id": region.id,
                "name": region.name,
                "display_name": region.display_name,
                "available": region.available,
            })
        })
        .collect::<Vec<serde_json::Value>>();
    Json(regions)
}

View File

@@ -0,0 +1,39 @@
use std::net::SocketAddr;
use std::sync::Arc;
use anyhow::Context;
use forage_core::compute::ComputeScheduler;
use forage_grpc::forage_service_server::ForageServiceServer;
use notmad::{Component, ComponentInfo, MadError};
use tokio_util::sync::CancellationToken;
use crate::compute_grpc::ForageServiceImpl;
/// notmad component that hosts the `ForageService` gRPC server.
pub struct ServeGrpc {
    // Socket address the tonic server binds to.
    pub addr: SocketAddr,
    // Scheduler handed to the `ForageServiceImpl` that backs each request.
    pub scheduler: Arc<dyn ComputeScheduler>,
}
impl Component for ServeGrpc {
    fn info(&self) -> ComponentInfo {
        "forage/grpc".into()
    }

    /// Serves the gRPC API on `self.addr` until the cancellation token fires,
    /// then shuts down gracefully.
    async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
        let service = ForageServiceImpl {
            scheduler: self.scheduler.clone(),
        };
        // Graceful shutdown: resolve once the component is cancelled.
        let shutdown = async move {
            cancellation_token.cancelled().await;
        };
        tracing::info!("gRPC server listening on {}", self.addr);
        tonic::transport::Server::builder()
            .add_service(ForageServiceServer::new(service))
            .serve_with_shutdown(self.addr, shutdown)
            .await
            .context("failed to run gRPC server")?;
        Ok(())
    }
}

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use crate::forest_client::GrpcForestClient;
use crate::templates::TemplateEngine;
use forage_core::auth::ForestAuth;
use forage_core::compute::ComputeScheduler;
use forage_core::integrations::IntegrationStore;
use forage_core::platform::ForestPlatform;
use forage_core::session::SessionStore;
@@ -24,6 +25,7 @@ pub struct AppState {
pub grpc_client: Option<Arc<GrpcForestClient>>,
pub integration_store: Option<Arc<dyn IntegrationStore>>,
pub slack_config: Option<SlackConfig>,
pub compute_scheduler: Option<Arc<dyn ComputeScheduler>>,
}
impl AppState {
@@ -41,6 +43,7 @@ impl AppState {
grpc_client: None,
integration_store: None,
slack_config: None,
compute_scheduler: None,
}
}
@@ -58,4 +61,9 @@ impl AppState {
self.slack_config = Some(config);
self
}
/// Builder-style setter: attaches a compute scheduler to the app state and
/// returns the updated state for chaining.
pub fn with_compute_scheduler(mut self, scheduler: Arc<dyn ComputeScheduler>) -> Self {
    self.compute_scheduler = Some(scheduler);
    self
}
}