feat: with actual archive test
CI: continuous-integration/drone/push build is failing

Signed-off-by: kjuulh <contact@kjuulh.io>
Date: 2024-02-18 13:38:16 +01:00
parent e994df19cf
commit 6cf1a23169
19 changed files with 495 additions and 230 deletions

View File

@@ -15,7 +15,6 @@ prost = "0.12.3"
tonic = "0.11.0"
uuid = { version = "1.7.0", features = ["v7", "v4"] }
async-trait = "0.1.77"
mockall_double = "0.3.1"
aws-config = { version = "1.1.5", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.15.0", features = ["behavior-version-latest"] }
serde = { version = "1.0.196", features = ["derive"] }
@@ -29,4 +28,4 @@ tonic-build = "0.11.0"
[dev-dependencies]
lazy_static = "1.4.0"
mockall = "0.12.1"
reqwest = "0.11.24"

View File

@@ -13,5 +13,5 @@ message HelloRequest {
}
message HelloReply {
string message = 1;
string artifact_id = 1;
}
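
The reply now carries the generated artifact ID instead of a static message. A minimal, hypothetical sketch of a client reading it after this change (endpoint address and field values are illustrative; the client type and request fields come from this repository):

// Hypothetical client call; mirrors what the integration test below does.
use flux_releaser::grpc::gen::{greeter_client::GreeterClient, HelloRequest};

let mut client = GreeterClient::connect("http://127.0.0.1:7900").await?;
let reply = client
    .say_hello(HelloRequest {
        app: "some-app".into(),
        branch: "some-branch".into(),
        folder: "/tmp/some-folder".into(),
    })
    .await?
    .into_inner();
println!("artifact_id: {}", reply.artifact_id);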

View File

@@ -1,11 +1,14 @@
use std::net::SocketAddr;
use axum::{routing::get, Router};
use axum::{response::IntoResponse, routing::get, Router};
use crate::app::SharedApp;
pub async fn axum_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> {
let app = Router::new().route("/", get(root)).with_state(app);
let app = Router::new()
.route("/ping", get(pong))
.route("/", get(root))
.with_state(app);
tracing::info!("listening on {}", host);
let listener = tokio::net::TcpListener::bind(host).await.unwrap();
@@ -15,6 +18,10 @@ pub async fn axum_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()>
Ok(())
}
async fn pong() -> impl IntoResponse {
"pong!"
}
async fn root() -> &'static str {
"Hello, flux-releaser!"
}
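
The new /ping route doubles as a readiness probe for the integration test further down. A rough sketch of polling it with reqwest (listed under dev-dependencies above); host and port are assumptions:

// Hypothetical readiness check against the new /ping route.
let resp = reqwest::get("http://127.0.0.1:3000/ping").await?;
assert!(resp.status().is_success());
assert_eq!(resp.text().await?, "pong!");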

View File

@@ -1,3 +1,4 @@
use anyhow::Context;
use aws_config::{BehaviorVersion, Region};
use aws_sdk_s3::config::Credentials;
@@ -5,15 +6,15 @@ pub async fn s3_client() -> anyhow::Result<aws_sdk_s3::Client> {
let shared_config = aws_config::defaults(BehaviorVersion::latest())
.region(Region::new("eu-west-1"))
.credentials_provider(Credentials::new(
std::env::var("AWS_ACCESS_KEY_ID")?,
std::env::var("AWS_SECRET_ACCESS_KEY")?,
std::env::var("AWS_ACCESS_KEY_ID").context("AWS_ACCESS_KEY_ID was not set")?,
std::env::var("AWS_SECRET_ACCESS_KEY").context("AWS_SECRET_ACCESS_KEY was not set")?,
None,
None,
"flux_releaser",
));
let config = aws_sdk_s3::config::Builder::from(&shared_config.load().await)
.endpoint_url(std::env::var("AWS_ENDPOINT_URL")?)
.endpoint_url(std::env::var("AWS_ENDPOINT_URL").context("AWS_ENDPOINT_URL was not set")?)
.build();
let client = aws_sdk_s3::Client::from_conf(config);

View File

@@ -11,7 +11,7 @@ use crate::{
use self::gen::{greeter_server, HelloReply, HelloRequest};
mod gen {
pub mod gen {
tonic::include_proto!("flux_releaser");
}
@@ -34,7 +34,8 @@ impl greeter_server::Greeter for FluxReleaserGrpc {
request: tonic::Request<HelloRequest>,
) -> std::result::Result<tonic::Response<HelloReply>, tonic::Status> {
let req = request.into_inner();
self.release_manager
let artifact = self
.release_manager
.commit_artifact(
req.try_into()
.map_err(|e: anyhow::Error| tonic::Status::invalid_argument(e.to_string()))?,
@@ -43,7 +44,7 @@ impl greeter_server::Greeter for FluxReleaserGrpc {
.unwrap();
Ok(tonic::Response::new(HelloReply {
message: "something".into(),
artifact_id: artifact.to_string(),
}))
}
}

View File

@@ -0,0 +1,15 @@
use cli::Command;
pub mod api;
pub mod app;
pub mod cli;
pub mod grpc;
pub mod services;
pub async fn run() -> anyhow::Result<()> {
dotenv::dotenv().ok();
Command::run().await?;
Ok(())
}

View File

@@ -1,19 +1,6 @@
use cli::Command;
mod cli;
mod api;
mod grpc;
mod app;
mod services;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
Command::run().await?;
flux_releaser::run().await?;
Ok(())
}

View File

@@ -1,5 +1,5 @@
mod archive;
mod domain_events;
mod file_reader;
mod file_store;
pub mod archive;
pub mod domain_events;
pub mod file_reader;
pub mod file_store;
pub mod release_manager;

View File

@@ -13,10 +13,8 @@ pub mod extensions {
use crate::{app::SharedApp, services::release_manager::models::ArtifactID};
#[mockall_double::double]
use crate::services::file_store::FileStore;
#[mockall_double::double]
use super::Archive;
use super::ArchiveFile;
@@ -92,19 +90,15 @@ pub mod extensions {
}
}
#[cfg(test)]
use mockall::{automock, mock, predicate::*};
use super::file_reader::{File, Files};
#[cfg_attr(test, automock)]
impl Archive {
pub fn new() -> Self {
Self {}
}
pub async fn create_archive(&self, files: Files) -> anyhow::Result<ArchiveFile> {
tracing::trace!("archiving files");
tracing::trace!("archiving files: {}", files.len());
let buffer = Vec::new();
let cursor = Cursor::new(buffer);
@@ -114,9 +108,14 @@ impl Archive {
for file in files {
let abs_file_path = file.path;
tracing::trace!("archiving file: {}", abs_file_path.display());
let mut fd = std::fs::File::open(&abs_file_path)?;
tar_builder.append_file(&abs_file_path, &mut fd)?;
if let Some(rel) = file.relative {
tracing::trace!("archiving rel file: {}", rel.display());
tar_builder.append_file(&rel, &mut fd)?;
} else {
tracing::trace!("archiving file: {}", abs_file_path.display());
tar_builder.append_file(&abs_file_path, &mut fd)?;
}
}
tar_builder.finish()?;

View File

@@ -5,12 +5,7 @@ pub struct DomainEvents {
nats: Nats,
}
#[cfg(test)]
use mockall::{automock, mock, predicate::*};
use crate::app::infra::{nats::Nats};
#[cfg_attr(test, automock)]
use crate::app::infra::nats::Nats;
impl DomainEvents {
pub fn new(nats: Nats) -> Self {
Self { nats }

View File

@@ -1,6 +1,5 @@
use crate::app::SharedApp;
#[mockall_double::double]
use super::DomainEvents;
pub trait DomainEventsExt {

View File

@@ -1,15 +1,15 @@
#[derive(Clone)]
pub struct FileReader {}
use std::{collections::BTreeMap, path::PathBuf};
use std::{
collections::BTreeMap,
path::{Path, PathBuf},
};
pub mod extensions;
use anyhow::anyhow;
#[cfg(test)]
use mockall::{automock, mock, predicate::*};
#[cfg_attr(test, automock)]
impl FileReader {
pub fn new() -> Self {
Self {}
@@ -49,7 +49,11 @@ impl FileReader {
cluster_name
);
files.push(file.into_path().into())
if file.path().is_absolute() {
files.push((file.path(), file.path().strip_prefix(&location)?).into())
} else {
files.push(file.into_path().into())
}
}
}
@@ -60,11 +64,32 @@ impl FileReader {
#[derive(Debug, Clone)]
pub struct File {
pub path: PathBuf,
pub relative: Option<PathBuf>,
}
impl From<PathBuf> for File {
fn from(value: PathBuf) -> Self {
Self { path: value }
Self {
path: value,
relative: None,
}
}
}
impl From<(PathBuf, PathBuf)> for File {
fn from(value: (PathBuf, PathBuf)) -> Self {
Self {
path: value.0,
relative: Some(value.1),
}
}
}
impl From<(&Path, &Path)> for File {
fn from(value: (&Path, &Path)) -> Self {
Self {
path: value.0.to_path_buf(),
relative: Some(value.1.to_path_buf()),
}
}
}
@@ -92,14 +117,8 @@ impl From<Files> for Vec<File> {
value
.iter()
.map(|(cluster_name, files)| (PathBuf::from(cluster_name), files))
.flat_map(|(cluster_name, files)| {
files
.iter()
//.map(|file_path| cluster_name.join(&file_path.path))
.map(|file_path| file_path.path.clone())
.collect::<Vec<_>>()
})
.map(|f| f.into())
.flat_map(|(_cluster_name, files)| files.to_vec())
// .map(|f| f.into())
.collect::<Vec<_>>()
}
}
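
File now records an optional path relative to the scanned folder, so archives no longer embed absolute host paths. A small, hypothetical example of the new tuple conversions (paths are illustrative):

// Hypothetical use of the From<(&Path, &Path)> impl added above.
use std::path::PathBuf;

let abs = PathBuf::from("/tmp/repo/clusters/dev/app.yaml");
let rel = abs.strip_prefix("/tmp/repo")?;
let file: File = (abs.as_path(), rel).into();
assert_eq!(file.path, abs);
assert_eq!(file.relative.as_deref(), Some(rel));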

View File

@@ -1,6 +1,5 @@
use crate::app::SharedApp;
#[mockall_double::double]
use super::FileReader;
pub trait FileReaderExt {

View File

@@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::{env::temp_dir, path::PathBuf};
use super::release_manager::models::ArtifactID;
@@ -10,11 +10,8 @@ pub struct FileStore {
}
use aws_sdk_s3::primitives::ByteStream;
#[cfg(test)]
use mockall::{automock, mock, predicate::*};
use tokio::io::AsyncReadExt;
use tokio::io::BufReader;
#[cfg_attr(test, automock)]
impl FileStore {
pub fn new(client: aws_sdk_s3::Client) -> Self {
Self { client }
@@ -33,4 +30,32 @@ impl FileStore {
Ok(())
}
pub async fn get_archive(&self, artifact_id: ArtifactID) -> anyhow::Result<PathBuf> {
tracing::trace!("getting archive: {}", artifact_id.to_string());
let archive_name = format!("archives/{}.tar", &artifact_id.to_string());
let obj = self
.client
.get_object()
.bucket("mybucket")
.key(&archive_name)
.send()
.await?;
let archive_path = temp_dir()
.join("flux_releaser")
.join("cache")
.join(&archive_name);
tokio::fs::create_dir_all(archive_path.parent().unwrap()).await?;
let mut archive_file = tokio::fs::File::create(&archive_path).await?;
let mut buf_reader = BufReader::new(obj.body.into_async_read());
tokio::io::copy(&mut buf_reader, &mut archive_file).await?;
tracing::debug!("created archive: {}", archive_path.display());
Ok(archive_path)
}
}

View File

@@ -1,6 +1,5 @@
use crate::app::SharedApp;
#[mockall_double::double]
use super::FileStore;
pub trait FileStoreExt {

View File

@@ -1,14 +1,10 @@
use serde::Serialize;
use crate::services::archive::extensions::ArchiveUploadExt;
#[mockall_double::double]
use crate::services::file_store::FileStore;
#[mockall_double::double]
use super::archive::Archive;
#[mockall_double::double]
use super::domain_events::DomainEvents;
#[mockall_double::double]
use super::file_reader::FileReader;
use self::models::{ArtifactID, CommitArtifact};
@@ -64,53 +60,3 @@ pub struct CommittedArtifactEvent {
pub mod extensions;
pub mod models;
#[cfg(test)]
mod test {
use crate::services::archive::{ArchiveFile, MockArchive};
use crate::services::domain_events::MockDomainEvents;
use crate::services::file_reader::{Files, MockFileReader};
use crate::services::file_store::MockFileStore;
use super::*;
#[tokio::test]
async fn generated_artifact_id() -> anyhow::Result<()> {
let mut file_store = MockFileStore::default();
file_store
.expect_upload_file()
.times(1)
.returning(|_, _| Ok(()));
let mut domain_events = MockDomainEvents::default();
domain_events
.expect_publish_event()
.times(1)
.returning(|_| Ok(()));
let mut file_reader = MockFileReader::default();
file_reader
.expect_read_files()
.times(1)
.returning(|_| Ok(Files::default()));
let mut archive = MockArchive::default();
archive.expect_create_archive().times(1).returning(|_| {
Ok(ArchiveFile {
content: Vec::new(),
})
});
let releaser_manager = ReleaseManager::new(file_reader, file_store, archive, domain_events);
releaser_manager
.commit_artifact(CommitArtifact {
app: "app".into(),
branch: "branch".into(),
folder: "someFolder".into(),
})
.await?;
Ok(())
}
}

View File

@@ -23,3 +23,13 @@ impl std::ops::Deref for ArtifactID {
&self.0
}
}
impl TryFrom<String> for ArtifactID {
type Error = anyhow::Error;
fn try_from(value: String) -> Result<Self, Self::Error> {
let uuid = uuid::Uuid::parse_str(&value)?;
Ok(ArtifactID(uuid))
}
}
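
TryFrom<String> is what lets the test turn the artifact_id string from the gRPC reply back into an ArtifactID. A minimal round-trip sketch (assumes Display produces the usual hyphenated UUID form via the Deref impl above):

// Hypothetical round-trip through the new TryFrom<String> impl.
let raw = uuid::Uuid::new_v4().to_string();
let id = ArtifactID::try_from(raw.clone())?;
assert_eq!(id.to_string(), raw);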

View File

@@ -1,48 +1,191 @@
struct Server {}
use std::{
env::temp_dir,
net::{Ipv4Addr, SocketAddr},
time::Duration,
};
use anyhow::Context;
use flux_releaser::{
app::SharedApp, grpc::gen::HelloRequest, services::file_store::extensions::FileStoreExt,
};
use tokio::{net::TcpListener, runtime::Runtime, time::sleep};
use tonic::transport::Channel;
use uuid::Uuid;
struct Server {
endpoints: Endpoints,
app: SharedApp,
}
#[derive(Clone, Debug)]
struct Endpoints {
http: SocketAddr,
grpc: SocketAddr,
}
impl Server {
pub async fn new() -> Self {
Self {}
pub async fn new() -> anyhow::Result<Self> {
let http_socket = Self::find_free_port().await?;
let grpc_socket = Self::find_free_port().await?;
Ok(Self {
endpoints: Endpoints {
http: http_socket,
grpc: grpc_socket,
},
app: SharedApp::new(flux_releaser::app::App::new().await?),
})
}
pub async fn start(&self) -> anyhow::Result<()> {
flux_releaser::cli::server::run_server(self.endpoints.http, self.endpoints.grpc).await?;
Ok(())
}
pub async fn find_free_port() -> anyhow::Result<SocketAddr> {
let socket = SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let listener = TcpListener::bind(socket).await?;
listener.local_addr().context("failed to get local addr")
}
}
static INIT: std::sync::Once = std::sync::Once::new();
async fn perform_task_with_backoff<F, Fut, T, E>(
mut task: F,
max_retries: u32,
base_delay_ms: u64,
) -> Result<T, E>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
{
let mut retries = 0;
let mut delay = base_delay_ms;
loop {
match task().await {
Ok(result) => return Ok(result),
Err(e) if retries < max_retries => {
sleep(Duration::from_millis(delay)).await;
delay *= 2; // Exponential backoff
retries += 1;
}
Err(e) => return Err(e),
}
}
}
// Makes sure the setup is ready for execution
async fn is_ready() -> anyhow::Result<()> {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
perform_task_with_backoff(
|| async {
let endpoints = unsafe {
if ENDPOINTS.is_none() {
anyhow::bail!("endpoints not set yet");
}
ENDPOINTS.clone().unwrap()
};
let resp = reqwest::get(format!("http://{}/ping", endpoints.http)).await?;
if !resp.status().is_success() {
anyhow::bail!("failed with status: {}", resp.status());
}
Ok::<(), anyhow::Error>(())
},
5,
500,
)
.await?;
Ok(())
}
async fn setup() -> anyhow::Result<()> {
static mut ENDPOINTS: Option<Endpoints> = None;
static mut APP: Option<SharedApp> = None;
async fn setup() -> anyhow::Result<(Endpoints, SharedApp)> {
INIT.call_once(|| {
tokio::spawn(async move {
println!("once was created once");
Server::new().await.start().await.unwrap();
std::thread::spawn(|| {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
println!("once was created once");
let server = Server::new().await.unwrap();
unsafe {
ENDPOINTS = Some(server.endpoints.clone());
APP = Some(server.app.clone());
}
server.start().await.unwrap();
})
});
});
is_ready().await?;
Ok(())
Ok(unsafe { (ENDPOINTS.clone().unwrap(), APP.clone().unwrap()) })
}
#[tokio::test]
async fn can_create_artifact() -> anyhow::Result<()> {
setup().await?;
anyhow::bail!("failed one");
Ok(())
}
#[tokio::test]
async fn can_more_create_artifact() -> anyhow::Result<()> {
setup().await?;
std::env::set_var("RUST_LOG", "flux_releaser=trace");
let (endpoints, app) = setup().await?;
let mut client = flux_releaser::grpc::gen::greeter_client::GreeterClient::connect(format!(
"http://{}",
endpoints.grpc
))
.await?;
let test_id = Uuid::new_v4();
let temp = temp_dir()
.join("flux_releaser")
.join("tests")
.join(test_id.to_string());
let file_path = temp
.join("clusters")
.join(Uuid::new_v4().to_string())
.join("some-file.yaml");
tokio::fs::create_dir_all(file_path.parent().unwrap()).await?;
let _ = tokio::fs::File::create(file_path).await?;
let resp = client
.say_hello(HelloRequest {
app: "some-app".into(),
branch: "some-branch".into(),
folder: temp.to_string_lossy().to_string(),
})
.await?;
let artifact = resp.into_inner();
let archive = app
.file_store()
.get_archive(artifact.artifact_id.try_into()?)
.await?;
assert!(archive.exists());
anyhow::bail!("failed two");
Ok(())
}
pub struct TestGreeter {}