20 Commits

67d57f2b54  chore(release): 0.1.0 (cuddle-please; 2025-03-06 02:04:13 +00:00; CI: push passing, pr failing)
489b30e028  fix(deps): update aws-sdk-rust monorepo (2025-03-06 02:02:25 +00:00; CI: push passing, pr failing)
430240d751  fix(deps): update rust crate aws-sdk-s3 to v1.76.0 (2025-02-15 01:49:37 +00:00; CI: push passing, pr failing)
707d1e2263  chore(deps): update tokio-prost monorepo to v0.13.5 (2025-02-13 01:55:42 +00:00; CI: push passing, pr failing)
d0bd8b938f  fix(deps): update aws-sdk-rust monorepo (2025-02-07 01:54:57 +00:00; CI: push passing, pr failing)
6f3a5129bd  fix(deps): update aws-sdk-rust monorepo (2025-01-30 02:06:16 +00:00; CI: push passing, pr failing)
68c9820aae  fix(deps): update rust crate aws-sdk-s3 to v1.70.0 (2025-01-22 01:54:31 +00:00; CI: push passing, pr failing)
e44cb6dd23  fix(deps): update aws-sdk-rust monorepo (2025-01-16 01:55:25 +00:00; CI: push passing, pr failing)
6aa881048c  fix(deps): update aws-sdk-rust monorepo (2024-12-29 01:50:13 +00:00; CI: push passing, pr failing)
5356303e7b  fix(deps): update rust crate serde to v1.0.216 (2024-12-11 05:44:02 +00:00; CI: push passing, pr failing)
1bd6fc06c5  chore(deps): update tokio-prost monorepo to v0.13.4 (2024-12-07 01:51:42 +00:00; CI: push passing, pr failing)
f342e3c0df  fix(deps): update rust crate aws-sdk-s3 to v1.65.0 (2024-12-05 01:52:45 +00:00; CI: push passing, pr failing)
25182f14c1  chore(deps): update rust crate tracing-subscriber to v0.3.19 (2024-11-30 06:30:57 +00:00; CI: push passing, pr failing)
9a72f2f6c2  chore(deps): update rust crate tracing to v0.1.41 (2024-11-28 01:56:50 +00:00; CI: push passing, pr failing)
2352c00736  fix(deps): update rust crate aws-sdk-s3 to v1.63.0 (2024-11-26 02:28:02 +00:00; CI: push running, pr failing)
089c1d502f  feat: with updated notmad (Signed-off-by: kjuulh <contact@kjuulh.io>; 2024-11-19 17:30:18 +01:00; CI: push passing)
8b24dc23e0  feat: added please and release (Signed-off-by: kjuulh <contact@kjuulh.io>; 2024-11-17 20:57:48 +01:00; CI: push passing)
7510c9a333  feat: change order of grpc host (Signed-off-by: kjuulh <contact@kjuulh.io>; 2024-11-17 16:26:21 +01:00; CI: push passing)
2bcc74ed8c  feat: add replicas 1 (Signed-off-by: kjuulh <contact@kjuulh.io>; 2024-11-17 16:07:59 +01:00; CI: push passing)
c8f4bae1f2  feat: add s3 and deployment (Signed-off-by: kjuulh <contact@kjuulh.io>; 2024-11-17 16:07:58 +01:00)
17 changed files with 1243 additions and 765 deletions

.env (10 lines changed)

@@ -1 +1,9 @@
DATABASE_URL="postgres://root@localhost:26257/defaultdb?sslmode=disable"
DATABASE_URL="postgres://root@localhost:26257/defaultdb?sslmode=disable"
#STORAGE_BACKEND=local
#LOCAL_STORAGE_LOCATION=/tmp/nodata/local
STORAGE_BACKEND=s3
AWS_ACCESS_KEY_ID=OgAfuzefQRBHq4up2eYr
AWS_SECRET_ACCESS_KEY=nW85rHFOlZeMg7v6kkCikpYbyE3Pw28RS2O5FNZu
AWS_ENDPOINT_URL="https://api.minio.i.kjuulh.io"
AWS_BUCKET="nodata-dev"

CHANGELOG.md (new file, 67 lines)

@@ -0,0 +1,67 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [0.1.0] - 2025-03-06

### Added
- with updated notmad
- added please and release
- change order of grpc host
- add replicas 1
- add s3 and deployment
- add prometheus and protobuf messages
- increase size of index
- add indexes
- set notification to 4mb
- add storage backend
- remove more partitions
- cleanup
- remove keys and ids
- working dagger engine
- should listen on public endpoint
- with sdk
- with dagger engine
- do cleanup
- add broker setup
  this is mainly to decouple the actual sending of events from the ingest.
  we now ingest the data and update consumer groups with the new offset. consumer groups then continuously send out data from the updated offset in the background. they tick every second between checks, but if a run takes longer than a second, the next run simply continues from where the previous one left off
- update
- add basic streaming data
- add ingest layer
- extract handler
- add offset to consumers
- add complexity
- added v1 files
- add operations endpoint to get topics
- add staging
- add data ingest
- add docs
- add base

### Fixed
- *(deps)* update aws-sdk-rust monorepo
- *(deps)* update rust crate aws-sdk-s3 to v1.76.0
- *(deps)* update aws-sdk-rust monorepo
- *(deps)* update aws-sdk-rust monorepo
- *(deps)* update rust crate aws-sdk-s3 to v1.70.0
- *(deps)* update aws-sdk-rust monorepo
- *(deps)* update aws-sdk-rust monorepo
- *(deps)* update rust crate serde to v1.0.216
- *(deps)* update rust crate aws-sdk-s3 to v1.65.0
- *(deps)* update rust crate aws-sdk-s3 to v1.63.0
- *(deps)* update rust crate serde to v1.0.215
- *(deps)* update all dependencies
- offset be inclusive end

### Other
- *(deps)* update tokio-prost monorepo to v0.13.5
- *(deps)* update tokio-prost monorepo to v0.13.4
- *(deps)* update rust crate tracing-subscriber to v0.3.19
- *(deps)* update rust crate tracing to v0.1.41
- into files
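
The "add broker setup" entry above describes a tick-and-resume loop: consumer groups are polled every second, and a run that overshoots its tick is not repeated, because the next tick picks up from the stored offset. A minimal sketch of that pattern in plain tokio; ConsumerGroup, send_from, and fetch_head_offset are illustrative names, not nodata's actual API:

use std::time::Duration;

// Illustrative stand-in for a consumer group's delivery state.
struct ConsumerGroup {
    last_offset: usize,
}

impl ConsumerGroup {
    // Deliver everything between the last delivered offset and `target`,
    // then remember how far we got. A run that takes longer than one tick
    // is harmless: the next tick resumes from the updated `last_offset`.
    async fn send_from(&mut self, target: usize) {
        for offset in self.last_offset..target {
            // ... deliver the message stored at `offset` ...
            let _ = offset;
        }
        self.last_offset = target;
    }
}

// Placeholder for "newest ingested offset for this topic".
async fn fetch_head_offset() -> usize {
    10
}

#[tokio::main]
async fn main() {
    let mut group = ConsumerGroup { last_offset: 0 };
    let mut tick = tokio::time::interval(Duration::from_secs(1));
    loop {
        tick.tick().await;
        let head = fetch_head_offset().await;
        group.send_from(head).await;
    }
}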

Cargo.lock (generated, 1391 lines changed; diff too large, not shown)

Cargo.toml (workspace)

@@ -2,8 +2,11 @@
 members = ["crates/*"]
 resolver = "2"

+[workspace.package]
+version = "0.1.0"
+
 [workspace.dependencies]
-nodata-storage = { path = "crates/nodata-storage" }
+nodata-storage = { path = "crates/nodata-storage", version = "0.1.0" }
 anyhow = { version = "1" }
 tokio = { version = "1", features = ["full"] }
@@ -12,9 +15,9 @@ tracing-subscriber = { version = "0.3.18" }
 clap = { version = "4", features = ["derive", "env"] }
 dotenv = { version = "0.15" }
 axum = { version = "0.7" }
-drift = { git = "https://github.com/kjuulh/drift", branch = "main" }
+nodrift = { version = "0.2.0" }
 async-trait = "0.1"
-tonic = "0.12.1"
+tonic = { version = "0.12.3", features = ["tls", "tls-roots"] }
 bytes = "1.7.1"
 prost = "0.13.1"
 prost-types = "0.13.1"

crates/nodata-storage/Cargo.toml

@@ -1,6 +1,8 @@
 [package]
 name = "nodata-storage"
-version = "0.1.0"
+version.workspace = true
+description = "nodata storage is the backend that serves the nodata message broker, it allows storing data in many different types of backends"
+license = "MIT"
 edition = "2021"

 [dependencies]
@@ -11,6 +13,18 @@ tracing.workspace = true
 prost.workspace = true
 prost-types.workspace = true
 bytes.workspace = true
 async-trait.workspace = true
 hex = "0.4.3"
 sha2 = "0.10.8"
+aws-config = { version = "1.5.10", features = [
+    "behavior-version-latest",
+], optional = true }
+aws-sdk-s3 = { version = "1.61.0", features = [
+    "behavior-version-latest",
+], optional = true }
+
+[features]
+default = ["s3"]
+s3 = ["dep:aws-config", "dep:aws-sdk-s3"]

crates/nodata-storage/src/backend.rs

@@ -1,84 +1,18 @@
-use std::{
-    env::temp_dir,
-    path::{Path, PathBuf},
-    time::{SystemTime, UNIX_EPOCH},
-};
+use std::time::SystemTime;

-use anyhow::Context;
-use tokio::io::AsyncWriteExt;
+use async_trait::async_trait;

-pub struct StorageBackend {
-    location: PathBuf,
-}
+pub mod local;
+#[cfg(feature = "s3")]
+pub mod s3;

-impl StorageBackend {
-    pub fn new(location: &Path) -> Self {
-        Self {
-            location: location.into(),
-        }
-    }
-
-    pub fn temp() -> Self {
-        Self::new(&temp_dir().join("nodata"))
-    }
-
-    pub async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
-        let segment_key = uuid::Uuid::now_v7();
-        let segment_path = PathBuf::from("logs")
-            .join(topic)
-            .join(segment_key.to_string());
-        tracing::trace!("writing segment file: {}", segment_path.display());
-        let file_location = self.location.join(&segment_path);
-        if let Some(parent) = file_location.parent() {
-            tokio::fs::create_dir_all(parent)
-                .await
-                .context("failed to create storage backend dir")?;
-        }
-
-        let mut segment_file = tokio::fs::File::create(&file_location).await?;
-        segment_file.write_all(buffer).await?;
-        segment_file.flush().await?;
-
-        Ok(segment_key.to_string())
-    }
-
-    pub async fn append_index(
+#[async_trait]
+pub trait StorageBackend {
+    async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String>;
+    async fn append_index(
         &self,
         topic: &str,
         segment_file: &str,
         time: SystemTime,
-    ) -> anyhow::Result<()> {
-        let index_path = PathBuf::from("indexes").join(topic);
-        tracing::trace!("writing index file: {}", index_path.display());
-        let file_location = self.location.join(&index_path);
-        if let Some(parent) = file_location.parent() {
-            tokio::fs::create_dir_all(parent)
-                .await
-                .context("failed to create storage backend dir, index")?;
-        }
-        if !file_location.exists() {
-            tokio::fs::File::create(&file_location).await?;
-        }
-
-        let mut index_file = tokio::fs::File::options()
-            .append(true)
-            .open(&file_location)
-            .await?;
-        index_file
-            .write_all(
-                format!(
-                    "{},{}\n",
-                    time.duration_since(UNIX_EPOCH)
-                        .expect("to be able to get time")
-                        .as_secs(),
-                    segment_file
-                )
-                .as_bytes(),
-            )
-            .await?;
-        index_file.flush().await?;
-
-        Ok(())
-    }
+    ) -> anyhow::Result<()>;
 }
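
Any backend now only has to supply these two async methods. As an illustration of the contract, here is a hypothetical in-memory implementation, for example for unit tests; it is not part of this commit and assumes the crate's existing uuid dependency:

use std::{
    collections::BTreeMap,
    sync::Mutex,
    time::{SystemTime, UNIX_EPOCH},
};

use async_trait::async_trait;

// Hypothetical test double: collects segments and index lines in memory.
#[derive(Default)]
struct InMemoryBackend {
    segments: Mutex<BTreeMap<String, Vec<u8>>>,
    indexes: Mutex<BTreeMap<String, String>>,
}

#[async_trait]
impl StorageBackend for InMemoryBackend {
    async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
        let key = uuid::Uuid::now_v7().to_string();
        self.segments
            .lock()
            .unwrap()
            .insert(format!("{}/{}", topic, key), buffer.to_vec());
        Ok(key)
    }

    async fn append_index(
        &self,
        topic: &str,
        segment_file: &str,
        time: SystemTime,
    ) -> anyhow::Result<()> {
        let secs = time.duration_since(UNIX_EPOCH)?.as_secs();
        self.indexes
            .lock()
            .unwrap()
            .entry(topic.to_string())
            .or_default()
            .push_str(&format!("{},{}\n", secs, segment_file));
        Ok(())
    }
}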

crates/nodata-storage/src/backend/local.rs (new file, 97 lines)

@@ -0,0 +1,97 @@
use std::{
    env::temp_dir,
    path::{Path, PathBuf},
    time::{SystemTime, UNIX_EPOCH},
};

use anyhow::Context;
use async_trait::async_trait;
use tokio::io::AsyncWriteExt;

use super::StorageBackend;

pub struct LocalStorageBackend {
    location: PathBuf,
}

impl LocalStorageBackend {
    pub fn new(location: &Path) -> Self {
        Self {
            location: location.into(),
        }
    }

    pub fn new_from_env() -> anyhow::Result<Self> {
        Ok(Self::new(&PathBuf::from(
            std::env::var("LOCAL_STORAGE_LOCATION")
                .context("LOCAL_STORAGE_LOCATION was not found in env")?,
        )))
    }

    pub fn temp() -> Self {
        Self::new(&temp_dir().join("nodata"))
    }
}

#[async_trait]
impl StorageBackend for LocalStorageBackend {
    async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
        let segment_key = uuid::Uuid::now_v7();
        let segment_path = PathBuf::from("logs")
            .join(topic)
            .join(segment_key.to_string());
        tracing::trace!("writing segment file: {}", segment_path.display());
        let file_location = self.location.join(&segment_path);
        if let Some(parent) = file_location.parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .context("failed to create storage backend dir")?;
        }

        let mut segment_file = tokio::fs::File::create(&file_location).await?;
        segment_file.write_all(buffer).await?;
        segment_file.flush().await?;

        Ok(segment_key.to_string())
    }

    async fn append_index(
        &self,
        topic: &str,
        segment_file: &str,
        time: SystemTime,
    ) -> anyhow::Result<()> {
        let index_path = PathBuf::from("indexes").join(topic);
        tracing::trace!("writing index file: {}", index_path.display());
        let file_location = self.location.join(&index_path);
        if let Some(parent) = file_location.parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .context("failed to create storage backend dir, index")?;
        }
        if !file_location.exists() {
            tokio::fs::File::create(&file_location).await?;
        }

        let mut index_file = tokio::fs::File::options()
            .append(true)
            .open(&file_location)
            .await?;
        index_file
            .write_all(
                format!(
                    "{},{}\n",
                    time.duration_since(UNIX_EPOCH)
                        .expect("to be able to get time")
                        .as_secs(),
                    segment_file
                )
                .as_bytes(),
            )
            .await?;
        index_file.flush().await?;

        Ok(())
    }
}
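
Driving the local backend end to end is then a handful of lines. A usage sketch against the API introduced above; the topic name and payload are made up:

use std::time::SystemTime;

use nodata_storage::backend::{local::LocalStorageBackend, StorageBackend};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Writes under $TMPDIR/nodata; swap in new_from_env() to honor
    // LOCAL_STORAGE_LOCATION instead.
    let backend = LocalStorageBackend::temp();
    let segment_id = backend.flush_segment("orders", b"payload-bytes").await?;
    backend
        .append_index("orders", &segment_id, SystemTime::now())
        .await?;
    println!("wrote segment {}", segment_id);
    Ok(())
}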

crates/nodata-storage/src/backend/s3.rs (new file, 171 lines)

@@ -0,0 +1,171 @@
use std::{
    collections::BTreeMap,
    time::{SystemTime, UNIX_EPOCH},
};

use anyhow::Context;
use async_trait::async_trait;
use aws_config::{BehaviorVersion, Region};
use aws_sdk_s3::{
    config::Credentials,
    primitives::{ByteStream, SdkBody},
};
use tokio::{
    io::{AsyncReadExt, BufReader},
    sync::RwLock,
};

use super::StorageBackend;

pub struct S3StorageBackend {
    client: aws_sdk_s3::Client,
    bucket: String,
    index_lock: RwLock<BTreeMap<String, RwLock<()>>>,
}

impl S3StorageBackend {
    pub async fn upload_file(&self, path: &str, buffer: &[u8]) -> anyhow::Result<()> {
        tracing::trace!("committing file: {}", &path);

        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(path)
            .body(ByteStream::new(SdkBody::from(buffer)))
            .send()
            .await?;

        Ok(())
    }

    pub async fn get_file(&self, path: &str) -> anyhow::Result<Option<Vec<u8>>> {
        tracing::trace!("getting file: {}", path);

        let obj = match self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(path)
            .send()
            .await
        {
            Ok(ok) => ok,
            Err(err) => match err.into_service_error() {
                aws_sdk_s3::operation::get_object::GetObjectError::NoSuchKey(_) => return Ok(None),
                e => anyhow::bail!(e.to_string()),
            },
        };

        let mut buf_reader = BufReader::new(obj.body.into_async_read());
        let mut output = Vec::new();
        buf_reader.read_buf(&mut output).await?;

        Ok(Some(output))
    }

    pub async fn append_file(&self, path: &str, buffer: &[u8]) -> anyhow::Result<()> {
        tracing::trace!("appending file: {}", &path);

        {
            let mut index_lock = self.index_lock.write().await;
            let item = index_lock.get(path);
            if item.is_none() {
                index_lock.insert(path.to_string(), RwLock::default());
            }
        }

        let index_lock = self.index_lock.read().await;
        let item = index_lock.get(path).expect("to find a path lock");
        let lock = item.write().await;

        let file = self.get_file(path).await?;
        match file {
            Some(mut file_contents) => {
                file_contents.extend_from_slice(buffer);
                self.upload_file(path, &file_contents).await?
            }
            None => self.upload_file(path, buffer).await?,
        }

        drop(lock);

        Ok(())
    }
}

impl S3StorageBackend {
    pub async fn new(
        key_id: impl Into<String>,
        key: impl Into<String>,
        endpint_url: impl Into<String>,
        bucket: impl Into<String>,
    ) -> anyhow::Result<Self> {
        let shared_config = aws_config::defaults(BehaviorVersion::latest())
            .region(Region::new("eu-west-1"))
            .credentials_provider(Credentials::new(
                key_id,
                key,
                None,
                None,
                env!("CARGO_PKG_NAME"),
            ));
        let config = aws_sdk_s3::config::Builder::from(&shared_config.load().await)
            .endpoint_url(endpint_url)
            .force_path_style(true)
            .build();
        let client = aws_sdk_s3::Client::from_conf(config);

        Ok(Self {
            client,
            bucket: bucket.into(),
            index_lock: RwLock::default(),
        })
    }

    pub async fn new_from_env() -> anyhow::Result<Self> {
        let key_id = std::env::var("AWS_ACCESS_KEY_ID").context("AWS_ACCESS_KEY_ID was not set")?;
        let access_key =
            std::env::var("AWS_SECRET_ACCESS_KEY").context("AWS_SECRET_ACCESS_KEY was not set")?;
        let endpoint_url =
            std::env::var("AWS_ENDPOINT_URL").context("AWS_ENDPOINT_URL was not set")?;
        let bucket = std::env::var("AWS_BUCKET").context("AWS_BUCKET was not set")?;

        Self::new(key_id, access_key, endpoint_url, bucket).await
    }
}

#[async_trait]
impl StorageBackend for S3StorageBackend {
    async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
        let segment_key = uuid::Uuid::now_v7();
        self.upload_file(
            &format!("nodata/logs/{}/{}.pb", topic, &segment_key.to_string()),
            buffer,
        )
        .await?;

        Ok(segment_key.to_string())
    }

    async fn append_index(
        &self,
        topic: &str,
        segment_file: &str,
        time: SystemTime,
    ) -> anyhow::Result<()> {
        self.append_file(
            &format!("nodata/indexes/{}", topic),
            format!(
                "{},{}\n",
                time.duration_since(UNIX_EPOCH)
                    .expect("to be able to get time")
                    .as_secs(),
                segment_file
            )
            .as_bytes(),
        )
        .await
    }
}
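
Two details are worth noting here: S3 has no append primitive, so append_file emulates one with a read-extend-reupload cycle, and the nested RwLock map serializes concurrent appends to the same key while letting different keys proceed in parallel (a guarantee that only holds within a single process). A usage sketch, assuming the AWS_* variables from the .env above are exported:

use nodata_storage::backend::s3::S3StorageBackend;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Talks to any S3-compatible endpoint, MinIO included.
    let backend = S3StorageBackend::new_from_env().await?;
    backend
        .append_file("nodata/indexes/orders", b"1732500000,some-segment\n")
        .await?;
    Ok(())
}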

crates/nodata-storage/src/lib.rs

@@ -8,7 +8,7 @@
 use std::{collections::BTreeMap, sync::Arc, time::SystemTime};

 use anyhow::Context;
-use backend::StorageBackend;
+use backend::{local::LocalStorageBackend, StorageBackend};
 use proto::ProtoStorage;
 use sha2::{Digest, Sha256};
 use tokio::sync::Mutex;
@@ -21,19 +21,43 @@ pub mod backend;
 pub struct Storage {
     segment_size_bytes: usize,
     buffer: Arc<Mutex<BTreeMap<TopicHashKey, Vec<Vec<u8>>>>>,
-    backend: Arc<StorageBackend>,
+    backend: Arc<dyn StorageBackend + Send + Sync + 'static>,
     codec: ProtoStorage,
 }

 impl Storage {
-    pub fn new(backend: StorageBackend) -> Self {
+    pub fn new(backend: LocalStorageBackend) -> Self {
         Self {
             segment_size_bytes: 4096 * 1000, // 4MB
             buffer: Arc::default(),
             codec: ProtoStorage::default(),
             backend: Arc::new(backend),
         }
     }
+
+    pub async fn new_from_env() -> anyhow::Result<Self> {
+        match std::env::var("STORAGE_BACKEND")
+            .context("failed to find STORAGE_BACKEND in env")?
+            .as_str()
+        {
+            "local" => Ok(Self {
+                segment_size_bytes: 4096 * 1000, // 4MB
+                buffer: Arc::default(),
+                codec: ProtoStorage::default(),
+                backend: Arc::new(LocalStorageBackend::new_from_env()?),
+            }),
+            #[cfg(feature = "s3")]
+            "s3" => Ok(Self {
+                segment_size_bytes: 4 * 1024 * 1000, // 4MB
+                buffer: Arc::default(),
+                codec: ProtoStorage::default(),
+                backend: Arc::new(backend::s3::S3StorageBackend::new_from_env().await?),
+            }),
+            backend => anyhow::bail!("backend is not supported: {}", backend),
+        }
+    }
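
With selection driven entirely by STORAGE_BACKEND, moving a deployment between local disk and S3 becomes a configuration change only. A sketch of the call site, assuming the variables from the .env shown at the top:

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // dotenv pulls in the .env file; STORAGE_BACKEND then picks
    // LocalStorageBackend or S3StorageBackend.
    dotenv::dotenv().ok();
    let _storage = nodata_storage::Storage::new_from_env().await?;
    Ok(())
}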

crates/nodata/Cargo.toml

@@ -2,6 +2,8 @@
name = "nodata"
version = "0.1.0"
edition = "2021"
description = "nodata is a kafka like message broker that is simple and easy to use, while relying on either local or s3 like data storage for consistency"
license = "MIT"
[dependencies]
nodata-storage.workspace = true
@@ -13,17 +15,10 @@ tracing-subscriber.workspace = true
 clap.workspace = true
 dotenv.workspace = true
 axum.workspace = true
-drift.workspace = true
+nodrift.workspace = true
 uuid.workspace = true
 serde = { version = "1.0.197", features = ["derive"] }
-sqlx = { version = "0.8.0", features = [
-    "runtime-tokio",
-    "tls-rustls",
-    "postgres",
-    "uuid",
-    "time",
-] }
 tower-http = { version = "0.6.0", features = ["cors", "trace"] }
 tokio-util = "0.7.11"
 tonic.workspace = true
@@ -34,7 +29,7 @@ chrono = { version = "0.4.38", features = ["serde"] }
 tokio-stream = "0.1.15"
 dagger-sdk = "0.13.0"
 rand = "0.8.5"
-notmad = "0.4.0"
+notmad = "0.5.0"
 prometheus = "0.13.4"

 [dev-dependencies]

crates/nodata/src/broker.rs

@@ -1,7 +1,7 @@
 use std::{collections::BTreeMap, sync::Arc, time::Duration};

 use axum::async_trait;
-use drift::Drifter;
+use nodrift::Drifter;
 use notmad::Component;
 use tokio::sync::RwLock;
 use tokio_util::sync::CancellationToken;
@@ -33,7 +33,7 @@ impl Component for Broker {
         &self,
         cancellation_token: tokio_util::sync::CancellationToken,
     ) -> Result<(), notmad::MadError> {
-        let token = drift::schedule_drifter(Duration::from_secs(1), self.clone());
+        let token = nodrift::schedule_drifter(Duration::from_secs(1), self.clone());

         tokio::select! {
             _ = token.cancelled() => {},
@@ -95,7 +95,7 @@ impl BrokerHandler {
         _parent_token: CancellationToken,
     ) -> Self {
         let inner_state = state.clone();
-        let token = drift::schedule(Duration::from_secs(1), move || {
+        let token = nodrift::schedule(Duration::from_secs(1), move || {
             let consumer_group = consumer_group.clone();
             let state = inner_state.clone();
crates/nodata/src/grpc.rs

@@ -61,7 +61,12 @@ impl no_data_service_server::NoDataService for GrpcServer {
         self.counter.inc();

         self.state.ingest().publish(req).await.map_err(|e| {
-            tracing::warn!(error = e.to_string(), "failed to handle ingest of data");
+            let caused_by = e
+                .chain()
+                .map(|e| e.to_string())
+                .collect::<Vec<String>>()
+                .join(", ");
+            tracing::warn!("failed to handle ingest of data: {}: {}", e, caused_by);
             tonic::Status::internal(e.to_string())
         })?;
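
The new log line joins every error in the anyhow chain instead of only the outermost message, so the root cause (for example an S3 connection failure) survives into the logs. A standalone illustration of what e.chain() produces:

use anyhow::Context;

fn main() {
    // Build a two-level error: an io::Error wrapped with context.
    let err: anyhow::Error = std::fs::read("does-not-exist.txt")
        .context("failed to load segment")
        .unwrap_err();

    // chain() iterates from the outermost context down to the root
    // cause, which is what the GrpcServer log line joins with ", ".
    let caused_by = err
        .chain()
        .map(|e| e.to_string())
        .collect::<Vec<String>>()
        .join(", ");
    println!("{}", caused_by);
    // e.g. "failed to load segment, No such file or directory (os error 2)"
}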

crates/nodata/src/main.rs

@@ -10,6 +10,7 @@ mod services;
 use std::net::SocketAddr;

+use anyhow::Context;
 use broker::Broker;
 use clap::{Parser, Subcommand};
 use grpc::{GetTopicsRequest, GrpcServer, PublishEventRequest, SubscribeRequest};
@@ -17,6 +18,7 @@ use grpc_component::GrpcComponentClient;
use http::HttpServer;
use notmad::Mad;
use state::SharedState;
use tonic::transport::{Channel, ClientTlsConfig};
#[derive(Parser)]
#[command(author, version, about, long_about = None, subcommand_required = true)]
@@ -35,7 +37,7 @@ enum Commands {
         )]
         host: SocketAddr,
         #[arg(
-            env = "GRPC_SERVICE_HOST",
+            env = "SERVICE_GRPC_HOST",
             long = "service-grpc-host",
             default_value = "127.0.0.1:7900"
         )]
@@ -46,7 +48,7 @@ enum Commands {
         // #[arg(env = "SERVICE_HOST", long, default_value = "http://127.0.0.1:3000")]
         //host: String,
         #[arg(
-            env = "GRPC_SERVICE_HOST",
+            env = "SERVICE_GRPC_HOST",
             long = "service-grpc-host",
             default_value = "http://127.0.0.1:7900"
         )]
@@ -192,8 +194,22 @@ async fn create_client(
 ) -> anyhow::Result<
     crate::grpc::no_data_service_client::NoDataServiceClient<tonic::transport::Channel>,
 > {
-    let client =
-        crate::grpc::no_data_service_client::NoDataServiceClient::connect(grpc_host).await?;
+    let channel = if grpc_host.starts_with("https") {
+        Channel::from_shared(grpc_host.to_owned())
+            .context(format!("failed to connect to: {}", &grpc_host))?
+            .tls_config(ClientTlsConfig::new().with_native_roots())?
+            .connect()
+            .await
+            .context(format!("failed to connect to: {}", &grpc_host))?
+    } else {
+        Channel::from_shared(grpc_host.to_owned())
+            .context(format!("failed to connect to: {}", &grpc_host))?
+            .connect()
+            .await
+            .context(format!("failed to connect to: {}", &grpc_host))?
+    };
+    let client = crate::grpc::no_data_service_client::NoDataServiceClient::new(channel);

     Ok(client)
 }
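
The scheme-based branch only compiles because the workspace now builds tonic with the `tls` and `tls-roots` features (see the Cargo.toml hunk above); `ClientTlsConfig::new().with_native_roots()` takes the trust anchors from the host certificate store, which lets the client reach an ingress-terminated https endpoint while keeping plain-text http for local development.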

(file name not captured; test module using Staging)

@@ -288,7 +288,7 @@ mod test {
     let topic = "some-topic".to_string();
     let offset = 9usize;

-    let staging = Staging::new();
+    let staging = Staging::new().await?;

     // Publish 10 messages
     for _ in 0..10 {
         let offset = staging

crates/nodata/src/services/staging.rs

@@ -1,5 +1,6 @@
 use std::{collections::BTreeMap, sync::Arc};

+use nodata_storage::backend::local::LocalStorageBackend;
 use tokio::sync::RwLock;

 use crate::state::SharedState;
@@ -23,11 +24,11 @@ pub struct Staging {
 }

 impl Staging {
-    pub fn new() -> Self {
-        Self {
+    pub async fn new() -> anyhow::Result<Self> {
+        Ok(Self {
             store: Arc::default(),
-            storage: nodata_storage::Storage::new(nodata_storage::backend::StorageBackend::temp()),
-        }
+            storage: nodata_storage::Storage::new_from_env().await?,
+        })
     }

     pub async fn publish(

crates/nodata/src/state.rs

@@ -1,8 +1,6 @@
 use std::{ops::Deref, sync::Arc};

-use anyhow::Context;
 use prometheus::Registry;
-use sqlx::{Pool, Postgres};

 use crate::services::{consumers::Consumers, handler::Handler, staging::Staging};
@@ -24,7 +22,6 @@ impl Deref for SharedState {
 }

 pub struct State {
-    pub _db: Pool<Postgres>,
     pub staging: Staging,
     pub consumers: Consumers,
     pub handler: Handler,
@@ -33,23 +30,10 @@ pub struct State {
 impl State {
     pub async fn new() -> anyhow::Result<Self> {
-        let db = sqlx::PgPool::connect(
-            &std::env::var("DATABASE_URL").context("DATABASE_URL is not set")?,
-        )
-        .await?;
-        sqlx::migrate!("migrations/crdb")
-            .set_locking(false)
-            .run(&db)
-            .await?;
-        let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?;
-
-        let staging = Staging::new();
+        let staging = Staging::new().await?;
         let handler = Handler::new(staging.clone());

         Ok(Self {
-            _db: db,
             consumers: Consumers::new(),
             staging,
             handler,

cuddle.yaml

@@ -6,16 +6,46 @@ vars:
service: "nodata"
registry: kasperhermansen
clusters:
clank-prod:
replicas: "3"
namespace: prod
database:
crdb: "false"
ingress:
- internal: "true"
- internal_grpc: "true"
deployment:
registry: git@git.front.kjuulh.io:kjuulh/clank-clusters
please:
project:
owner: kjuulh
repository: nodata
branch: main
settings:
api_url: https://git.front.kjuulh.io
actions:
rust:
cuddle/clusters:
dev:
replicas: 1
env:
prod:
clusters:
- clank-prod
service.host: "0.0.0.0:3001"
service.grpc.host: "0.0.0.0:4001"
storage.backend: "s3"
aws.endpoint.url: "https://api.minio.i.kjuulh.io"
aws.bucket: "nodata-dev"
aws.access.key.id:
vault: true
aws.secret.access.key:
vault: true
prod:
replicas: 1
env:
service.host: "0.0.0.0:3001"
service.grpc.host: "0.0.0.0:4001"
storage.backend: "s3"
aws.endpoint.url: "https://api.minio.i.kjuulh.io"
aws.bucket: "nodata-prod"
aws.access.key.id:
vault: true
aws.secret.access.key:
vault: true