feat: add cockroach db
Some checks failed
continuous-integration/drone/push Build is failing

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2024-02-22 18:26:53 +01:00
parent b4af46aa23
commit ff6cc941d8
13 changed files with 515 additions and 106 deletions

View File

@@ -24,7 +24,8 @@ walkdir = "2.4.0"
tar = "0.4.40"
tokio-stream = { version = "0.1.14", features = ["full"] }
rand = "0.8.5"
sqlx = { version = "0.7.3", features = ["postgres", "runtime-tokio"] }
sqlx = { version = "0.7.3", features = ["postgres", "runtime-tokio", "uuid", "chrono"] }
chrono = "0.4.34"
[build-dependencies]
tonic-build = "0.11.0"

View File

@@ -1 +1,8 @@
-- Add migration script here
CREATE TABLE IF NOT EXISTS artifacts (
app VARCHAR NOT NULL,
branch VARCHAR NOT NULL,
artifact_id UUID NOT NULL PRIMARY KEY,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE (app, artifact_id)
);

View File

@@ -5,6 +5,7 @@ package flux_releaser;
service FluxReleaser {
rpc UploadArtifact (stream UploadArtifactRequest) returns (UploadArtifactResponse) {}
rpc CommitArtifact (CommitArtifactRequest) returns (CommitArtifactResponse) {}
rpc TriggerRelease (TriggerReleaseRequest) returns (TriggerReleaseResponse) {}
}
message UploadArtifactRequest {
@@ -24,3 +25,11 @@ message CommitArtifactRequest {
message CommitArtifactResponse {
string artifact_id = 1;
}
message TriggerReleaseRequest {
string app = 1;
string branch = 2;
string cluster = 3;
}
message TriggerReleaseResponse {}

View File

@@ -32,6 +32,20 @@ pub enum Commands {
#[arg(env = "FLUX_RELEASER_REGISTRY", long)]
registry: String,
},
Release {
#[arg(long)]
app: String,
#[arg(long)]
branch: String,
#[arg(long)]
cluster: String,
#[arg(env = "FLUX_RELEASER_REGISTRY", long)]
registry: String,
},
}
impl Command {
@@ -55,7 +69,19 @@ impl Command {
.package_clusters(include)
.await?;
}
_ => (),
Some(Commands::Release {
app: service_app,
branch,
cluster,
registry,
}) => {
let app = client::get_local_app(registry).await?;
app.flux_local_cluster_manager()
.trigger_release(service_app, branch, cluster)
.await?;
}
None => (),
}
Ok(())

View File

@@ -8,13 +8,15 @@ use uuid::Uuid;
use crate::{
app::SharedApp,
services::release_manager::{
extensions::ReleaseManagerExt, models::CommitArtifact, ReleaseManager,
extensions::ReleaseManagerExt,
models::{CommitArtifact, Release},
ReleaseManager,
},
};
use self::gen::{
flux_releaser_server, CommitArtifactRequest, CommitArtifactResponse, UploadArtifactRequest,
UploadArtifactResponse,
flux_releaser_server, CommitArtifactRequest, CommitArtifactResponse, TriggerReleaseRequest,
TriggerReleaseResponse, UploadArtifactRequest, UploadArtifactResponse,
};
pub mod gen {
@@ -80,12 +82,29 @@ impl flux_releaser_server::FluxReleaser for FluxReleaserGrpc {
.map_err(|e: anyhow::Error| tonic::Status::invalid_argument(e.to_string()))?,
)
.await
.unwrap();
.map_err(|e: anyhow::Error| tonic::Status::internal(e.to_string()))?;
Ok(tonic::Response::new(CommitArtifactResponse {
artifact_id: artifact.to_string(),
}))
}
async fn trigger_release(
&self,
request: tonic::Request<TriggerReleaseRequest>,
) -> std::result::Result<tonic::Response<TriggerReleaseResponse>, tonic::Status> {
let req = request.into_inner();
self.release_manager
.release(
req.try_into()
.map_err(|e: anyhow::Error| tonic::Status::invalid_argument(e.to_string()))?,
)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;
Ok(tonic::Response::new(TriggerReleaseResponse {}))
}
}
impl TryFrom<CommitArtifactRequest> for CommitArtifact {
@@ -110,6 +129,30 @@ impl TryFrom<CommitArtifactRequest> for CommitArtifact {
}
}
impl TryFrom<TriggerReleaseRequest> for Release {
type Error = anyhow::Error;
fn try_from(value: TriggerReleaseRequest) -> Result<Self, Self::Error> {
if value.app.is_empty() {
anyhow::bail!("app cannot be empty");
}
if value.branch.is_empty() {
anyhow::bail!("branch canot be empty");
}
if value.cluster.is_empty() {
anyhow::bail!("cluster canot be empty");
}
Ok(Self {
app: value.app,
branch: value.branch,
cluster: value.cluster,
})
}
}
pub async fn tonic_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> {
tracing::info!("grpc listening on: {}", host);
Server::builder()

View File

@@ -5,3 +5,5 @@ pub mod file_store;
pub mod flux_local_cluster;
pub mod flux_releaser_uploader;
pub mod release_manager;
pub mod artifacts_db;

View File

@@ -0,0 +1,132 @@
use std::sync::Arc;
use sqlx::{prelude::FromRow, PgPool};
use self::defaults::DefaultArtifactsDB;
#[derive(Clone, Debug)]
pub struct AddCommitArtifact {
pub app: String,
pub branch: String,
pub artifact_id: uuid::Uuid,
}
#[derive(Clone, Debug, FromRow)]
pub struct Artifact {
pub app: String,
pub branch: String,
pub artifact_id: uuid::Uuid,
pub created_at: chrono::NaiveDateTime,
}
#[derive(Clone, Debug)]
pub struct GetLatestArtifact {
pub app: String,
pub branch: String,
}
pub mod traits {
use axum::async_trait;
use super::{AddCommitArtifact, Artifact, GetLatestArtifact};
#[async_trait]
pub trait ArtifactsDB {
async fn commit_artifact(&self, commit_artifact: AddCommitArtifact) -> anyhow::Result<()>;
async fn get_latest_artifact(
&self,
get_latest_artifact: GetLatestArtifact,
) -> anyhow::Result<Artifact>;
}
}
pub mod defaults {
use axum::async_trait;
use sqlx::PgPool;
use super::{traits, AddCommitArtifact, Artifact, GetLatestArtifact};
pub struct DefaultArtifactsDB {
db: PgPool,
}
impl DefaultArtifactsDB {
pub fn new(db: PgPool) -> Self {
Self { db }
}
}
#[async_trait]
impl traits::ArtifactsDB for DefaultArtifactsDB {
async fn commit_artifact(&self, commit_artifact: AddCommitArtifact) -> anyhow::Result<()> {
sqlx::query("INSERT INTO artifacts (app, branch, artifact_id) VALUES ($1, $2, $3)")
.bind(commit_artifact.app)
.bind(commit_artifact.branch)
.bind(commit_artifact.artifact_id)
.execute(&self.db)
.await?;
Ok(())
}
async fn get_latest_artifact(
&self,
get_latest_artifact: GetLatestArtifact,
) -> anyhow::Result<Artifact> {
let artifact = sqlx::query_as::<_, Artifact>(
"SELECT
*
FROM
artifacts
WHERE
app = $1 AND
branch = $2
ORDER BY created_at DESC
LIMIT 1",
)
.bind(get_latest_artifact.app)
.bind(get_latest_artifact.branch)
.fetch_one(&self.db)
.await?;
Ok(artifact)
}
}
}
#[derive(Clone)]
pub struct ArtifactsDB {
inner: Arc<dyn traits::ArtifactsDB + Send + Sync>,
}
impl ArtifactsDB {
pub fn new(db: PgPool) -> Self {
Self {
inner: Arc::new(DefaultArtifactsDB::new(db)),
}
}
}
impl std::ops::Deref for ArtifactsDB {
type Target = Arc<dyn traits::ArtifactsDB + Send + Sync>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
pub mod extensions {
use crate::app::App;
use super::ArtifactsDB;
pub trait ArtifactsDBExt {
fn artifacts_db(&self) -> ArtifactsDB;
}
impl ArtifactsDBExt for App {
fn artifacts_db(&self) -> ArtifactsDB {
ArtifactsDB::new(self.database.clone())
}
}
}

View File

@@ -1,6 +1,11 @@
use std::path::PathBuf;
use crate::{app::infra::grpc::FluxReleaserGrpcClient, grpc::gen::CommitArtifactRequest};
use anyhow::Context;
use crate::{
app::infra::grpc::FluxReleaserGrpcClient,
grpc::gen::{CommitArtifactRequest, TriggerReleaseRequest},
};
use super::{
archive::Archive,
@@ -61,6 +66,36 @@ impl FluxLocalClusterManager {
artifact_id.into_inner().artifact_id.try_into()
}
pub async fn trigger_release(
&self,
app: impl Into<String>,
branch: impl Into<String>,
cluster: impl Into<String>,
) -> anyhow::Result<()> {
self.flux_releaser_client
.lock()
.await
.trigger_release(tonic::Request::new(TriggerReleaseRequest {
app: app.into(),
branch: branch.into(),
cluster: cluster.into(),
}))
.await
.context("failed to trigger release")?;
// Send release proto to upstream
// 1. find app by app + branch
// 2. Unpack latest artifact by app + branch
// 3. Unpack by cluster
// 4. Upload
Ok(())
}
}
pub mod extensions {

View File

@@ -1,19 +1,21 @@
use serde::Serialize;
use crate::services::archive::extensions::ArchiveUploadExt;
use crate::services::artifacts_db::{AddCommitArtifact, GetLatestArtifact};
use crate::services::file_store::FileStore;
use super::archive::Archive;
use super::artifacts_db::ArtifactsDB;
use super::domain_events::DomainEvents;
use super::file_reader::FileReader;
use self::models::{ArtifactID, CommitArtifact, UploadArtifact, UploadArtifactID};
use self::models::*;
pub struct ReleaseManager {
archive: Archive,
file_reader: FileReader,
file_store: FileStore,
domain_events: DomainEvents,
artifacts_db: ArtifactsDB,
}
impl ReleaseManager {
@@ -22,12 +24,14 @@ impl ReleaseManager {
file_store: FileStore,
archive: Archive,
domain_events: DomainEvents,
artifacts_db: ArtifactsDB,
) -> Self {
Self {
archive,
file_reader,
file_store,
domain_events,
artifacts_db,
}
}
@@ -60,8 +64,37 @@ impl ReleaseManager {
})?)
.await?;
self.artifacts_db
.commit_artifact(AddCommitArtifact {
app: request.app,
branch: request.branch,
artifact_id: artifact_id.clone().into(),
})
.await?;
Ok(artifact_id)
}
pub async fn release(&self, release_req: Release) -> anyhow::Result<()> {
tracing::debug!(
app = release_req.app,
branch = release_req.branch,
cluster = release_req.cluster,
"releasing latest commit"
);
let artifact = self
.artifacts_db
.get_latest_artifact(GetLatestArtifact {
app: release_req.app,
branch: release_req.branch,
})
.await?;
tracing::trace!("found latest artifact: {:?}", artifact);
Ok(())
}
}
#[derive(Serialize)]

View File

@@ -1,12 +1,15 @@
use crate::{
app::SharedApp,
services::{
archive::extensions::ArchiveExt, domain_events::extensions::DomainEventsExt,
file_reader::extensions::FileReaderExt, file_store::extensions::FileStoreExt,
archive::extensions::ArchiveExt,
artifacts_db::{extensions::ArtifactsDBExt, AddCommitArtifact},
domain_events::extensions::DomainEventsExt,
file_reader::extensions::FileReaderExt,
file_store::extensions::FileStoreExt,
},
};
use super::ReleaseManager;
use super::{CommitArtifact, ReleaseManager};
pub trait ReleaseManagerExt {
fn release_manager(&self) -> ReleaseManager;
@@ -19,6 +22,7 @@ impl ReleaseManagerExt for SharedApp {
self.file_store(),
self.archive(),
self.domain_events(),
self.artifacts_db(),
)
}
}

View File

@@ -7,6 +7,13 @@ pub struct CommitArtifact {
pub upload_id: UploadArtifactID,
}
#[derive(Clone, Debug)]
pub struct Release {
pub app: String,
pub branch: String,
pub cluster: String,
}
#[derive(Debug, Clone)]
pub struct ArtifactID(uuid::Uuid);
@@ -52,6 +59,12 @@ impl From<uuid::Uuid> for UploadArtifactID {
}
}
impl From<ArtifactID> for uuid::Uuid {
fn from(value: ArtifactID) -> Self {
value.0
}
}
impl TryFrom<String> for UploadArtifactID {
type Error = anyhow::Error;

View File

@@ -15,7 +15,6 @@ use flux_releaser::{
},
};
use tokio::{net::TcpListener, runtime::Runtime, sync::Mutex, time::sleep};
use uuid::Uuid;
struct Server {
endpoints: Endpoints,
@@ -200,4 +199,41 @@ async fn can_publish_artifact() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test]
async fn can_trigger_latest_release() -> anyhow::Result<()> {
let test_id = uuid::Uuid::now_v7();
std::env::set_var("RUST_LOG", "flux_releaser=trace");
let (endpoints, app) = setup().await?;
let local_app = local_setup(endpoints.clone()).await?;
let upload_id = local_app
.flux_local_cluster_manager()
.package_clusters("testdata/flux_local_cluster")
.await?;
let archive = app.file_store().get_temp(upload_id.clone()).await?;
assert!(archive.exists());
let _ = local_app
.flux_local_cluster_manager()
.commit_artifact(test_id, "some-branch", upload_id)
.await?;
local_app
.flux_local_cluster_manager()
.trigger_release(test_id, "some-branch", "flux_local_cluster")
.await?;
// 1. Verify that release event has been sent
// 2. Verify that we've splatted the flux cluster over the upstream registry
// 3. Verify database has a release record
todo!();
Ok(())
}
pub struct TestGreeter {}