use std::{ net::{Ipv4Addr, SocketAddr}, sync::Arc, time::Duration, }; use anyhow::Context; use flux_releaser::{ app::{LocalApp, SharedApp, SharedLocalApp}, grpc::gen::flux_releaser_client::FluxReleaserClient, services::{ archive::ArchiveFile, file_store::extensions::FileStoreExt, flux_local_cluster::extensions::FluxLocalClusterManagerExt, flux_releaser_uploader::FluxReleaserUploader, }, }; use tokio::{net::TcpListener, runtime::Runtime, sync::Mutex, time::sleep}; struct Server { endpoints: Endpoints, app: SharedApp, } #[derive(Clone, Debug)] struct Endpoints { http: SocketAddr, grpc: SocketAddr, } impl Server { pub async fn new() -> anyhow::Result { let http_socket = Self::find_free_port().await?; let grpc_socket = Self::find_free_port().await?; Ok(Self { endpoints: Endpoints { http: http_socket, grpc: grpc_socket, }, app: SharedApp::new(flux_releaser::app::App::new().await?), }) } pub async fn start(&self) -> anyhow::Result<()> { flux_releaser::cli::server::run_server(self.endpoints.http, self.endpoints.grpc).await?; Ok(()) } pub async fn find_free_port() -> anyhow::Result { let socket = SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); let listener = TcpListener::bind(socket).await?; listener.local_addr().context("failed to get local addr") } } static INIT: std::sync::Once = std::sync::Once::new(); async fn perform_task_with_backoff( mut task: F, max_retries: u32, base_delay_ms: u64, ) -> Result where F: FnMut() -> Fut, Fut: std::future::Future>, { let mut retries = 0; let mut delay = base_delay_ms; loop { match task().await { Ok(result) => return Ok(result), Err(_e) if retries < max_retries => { sleep(Duration::from_millis(delay)).await; delay *= 2; // Exponential backoff retries += 1; } Err(e) => return Err(e), } } } // Makes sure the setup is ready for execution async fn is_ready() -> anyhow::Result<()> { tokio::time::sleep(std::time::Duration::from_secs(1)).await; perform_task_with_backoff( || async { let endpoints = unsafe { if ENDPOINTS.is_none() { anyhow::bail!("endpoints not set yet"); } ENDPOINTS.clone().unwrap() }; let resp = reqwest::get(format!("http://{}/ping", endpoints.http)).await?; if !resp.status().is_success() { anyhow::bail!("failed with status: {}", resp.status()); } Ok::<(), anyhow::Error>(()) }, 5, 500, ) .await?; Ok(()) } static mut ENDPOINTS: Option = None; static mut APP: Option = None; async fn setup() -> anyhow::Result<(Endpoints, SharedApp)> { INIT.call_once(|| { std::thread::spawn(|| { let rt = Runtime::new().unwrap(); rt.block_on(async move { println!("once was created once"); let server = Server::new().await.unwrap(); unsafe { ENDPOINTS = Some(server.endpoints.clone()); APP = Some(server.app.clone()); } server.start().await.unwrap(); }) }); }); is_ready().await?; Ok(unsafe { (ENDPOINTS.clone().unwrap(), APP.clone().unwrap()) }) } async fn local_setup(endpoints: Endpoints) -> anyhow::Result { Ok(SharedLocalApp::new( LocalApp::new(format!("http://{}", endpoints.grpc)).await?, )) } #[tokio::test] async fn can_upload_artifact() -> anyhow::Result<()> { return Ok(()); std::env::set_var("RUST_LOG", "flux_releaser=trace"); let (endpoints, app) = setup().await?; let client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?; let client = FluxReleaserUploader::new(Arc::new(Mutex::new(client))); let bytes: Vec = vec![0; 10_000_000]; let upload_id = client .upload_archive(ArchiveFile { content: bytes.clone(), }) .await?; assert!(!upload_id.to_string().is_empty()); let actual_path = app.file_store().get_temp(upload_id).await?; let actual_content = tokio::fs::read(actual_path).await?; assert_eq!(&bytes, &actual_content); Ok(()) } #[tokio::test] async fn can_publish_artifact() -> anyhow::Result<()> { return Ok(()); std::env::set_var("RUST_LOG", "flux_releaser=trace"); let (endpoints, app) = setup().await?; let local_app = local_setup(endpoints.clone()).await?; let upload_id = local_app .flux_local_cluster_manager() .package_clusters("testdata/flux_local_cluster") .await?; let archive = app.file_store().get_temp(upload_id.clone()).await?; assert!(archive.exists()); let artifact_id = local_app .flux_local_cluster_manager() .commit_artifact("some-app", "some-branch", upload_id) .await?; let artifact = app.file_store().get_archive(artifact_id).await?; assert!(artifact.exists()); Ok(()) } #[tokio::test] async fn can_trigger_latest_release() -> anyhow::Result<()> { return Ok(()); let test_id = uuid::Uuid::now_v7(); std::env::set_var("RUST_LOG", "flux_releaser=trace"); let (endpoints, app) = setup().await?; let local_app = local_setup(endpoints.clone()).await?; let upload_id = local_app .flux_local_cluster_manager() .package_clusters("testdata/flux_local_cluster") .await?; let archive = app.file_store().get_temp(upload_id.clone()).await?; assert!(archive.exists()); let _ = local_app .flux_local_cluster_manager() .commit_artifact(test_id, "some-branch", upload_id) .await?; local_app .flux_local_cluster_manager() .trigger_release(test_id, "some-branch") .await?; // 1. Verify that release event has been sent // 2. Verify that we've splatted the flux cluster over the upstream registry // 3. Verify database has a release record Ok(()) } pub struct TestGreeter {}