Add nats
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
2024-02-13 22:13:03 +01:00
committed by GitButler
parent f9f280278f
commit fc9a0abe7e
10 changed files with 599 additions and 68 deletions

View File

@@ -20,6 +20,7 @@ aws-config = { version = "1.1.5", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.15.0", features = ["behavior-version-latest"] }
serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113"
nats = "0.24.1"
[build-dependencies]
tonic-build = "0.11.0"

View File

@@ -24,48 +24,16 @@ impl SharedApp {
pub struct App {
pub s3_client: aws_sdk_s3::Client,
pub nats: infra::nats::Nats,
}
impl App {
pub async fn new() -> anyhow::Result<Self> {
Ok(Self {
s3_client: s3_client().await?,
nats: infra::nats::Nats::new().await?,
})
}
}
mod infra {
pub mod aws_s3 {
use aws_config::{BehaviorVersion, Region};
use aws_sdk_s3::config::Credentials;
pub async fn s3_client() -> anyhow::Result<aws_sdk_s3::Client> {
let shared_config = aws_config::defaults(BehaviorVersion::latest())
.region(Region::new("eu-west-1"))
.credentials_provider(Credentials::new(
std::env::var("AWS_ACCESS_KEY_ID")?,
std::env::var("AWS_SECRET_ACCESS_KEY")?,
None,
None,
"flux_releaser",
));
let config = aws_sdk_s3::config::Builder::from(&shared_config.load().await)
.endpoint_url(std::env::var("AWS_ENDPOINT_URL")?)
.build();
let client = aws_sdk_s3::Client::from_conf(config);
let buckets = client.list_buckets().send().await?;
buckets
.buckets
.iter()
.flatten()
.flat_map(|b| &b.name)
.for_each(|n| tracing::debug!("test: found aws bucket: {}", n));
Ok(client)
}
}
}
pub mod infra;

View File

@@ -0,0 +1,2 @@
pub mod aws_s3;
pub mod nats;

View File

@@ -0,0 +1,31 @@
use aws_config::{BehaviorVersion, Region};
use aws_sdk_s3::config::Credentials;
pub async fn s3_client() -> anyhow::Result<aws_sdk_s3::Client> {
let shared_config = aws_config::defaults(BehaviorVersion::latest())
.region(Region::new("eu-west-1"))
.credentials_provider(Credentials::new(
std::env::var("AWS_ACCESS_KEY_ID")?,
std::env::var("AWS_SECRET_ACCESS_KEY")?,
None,
None,
"flux_releaser",
));
let config = aws_sdk_s3::config::Builder::from(&shared_config.load().await)
.endpoint_url(std::env::var("AWS_ENDPOINT_URL")?)
.build();
let client = aws_sdk_s3::Client::from_conf(config);
let buckets = client.list_buckets().send().await?;
buckets
.buckets
.iter()
.flatten()
.flat_map(|b| &b.name)
.for_each(|n| tracing::debug!("test: found aws bucket: {}", n));
Ok(client)
}

View File

@@ -0,0 +1,32 @@
use std::sync::Arc;
use anyhow::Context;
#[derive(Clone)]
pub struct Nats {
nats: Arc<nats::asynk::Connection>,
}
impl Nats {
pub async fn new() -> anyhow::Result<Self> {
let nats = nats::asynk::Options::with_user_pass(
&std::env::var("NATS_USERNAME").context("NATS_USERNAME was not found")?,
&std::env::var("NATS_PASSWORD").context("NATS_PASSWORD was not found")?,
)
.with_name(std::env!("CARGO_PKG_NAME"))
.connect(std::env::var("NATS_URL").context("NATS_URL was not found")?)
.await?;
Ok(Self {
nats: Arc::new(nats),
})
}
}
impl std::ops::Deref for Nats {
type Target = nats::asynk::Connection;
fn deref(&self) -> &Self::Target {
&self.nats
}
}

View File

@@ -1,24 +1,28 @@
use std::{path::PathBuf, sync::Arc};
use super::release_manager::models::ArtifactID;
pub mod extensions;
#[derive(Clone)]
pub struct DomainEvents {}
pub struct DomainEvents {
nats: Nats,
}
#[cfg(test)]
use mockall::{automock, mock, predicate::*};
use crate::app::infra::{self, nats::Nats};
#[cfg_attr(test, automock)]
impl DomainEvents {
pub fn new() -> Self {
Self {}
pub fn new(nats: Nats) -> Self {
Self { nats }
}
pub async fn publish_event(&self, event: &str) -> anyhow::Result<()> {
tracing::trace!("publish events: {}", event);
self.nats
.publish("flux_releaser.domain_events", event)
.await?;
Ok(())
}
}

View File

@@ -9,6 +9,6 @@ pub trait DomainEventsExt {
impl DomainEventsExt for SharedApp {
fn domain_events(&self) -> DomainEvents {
DomainEvents::new()
DomainEvents::new(self.nats.clone())
}
}