Files
client/crates/forage-server/src/main.rs
2026-03-15 22:38:42 +01:00

269 lines
9.0 KiB
Rust

mod auth;
mod forest_client;
mod notification_consumer;
mod notification_ingester;
mod notification_worker;
mod routes;
mod serve_http;
mod session_reaper;
mod state;
mod templates;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use forage_core::session::{FileSessionStore, SessionStore};
use forage_db::PgSessionStore;
use opentelemetry::trace::TracerProvider as _;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use axum::Router;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::{Html, IntoResponse, Response};
use minijinja::context;
use tower_http::services::ServeDir;
use tower_http::trace::TraceLayer;
use crate::forest_client::GrpcForestClient;
use crate::state::AppState;
use crate::templates::TemplateEngine;
fn init_telemetry() {
let env_filter =
EnvFilter::try_from_default_env().unwrap_or_else(|_| "info,h2=warn,tonic=info".into());
let fmt_layer = tracing_subscriber::fmt::layer();
if std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").is_ok() {
let tracer = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.build()
.expect("failed to create OTLP span exporter");
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_batch_exporter(tracer)
.with_resource(
opentelemetry_sdk::Resource::builder()
.with_service_name(
std::env::var("OTEL_SERVICE_NAME")
.unwrap_or_else(|_| "forage-server".into()),
)
.build(),
)
.build();
let otel_layer =
tracing_opentelemetry::layer().with_tracer(tracer_provider.tracer("forage-server"));
tracing_subscriber::registry()
.with(env_filter)
.with(fmt_layer)
.with(otel_layer)
.init();
tracing::info!("OpenTelemetry enabled — exporting to OTLP endpoint");
} else {
tracing_subscriber::registry()
.with(env_filter)
.with(fmt_layer)
.init();
}
}
async fn fallback_404(State(state): State<AppState>) -> Response {
let html = state.templates.render(
"pages/error.html.jinja",
context! {
title => "Not Found - Forage",
description => "The page you're looking for doesn't exist.",
status => 404u16,
heading => "Page not found",
message => "The page you're looking for doesn't exist.",
},
);
match html {
Ok(body) => (StatusCode::NOT_FOUND, Html(body)).into_response(),
Err(_) => StatusCode::NOT_FOUND.into_response(),
}
}
pub fn build_router(state: AppState) -> Router {
Router::new()
.merge(routes::router())
.nest_service("/static", ServeDir::new("static"))
.fallback(fallback_404)
.layer(TraceLayer::new_for_http())
.with_state(state)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
init_telemetry();
let forest_endpoint =
std::env::var("FOREST_SERVER_URL").unwrap_or_else(|_| "http://localhost:4040".into());
tracing::info!("connecting to forest-server at {forest_endpoint}");
let forest_client = GrpcForestClient::connect_lazy(&forest_endpoint)?;
let template_engine = TemplateEngine::new()?;
let port: u16 = std::env::var("PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(3000);
let addr = SocketAddr::from(([0, 0, 0, 0], port));
// Build components based on available configuration
let mut mad = notmad::Mad::builder();
// Session store + integration store: PostgreSQL if DATABASE_URL is set
let (sessions, integration_store): (
Arc<dyn SessionStore>,
Option<Arc<dyn forage_core::integrations::IntegrationStore>>,
);
if let Ok(database_url) = std::env::var("DATABASE_URL") {
tracing::info!("using PostgreSQL session store");
let pool = sqlx::PgPool::connect(&database_url).await?;
forage_db::migrate(&pool).await?;
let pg_store = Arc::new(PgSessionStore::new(pool.clone()));
// Integration store (uses same pool)
let encryption_key = std::env::var("INTEGRATION_ENCRYPTION_KEY").unwrap_or_else(|_| {
tracing::warn!(
"INTEGRATION_ENCRYPTION_KEY not set — using default key (not safe for production)"
);
"forage-dev-key-not-for-production!!".to_string()
});
let pg_integrations = Arc::new(forage_db::PgIntegrationStore::new(
pool,
encryption_key.into_bytes(),
));
// Session reaper component
mad.add(session_reaper::PgSessionReaper {
store: pg_store.clone(),
max_inactive_days: 30,
});
sessions = pg_store;
integration_store =
Some(pg_integrations as Arc<dyn forage_core::integrations::IntegrationStore>);
} else {
let session_dir = std::env::var("SESSION_DIR").unwrap_or_else(|_| "target/sessions".into());
tracing::info!(
"using file session store at {session_dir} (set DATABASE_URL for PostgreSQL)"
);
let file_store =
Arc::new(FileSessionStore::new(&session_dir).expect("failed to create session dir"));
// File session reaper component
mad.add(session_reaper::FileSessionReaper {
store: file_store.clone(),
});
sessions = file_store as Arc<dyn SessionStore>;
integration_store = None;
};
let forest_client = Arc::new(forest_client);
let mut state = AppState::new(
template_engine,
forest_client.clone(),
forest_client.clone(),
sessions,
)
.with_grpc_client(forest_client.clone());
// Slack OAuth config (optional, enables "Add to Slack" button)
if let (Ok(client_id), Ok(client_secret)) = (
std::env::var("SLACK_CLIENT_ID"),
std::env::var("SLACK_CLIENT_SECRET"),
) {
let redirect_host = std::env::var("SLACK_REDIRECT_HOST")
.unwrap_or_else(|_| format!("http://localhost:{port}"));
tracing::info!("Slack OAuth enabled");
state = state.with_slack_config(crate::state::SlackConfig {
client_id,
client_secret,
redirect_host,
});
}
// NATS JetStream connection (optional, enables durable notification delivery)
let nats_jetstream = if let Ok(nats_url) = std::env::var("NATS_URL") {
match async_nats::connect(&nats_url).await {
Ok(client) => {
tracing::info!("connected to NATS at {nats_url}");
Some(async_nats::jetstream::new(client))
}
Err(e) => {
tracing::error!(error = %e, "failed to connect to NATS — falling back to direct dispatch");
None
}
}
} else {
None
};
if let Some(ref store) = integration_store {
state = state.with_integration_store(store.clone());
if let Ok(service_token) = std::env::var("FORAGE_SERVICE_TOKEN") {
let forage_url = std::env::var("FORAGE_URL")
.or_else(|_| std::env::var("SLACK_REDIRECT_HOST"))
.unwrap_or_else(|_| format!("http://localhost:{port}"));
if let Some(ref js) = nats_jetstream {
// JetStream mode: ingester publishes, consumer dispatches
tracing::info!("starting notification pipeline (JetStream)");
let grpc_for_consumer = forest_client.clone();
let token_for_consumer = service_token.clone();
mad.add(notification_ingester::NotificationIngester {
grpc: forest_client,
jetstream: js.clone(),
service_token,
});
mad.add(notification_consumer::NotificationConsumer {
jetstream: js.clone(),
store: store.clone(),
forage_url,
grpc: grpc_for_consumer,
service_token: token_for_consumer,
});
} else {
// Fallback: direct dispatch (no durability)
tracing::warn!(
"NATS_URL not set — using direct notification dispatch (no durability)"
);
mad.add(notification_worker::NotificationListener {
grpc: forest_client,
store: store.clone(),
service_token,
forage_url,
});
}
} else {
tracing::warn!("FORAGE_SERVICE_TOKEN not set — notification listener disabled");
}
}
// HTTP server component
mad.add(serve_http::ServeHttp { addr, state });
mad.cancellation(Some(Duration::from_secs(10)))
.run()
.await?;
Ok(())
}
#[cfg(test)]
mod test_support;
#[cfg(test)]
mod tests;