Some checks reported errors
continuous-integration/drone/push Build encountered an error
Signed-off-by: kjuulh <contact@kjuulh.io>
189 lines
5.5 KiB
Rust
189 lines
5.5 KiB
Rust
use std::{env::temp_dir, fmt::Display, net::SocketAddr};
|
|
|
|
use tokio::io::AsyncWriteExt;
|
|
use tokio_stream::StreamExt;
|
|
use tonic::{service::interceptor, transport::Server};
|
|
use uuid::Uuid;
|
|
|
|
use crate::{
|
|
app::SharedApp,
|
|
services::release_manager::{
|
|
extensions::ReleaseManagerExt,
|
|
models::{CommitArtifact, Release},
|
|
ReleaseManager,
|
|
},
|
|
};
|
|
|
|
use self::gen::{
|
|
flux_releaser_server, CommitArtifactRequest, CommitArtifactResponse, TriggerReleaseRequest,
|
|
TriggerReleaseResponse, UploadArtifactRequest, UploadArtifactResponse,
|
|
};
|
|
|
|
pub mod gen {
|
|
tonic::include_proto!("flux_releaser");
|
|
}
|
|
|
|
pub struct FluxReleaserGrpc {
|
|
release_manager: ReleaseManager,
|
|
}
|
|
|
|
impl FluxReleaserGrpc {
|
|
pub fn new(app: SharedApp) -> Self {
|
|
Self {
|
|
release_manager: app.release_manager(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl std::fmt::Debug for FluxReleaserGrpc {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[tonic::async_trait]
|
|
impl flux_releaser_server::FluxReleaser for FluxReleaserGrpc {
|
|
#[tracing::instrument]
|
|
async fn upload_artifact(
|
|
&self,
|
|
request: tonic::Request<tonic::Streaming<UploadArtifactRequest>>,
|
|
) -> std::result::Result<tonic::Response<UploadArtifactResponse>, tonic::Status> {
|
|
let mut stream = request.into_inner();
|
|
|
|
let file_path = temp_dir()
|
|
.join("flux_releaser")
|
|
.join("tmp")
|
|
.join("upload_artifact")
|
|
.join(Uuid::new_v4().to_string());
|
|
tokio::fs::create_dir_all(file_path.parent().unwrap()).await?;
|
|
let mut file = tokio::fs::File::create(&file_path).await?;
|
|
|
|
while let Some(item) = stream.next().await {
|
|
tracing::trace!("received chunk");
|
|
let item = item?;
|
|
|
|
let _ = file.write(&item.content).await?;
|
|
}
|
|
tracing::info!("got this far 1a");
|
|
|
|
file.flush().await?;
|
|
tracing::info!("got this far 1");
|
|
|
|
let upload_id = match self.release_manager.upload_artifact(file_path.into()).await {
|
|
Ok(res) => res,
|
|
Err(e) => {
|
|
tracing::warn!("failed to upload artifact: {}", e);
|
|
return Err(tonic::Status::unknown(e.to_string()));
|
|
}
|
|
};
|
|
|
|
tracing::info!("got this far 2");
|
|
|
|
Ok(tonic::Response::new(UploadArtifactResponse {
|
|
upload_id: upload_id.to_string(),
|
|
}))
|
|
}
|
|
|
|
#[tracing::instrument]
|
|
async fn commit_artifact(
|
|
&self,
|
|
request: tonic::Request<CommitArtifactRequest>,
|
|
) -> std::result::Result<tonic::Response<CommitArtifactResponse>, tonic::Status> {
|
|
let req = request.into_inner();
|
|
let artifact = self
|
|
.release_manager
|
|
.commit_artifact(req.try_into().map_err(|e: anyhow::Error| {
|
|
tracing::warn!("failed to parse input body: {}", e);
|
|
tonic::Status::invalid_argument(e.to_string())
|
|
})?)
|
|
.await
|
|
.map_err(|e: anyhow::Error| {
|
|
tracing::warn!("failed to commit artifact: {}", e);
|
|
tonic::Status::internal(e.to_string())
|
|
})?;
|
|
|
|
Ok(tonic::Response::new(CommitArtifactResponse {
|
|
artifact_id: artifact.to_string(),
|
|
}))
|
|
}
|
|
|
|
#[tracing::instrument]
|
|
async fn trigger_release(
|
|
&self,
|
|
request: tonic::Request<TriggerReleaseRequest>,
|
|
) -> std::result::Result<tonic::Response<TriggerReleaseResponse>, tonic::Status> {
|
|
let req = request.into_inner();
|
|
|
|
tracing::info!("some trigger release");
|
|
|
|
self.release_manager
|
|
.release(req.try_into().map_err(|e: anyhow::Error| {
|
|
tracing::warn!("failed to parse input body: {}", e);
|
|
tonic::Status::invalid_argument(e.to_string())
|
|
})?)
|
|
.await
|
|
.map_err(|e| {
|
|
tracing::warn!("failed to release: {}", e);
|
|
tonic::Status::internal(e.to_string())
|
|
})?;
|
|
|
|
Ok(tonic::Response::new(TriggerReleaseResponse {}))
|
|
}
|
|
}
|
|
|
|
impl TryFrom<CommitArtifactRequest> for CommitArtifact {
|
|
type Error = anyhow::Error;
|
|
|
|
fn try_from(value: CommitArtifactRequest) -> Result<Self, Self::Error> {
|
|
if value.app.is_empty() {
|
|
anyhow::bail!("app cannot be empty")
|
|
}
|
|
if value.branch.is_empty() {
|
|
anyhow::bail!("branch cannot be empty")
|
|
}
|
|
if value.upload_id.is_empty() {
|
|
anyhow::bail!("folder cannot be empty")
|
|
}
|
|
|
|
Ok(Self {
|
|
app: value.app,
|
|
branch: value.branch,
|
|
upload_id: value.upload_id.try_into()?,
|
|
})
|
|
}
|
|
}
|
|
|
|
impl TryFrom<TriggerReleaseRequest> for Release {
|
|
type Error = anyhow::Error;
|
|
|
|
fn try_from(value: TriggerReleaseRequest) -> Result<Self, Self::Error> {
|
|
if value.app.is_empty() {
|
|
anyhow::bail!("app cannot be empty");
|
|
}
|
|
|
|
if value.branch.is_empty() {
|
|
anyhow::bail!("branch canot be empty");
|
|
}
|
|
|
|
Ok(Self {
|
|
app: value.app,
|
|
branch: value.branch,
|
|
})
|
|
}
|
|
}
|
|
|
|
pub async fn tonic_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> {
|
|
tracing::info!("grpc listening on: {}", host);
|
|
Server::builder()
|
|
.trace_fn(|_| tracing::info_span!("flux_releaser"))
|
|
.add_service(flux_releaser_server::FluxReleaserServer::new(
|
|
FluxReleaserGrpc::new(app),
|
|
))
|
|
.serve(host)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Field-less public type.
// NOTE(review): the name suggests a logging middleware layer, but no impls for
// this type are visible in this part of the file — confirm its intended use.
pub struct LogLayer {}
|