Compare commits

2e8d14f5a6 ... main (22 commits)
| SHA1 |
|---|
| 70ba5b10c7 |
| 3581bf1265 |
| c6f00031be |
| 489b30e028 |
| 430240d751 |
| 707d1e2263 |
| d0bd8b938f |
| 6f3a5129bd |
| 68c9820aae |
| e44cb6dd23 |
| 6aa881048c |
| 5356303e7b |
| 1bd6fc06c5 |
| f342e3c0df |
| 25182f14c1 |
| 9a72f2f6c2 |
| 2352c00736 |
| 089c1d502f |
| 8b24dc23e0 |
| 7510c9a333 |
| 2bcc74ed8c |
| c8f4bae1f2 |
.env (+8)

@@ -1 +1,9 @@
 DATABASE_URL="postgres://root@localhost:26257/defaultdb?sslmode=disable"
+#STORAGE_BACKEND=local
+#LOCAL_STORAGE_LOCATION=/tmp/nodata/local
+
+STORAGE_BACKEND=s3
+AWS_ACCESS_KEY_ID=OgAfuzefQRBHq4up2eYr
+AWS_SECRET_ACCESS_KEY=nW85rHFOlZeMg7v6kkCikpYbyE3Pw28RS2O5FNZu
+AWS_ENDPOINT_URL="https://api.minio.i.kjuulh.io"
+AWS_BUCKET="nodata-dev"
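Taken together, the commented `local` pair and the active `s3` values drive the new environment-based construction further down. A minimal wiring sketch, not part of the diff, assuming the `dotenv` dependency already in the workspace and the `Storage::new_from_env` constructor introduced in the nodata-storage lib.rs hunks below:

```rust
// Sketch only: load .env, then let STORAGE_BACKEND select the backend.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenv::dotenv().ok(); // a missing .env file is fine
    // Fails with context if STORAGE_BACKEND (or a backend's variables) is unset.
    let _storage = nodata_storage::Storage::new_from_env().await?;
    Ok(())
}
```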
Cargo.lock (generated, 1433 lines changed): file diff suppressed because it is too large.
Cargo.toml (workspace root)

@@ -2,8 +2,11 @@
 members = ["crates/*"]
 resolver = "2"
 
+[workspace.package]
+version = "0.1.0"
+
 [workspace.dependencies]
-nodata-storage = { path = "crates/nodata-storage" }
+nodata-storage = { path = "crates/nodata-storage", version = "0.1.0" }
 
 anyhow = { version = "1" }
 tokio = { version = "1", features = ["full"] }
@@ -12,9 +15,9 @@ tracing-subscriber = { version = "0.3.18" }
 clap = { version = "4", features = ["derive", "env"] }
 dotenv = { version = "0.15" }
 axum = { version = "0.7" }
-drift = { git = "https://github.com/kjuulh/drift", branch = "main" }
+nodrift = { version = "0.2.0" }
 async-trait = "0.1"
-tonic = "0.12.1"
+tonic = { version = "0.12.3", features = ["tls", "tls-roots"] }
 bytes = "1.7.1"
 prost = "0.13.1"
 prost-types = "0.13.1"
crates/nodata-storage/Cargo.toml

@@ -1,6 +1,8 @@
 [package]
 name = "nodata-storage"
-version = "0.1.0"
+version.workspace = true
+description = "nodata storage is the backend that serves the nodata message broker, it allows storing data in many different types of backends"
+license = "MIT"
 edition = "2021"
 
 [dependencies]
@@ -11,6 +13,18 @@ tracing.workspace = true
 prost.workspace = true
 prost-types.workspace = true
 bytes.workspace = true
+async-trait.workspace = true
 
 hex = "0.4.3"
 sha2 = "0.10.8"
+
+aws-config = { version = "1.5.10", features = [
+  "behavior-version-latest",
+], optional = true }
+aws-sdk-s3 = { version = "1.61.0", features = [
+  "behavior-version-latest",
+], optional = true }
+
+[features]
+default = ["s3"]
+s3 = ["dep:aws-config", "dep:aws-sdk-s3"]
crates/nodata-storage/src/backend.rs (inferred path)

@@ -1,84 +1,18 @@
-use std::{
-    env::temp_dir,
-    path::{Path, PathBuf},
-    time::{SystemTime, UNIX_EPOCH},
-};
+use std::time::SystemTime;
 
-use anyhow::Context;
-use tokio::io::AsyncWriteExt;
+use async_trait::async_trait;
 
-pub struct StorageBackend {
-    location: PathBuf,
-}
+pub mod local;
+#[cfg(feature = "s3")]
+pub mod s3;
 
-impl StorageBackend {
-    pub fn new(location: &Path) -> Self {
-        Self {
-            location: location.into(),
-        }
-    }
-
-    pub fn temp() -> Self {
-        Self::new(&temp_dir().join("nodata"))
-    }
-
-    pub async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
-        let segment_key = uuid::Uuid::now_v7();
-        let segment_path = PathBuf::from("logs")
-            .join(topic)
-            .join(segment_key.to_string());
-        tracing::trace!("writing segment file: {}", segment_path.display());
-        let file_location = self.location.join(&segment_path);
-        if let Some(parent) = file_location.parent() {
-            tokio::fs::create_dir_all(parent)
-                .await
-                .context("failed to create storage backend dir")?;
-        }
-
-        let mut segment_file = tokio::fs::File::create(&file_location).await?;
-        segment_file.write_all(buffer).await?;
-        segment_file.flush().await?;
-
-        Ok(segment_key.to_string())
-    }
-
-    pub async fn append_index(
+#[async_trait]
+pub trait StorageBackend {
+    async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String>;
+    async fn append_index(
         &self,
         topic: &str,
         segment_file: &str,
         time: SystemTime,
-    ) -> anyhow::Result<()> {
-        let index_path = PathBuf::from("indexes").join(topic);
-        tracing::trace!("writing index file: {}", index_path.display());
-        let file_location = self.location.join(&index_path);
-        if let Some(parent) = file_location.parent() {
-            tokio::fs::create_dir_all(parent)
-                .await
-                .context("failed to create storage backend dir, index")?;
-        }
-
-        if !file_location.exists() {
-            tokio::fs::File::create(&file_location).await?;
-        }
-
-        let mut index_file = tokio::fs::File::options()
-            .append(true)
-            .open(&file_location)
-            .await?;
-        index_file
-            .write_all(
-                format!(
-                    "{},{}\n",
-                    time.duration_since(UNIX_EPOCH)
-                        .expect("to be able to get time")
-                        .as_secs(),
-                    segment_file
-                )
-                .as_bytes(),
-            )
-            .await?;
-        index_file.flush().await?;
-
-        Ok(())
-    }
-}
+    ) -> anyhow::Result<()>;
+}
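The old concrete `StorageBackend` struct becomes a trait here, with the file-system implementation moving to the new local.rs below. `#[async_trait]` boxes the returned futures, which keeps the trait object-safe; that is what lets `Storage` hold either backend behind a single pointer (see the lib.rs hunks further down). A sketch using only items introduced in this diff:

```rust
use std::sync::Arc;

use nodata_storage::backend::{local::LocalStorageBackend, StorageBackend};

// Both backends erase to the same trait-object type.
fn boxed_backend() -> Arc<dyn StorageBackend + Send + Sync + 'static> {
    Arc::new(LocalStorageBackend::temp())
}
```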
crates/nodata-storage/src/backend/local.rs (new file, +97)

@@ -0,0 +1,97 @@
+use std::{
+    env::temp_dir,
+    path::{Path, PathBuf},
+    time::{SystemTime, UNIX_EPOCH},
+};
+
+use anyhow::Context;
+use async_trait::async_trait;
+use tokio::io::AsyncWriteExt;
+
+use super::StorageBackend;
+
+pub struct LocalStorageBackend {
+    location: PathBuf,
+}
+
+impl LocalStorageBackend {
+    pub fn new(location: &Path) -> Self {
+        Self {
+            location: location.into(),
+        }
+    }
+
+    pub fn new_from_env() -> anyhow::Result<Self> {
+        Ok(Self::new(&PathBuf::from(
+            std::env::var("LOCAL_STORAGE_LOCATION")
+                .context("LOCAL_STORAGE_LOCATION was not found in env")?,
+        )))
+    }
+
+    pub fn temp() -> Self {
+        Self::new(&temp_dir().join("nodata"))
+    }
+}
+
+#[async_trait]
+impl StorageBackend for LocalStorageBackend {
+    async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
+        let segment_key = uuid::Uuid::now_v7();
+        let segment_path = PathBuf::from("logs")
+            .join(topic)
+            .join(segment_key.to_string());
+        tracing::trace!("writing segment file: {}", segment_path.display());
+        let file_location = self.location.join(&segment_path);
+        if let Some(parent) = file_location.parent() {
+            tokio::fs::create_dir_all(parent)
+                .await
+                .context("failed to create storage backend dir")?;
+        }
+
+        let mut segment_file = tokio::fs::File::create(&file_location).await?;
+        segment_file.write_all(buffer).await?;
+        segment_file.flush().await?;
+
+        Ok(segment_key.to_string())
+    }
+
+    async fn append_index(
+        &self,
+        topic: &str,
+        segment_file: &str,
+        time: SystemTime,
+    ) -> anyhow::Result<()> {
+        let index_path = PathBuf::from("indexes").join(topic);
+        tracing::trace!("writing index file: {}", index_path.display());
+        let file_location = self.location.join(&index_path);
+        if let Some(parent) = file_location.parent() {
+            tokio::fs::create_dir_all(parent)
+                .await
+                .context("failed to create storage backend dir, index")?;
+        }
+
+        if !file_location.exists() {
+            tokio::fs::File::create(&file_location).await?;
+        }
+
+        let mut index_file = tokio::fs::File::options()
+            .append(true)
+            .open(&file_location)
+            .await?;
+        index_file
+            .write_all(
+                format!(
+                    "{},{}\n",
+                    time.duration_since(UNIX_EPOCH)
+                        .expect("to be able to get time")
+                        .as_secs(),
+                    segment_file
+                )
+                .as_bytes(),
+            )
+            .await?;
+        index_file.flush().await?;
+
+        Ok(())
+    }
+}
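For reference, this backend writes each segment to `logs/<topic>/<uuid-v7>` under its base location and appends a `<unix-seconds>,<segment-key>` line to the per-topic file under `indexes/`. A usage sketch (assumed caller code, not from the diff):

```rust
use std::time::SystemTime;

use nodata_storage::backend::{local::LocalStorageBackend, StorageBackend};

async fn write_one(payload: &[u8]) -> anyhow::Result<()> {
    let backend = LocalStorageBackend::temp();
    // Flush the segment first, then record it in the topic index.
    let key = backend.flush_segment("orders", payload).await?;
    backend
        .append_index("orders", &key, SystemTime::now())
        .await?;
    Ok(())
}
```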
crates/nodata-storage/src/backend/s3.rs (new file, +171)

@@ -0,0 +1,171 @@
+use std::{
+    collections::BTreeMap,
+    time::{SystemTime, UNIX_EPOCH},
+};
+
+use anyhow::Context;
+use async_trait::async_trait;
+use aws_config::{BehaviorVersion, Region};
+use aws_sdk_s3::{
+    config::Credentials,
+    primitives::{ByteStream, SdkBody},
+};
+use tokio::{
+    io::{AsyncReadExt, BufReader},
+    sync::RwLock,
+};
+
+use super::StorageBackend;
+
+pub struct S3StorageBackend {
+    client: aws_sdk_s3::Client,
+    bucket: String,
+
+    index_lock: RwLock<BTreeMap<String, RwLock<()>>>,
+}
+
+impl S3StorageBackend {
+    pub async fn upload_file(&self, path: &str, buffer: &[u8]) -> anyhow::Result<()> {
+        tracing::trace!("committing file: {}", &path);
+
+        self.client
+            .put_object()
+            .bucket(&self.bucket)
+            .key(path)
+            .body(ByteStream::new(SdkBody::from(buffer)))
+            .send()
+            .await?;
+
+        Ok(())
+    }
+
+    pub async fn get_file(&self, path: &str) -> anyhow::Result<Option<Vec<u8>>> {
+        tracing::trace!("getting file: {}", path);
+
+        let obj = match self
+            .client
+            .get_object()
+            .bucket(&self.bucket)
+            .key(path)
+            .send()
+            .await
+        {
+            Ok(ok) => ok,
+            Err(err) => match err.into_service_error() {
+                aws_sdk_s3::operation::get_object::GetObjectError::NoSuchKey(_) => return Ok(None),
+                e => anyhow::bail!(e.to_string()),
+            },
+        };
+
+        let mut buf_reader = BufReader::new(obj.body.into_async_read());
+
+        let mut output = Vec::new();
+        buf_reader.read_buf(&mut output).await?;
+
+        Ok(Some(output))
+    }
+
+    pub async fn append_file(&self, path: &str, buffer: &[u8]) -> anyhow::Result<()> {
+        tracing::trace!("appending file: {}", &path);
+        {
+            let mut index_lock = self.index_lock.write().await;
+            let item = index_lock.get(path);
+            if item.is_none() {
+                index_lock.insert(path.to_string(), RwLock::default());
+            }
+        }
+        let index_lock = self.index_lock.read().await;
+        let item = index_lock.get(path).expect("to find a path lock");
+        let lock = item.write().await;
+
+        let file = self.get_file(path).await?;
+        match file {
+            Some(mut file_contents) => {
+                file_contents.extend_from_slice(buffer);
+                self.upload_file(path, &file_contents).await?
+            }
+            None => self.upload_file(path, buffer).await?,
+        }
+
+        drop(lock);
+
+        Ok(())
+    }
+}
+
+impl S3StorageBackend {
+    pub async fn new(
+        key_id: impl Into<String>,
+        key: impl Into<String>,
+        endpint_url: impl Into<String>,
+        bucket: impl Into<String>,
+    ) -> anyhow::Result<Self> {
+        let shared_config = aws_config::defaults(BehaviorVersion::latest())
+            .region(Region::new("eu-west-1"))
+            .credentials_provider(Credentials::new(
+                key_id,
+                key,
+                None,
+                None,
+                env!("CARGO_PKG_NAME"),
+            ));
+
+        let config = aws_sdk_s3::config::Builder::from(&shared_config.load().await)
+            .endpoint_url(endpint_url)
+            .force_path_style(true)
+            .build();
+
+        let client = aws_sdk_s3::Client::from_conf(config);
+
+        Ok(Self {
+            client,
+            bucket: bucket.into(),
+            index_lock: RwLock::default(),
+        })
+    }
+
+    pub async fn new_from_env() -> anyhow::Result<Self> {
+        let key_id = std::env::var("AWS_ACCESS_KEY_ID").context("AWS_ACCESS_KEY_ID was not set")?;
+        let access_key =
+            std::env::var("AWS_SECRET_ACCESS_KEY").context("AWS_SECRET_ACCESS_KEY was not set")?;
+        let endpoint_url =
+            std::env::var("AWS_ENDPOINT_URL").context("AWS_ENDPOINT_URL was not set")?;
+        let bucket = std::env::var("AWS_BUCKET").context("AWS_BUCKET was not set")?;
+
+        Self::new(key_id, access_key, endpoint_url, bucket).await
+    }
+}
+
+#[async_trait]
+impl StorageBackend for S3StorageBackend {
+    async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
+        let segment_key = uuid::Uuid::now_v7();
+
+        self.upload_file(
+            &format!("nodata/logs/{}/{}.pb", topic, &segment_key.to_string()),
+            buffer,
+        )
+        .await?;
+
+        Ok(segment_key.to_string())
+    }
+    async fn append_index(
+        &self,
+        topic: &str,
+        segment_file: &str,
+        time: SystemTime,
+    ) -> anyhow::Result<()> {
+        self.append_file(
+            &format!("nodata/indexes/{}", topic),
+            format!(
+                "{},{}\n",
+                time.duration_since(UNIX_EPOCH)
+                    .expect("to be able to get time")
+                    .as_secs(),
+                segment_file
+            )
+            .as_bytes(),
+        )
+        .await
+    }
+}
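`append_file` emulates append on S3 with a get, extend, put cycle, serialized per path through the in-process `index_lock` map; that protects concurrent appends within a single process, though not across processes. A construction sketch driven by the variables added to `.env` above (assumed caller code; note the region is hard-coded to `eu-west-1` in `new`):

```rust
use nodata_storage::backend::s3::S3StorageBackend;

async fn connect() -> anyhow::Result<S3StorageBackend> {
    // Reads AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
    // AWS_ENDPOINT_URL and AWS_BUCKET from the environment.
    S3StorageBackend::new_from_env().await
}
```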
crates/nodata-storage/src/lib.rs (inferred path)

@@ -8,7 +8,7 @@
 use std::{collections::BTreeMap, sync::Arc, time::SystemTime};
 
 use anyhow::Context;
-use backend::StorageBackend;
+use backend::{local::LocalStorageBackend, StorageBackend};
 use proto::ProtoStorage;
 use sha2::{Digest, Sha256};
 use tokio::sync::Mutex;
@@ -21,19 +21,43 @@ pub mod backend;
 pub struct Storage {
     segment_size_bytes: usize,
     buffer: Arc<Mutex<BTreeMap<TopicHashKey, Vec<Vec<u8>>>>>,
-    backend: Arc<StorageBackend>,
+    backend: Arc<dyn StorageBackend + Send + Sync + 'static>,
+
     codec: ProtoStorage,
 }
 
 impl Storage {
-    pub fn new(backend: StorageBackend) -> Self {
+    pub fn new(backend: LocalStorageBackend) -> Self {
         Self {
             segment_size_bytes: 4096 * 1000, // 4MB
             buffer: Arc::default(),
-            backend: Arc::new(backend),
             codec: ProtoStorage::default(),
+
+            backend: Arc::new(backend),
+        }
+    }
+
+    pub async fn new_from_env() -> anyhow::Result<Self> {
+        match std::env::var("STORAGE_BACKEND")
+            .context("failed to find STORAGE_BACKEND in env")?
+            .as_str()
+        {
+            "local" => Ok(Self {
+                segment_size_bytes: 4096 * 1000, // 4MB
+                buffer: Arc::default(),
+                codec: ProtoStorage::default(),
+
+                backend: Arc::new(LocalStorageBackend::new_from_env()?),
+            }),
+            #[cfg(feature = "s3")]
+            "s3" => Ok(Self {
+                segment_size_bytes: 4 * 1024 * 1000, // 4MB
+                buffer: Arc::default(),
+                codec: ProtoStorage::default(),
+
+                backend: Arc::new(backend::s3::S3StorageBackend::new_from_env().await?),
+            }),
+            backend => anyhow::bail!("backend is not supported: {}", backend),
         }
     }
 }
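`new_from_env` dispatches on `STORAGE_BACKEND`, and the `s3` arm only compiles when the crate's `s3` feature (on by default) is enabled. A selection sketch, assuming test-style code:

```rust
async fn local_storage_for_tests() -> anyhow::Result<nodata_storage::Storage> {
    // "local" additionally requires LOCAL_STORAGE_LOCATION;
    // "s3" reads the AWS_* variables instead.
    std::env::set_var("STORAGE_BACKEND", "local");
    std::env::set_var("LOCAL_STORAGE_LOCATION", "/tmp/nodata/local");
    nodata_storage::Storage::new_from_env().await
}
```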
crates/nodata/Cargo.toml

@@ -2,6 +2,8 @@
 name = "nodata"
 version = "0.1.0"
 edition = "2021"
+description = "nodata is a kafka like message broker that is simple and easy to use, while relying on either local or s3 like data storage for consistency"
+license = "MIT"
 
 [dependencies]
 nodata-storage.workspace = true
@@ -13,17 +15,10 @@ tracing-subscriber.workspace = true
 clap.workspace = true
 dotenv.workspace = true
 axum.workspace = true
-drift.workspace = true
+nodrift.workspace = true
 uuid.workspace = true
 
 serde = { version = "1.0.197", features = ["derive"] }
-sqlx = { version = "0.8.0", features = [
-    "runtime-tokio",
-    "tls-rustls",
-    "postgres",
-    "uuid",
-    "time",
-] }
 tower-http = { version = "0.6.0", features = ["cors", "trace"] }
 tokio-util = "0.7.11"
 tonic.workspace = true
@@ -34,7 +29,7 @@ chrono = { version = "0.4.38", features = ["serde"] }
 tokio-stream = "0.1.15"
 dagger-sdk = "0.13.0"
 rand = "0.8.5"
-notmad = "0.4.0"
+notmad = "0.5.0"
 prometheus = "0.13.4"
 
 [dev-dependencies]
crates/nodata/src/broker.rs (inferred path)

@@ -1,7 +1,7 @@
 use std::{collections::BTreeMap, sync::Arc, time::Duration};
 
 use axum::async_trait;
-use drift::Drifter;
+use nodrift::Drifter;
 use notmad::Component;
 use tokio::sync::RwLock;
 use tokio_util::sync::CancellationToken;
@@ -33,7 +33,7 @@ impl Component for Broker {
         &self,
         cancellation_token: tokio_util::sync::CancellationToken,
     ) -> Result<(), notmad::MadError> {
-        let token = drift::schedule_drifter(Duration::from_secs(1), self.clone());
+        let token = nodrift::schedule_drifter(Duration::from_secs(1), self.clone());
 
         tokio::select! {
             _ = token.cancelled() => {},
@@ -95,7 +95,7 @@ impl BrokerHandler {
         _parent_token: CancellationToken,
     ) -> Self {
         let inner_state = state.clone();
-        let token = drift::schedule(Duration::from_secs(1), move || {
+        let token = nodrift::schedule(Duration::from_secs(1), move || {
            let consumer_group = consumer_group.clone();
            let state = inner_state.clone();
crates/nodata/src/grpc.rs (inferred path)

@@ -61,7 +61,12 @@ impl no_data_service_server::NoDataService for GrpcServer {
         self.counter.inc();
 
         self.state.ingest().publish(req).await.map_err(|e| {
-            tracing::warn!(error = e.to_string(), "failed to handle ingest of data");
+            let caused_by = e
+                .chain()
+                .map(|e| e.to_string())
+                .collect::<Vec<String>>()
+                .join(", ");
+            tracing::warn!("failed to handle ingest of data: {}: {}", e, caused_by);
             tonic::Status::internal(e.to_string())
         })?;
 
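`anyhow::Error::chain` walks the error's source chain, so the warning now logs the underlying causes (for example, the `failed to find STORAGE_BACKEND in env` context added in lib.rs) instead of only the top-level message.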
crates/nodata/src/main.rs (inferred path)

@@ -10,6 +10,7 @@ mod services;
 
 use std::net::SocketAddr;
 
+use anyhow::Context;
 use broker::Broker;
 use clap::{Parser, Subcommand};
 use grpc::{GetTopicsRequest, GrpcServer, PublishEventRequest, SubscribeRequest};
@@ -17,6 +18,7 @@ use grpc_component::GrpcComponentClient;
 use http::HttpServer;
 use notmad::Mad;
 use state::SharedState;
+use tonic::transport::{Channel, ClientTlsConfig};
 
 #[derive(Parser)]
 #[command(author, version, about, long_about = None, subcommand_required = true)]
@@ -35,7 +37,7 @@ enum Commands {
     )]
     host: SocketAddr,
     #[arg(
-        env = "GRPC_SERVICE_HOST",
+        env = "SERVICE_GRPC_HOST",
         long = "service-grpc-host",
         default_value = "127.0.0.1:7900"
     )]
@@ -46,7 +48,7 @@ enum Commands {
     // #[arg(env = "SERVICE_HOST", long, default_value = "http://127.0.0.1:3000")]
     //host: String,
     #[arg(
-        env = "GRPC_SERVICE_HOST",
+        env = "SERVICE_GRPC_HOST",
         long = "service-grpc-host",
         default_value = "http://127.0.0.1:7900"
    )]
@@ -192,8 +194,22 @@ async fn create_client(
 ) -> anyhow::Result<
     crate::grpc::no_data_service_client::NoDataServiceClient<tonic::transport::Channel>,
 > {
-    let client =
-        crate::grpc::no_data_service_client::NoDataServiceClient::connect(grpc_host).await?;
+    let channel = if grpc_host.starts_with("https") {
+        Channel::from_shared(grpc_host.to_owned())
+            .context(format!("failed to connect to: {}", &grpc_host))?
+            .tls_config(ClientTlsConfig::new().with_native_roots())?
+            .connect()
+            .await
+            .context(format!("failed to connect to: {}", &grpc_host))?
+    } else {
+        Channel::from_shared(grpc_host.to_owned())
+            .context(format!("failed to connect to: {}", &grpc_host))?
+            .connect()
+            .await
+            .context(format!("failed to connect to: {}", &grpc_host))?
+    };
+
+    let client = crate::grpc::no_data_service_client::NoDataServiceClient::new(channel);
+
     Ok(client)
 }
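The scheme check gives `https` targets TLS with the system trust store via `ClientTlsConfig::with_native_roots`, which lines up with the `tls`/`tls-roots` features enabled on `tonic` in the workspace Cargo.toml hunk above; plain `http` targets keep an unencrypted channel.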
crates/nodata/src/services/handler.rs (inferred path; likely the test module exercising Staging)

@@ -288,7 +288,7 @@ mod test {
         let topic = "some-topic".to_string();
         let offset = 9usize;
 
-        let staging = Staging::new();
+        let staging = Staging::new().await?;
         // Publish 10 messages
         for _ in 0..10 {
             let offset = staging
crates/nodata/src/services/staging.rs (inferred path)

@@ -1,5 +1,6 @@
 use std::{collections::BTreeMap, sync::Arc};
 
+use nodata_storage::backend::local::LocalStorageBackend;
 use tokio::sync::RwLock;
 
 use crate::state::SharedState;
@@ -23,11 +24,11 @@ pub struct Staging {
 }
 
 impl Staging {
-    pub fn new() -> Self {
-        Self {
+    pub async fn new() -> anyhow::Result<Self> {
+        Ok(Self {
             store: Arc::default(),
-            storage: nodata_storage::Storage::new(nodata_storage::backend::StorageBackend::temp()),
-        }
+            storage: nodata_storage::Storage::new_from_env().await?,
+        })
     }
 
     pub async fn publish(
crates/nodata/src/state.rs (inferred path)

@@ -1,8 +1,6 @@
 use std::{ops::Deref, sync::Arc};
 
-use anyhow::Context;
 use prometheus::Registry;
-use sqlx::{Pool, Postgres};
 
 use crate::services::{consumers::Consumers, handler::Handler, staging::Staging};
 
@@ -24,7 +22,6 @@ impl Deref for SharedState {
 }
 
 pub struct State {
-    pub _db: Pool<Postgres>,
     pub staging: Staging,
     pub consumers: Consumers,
     pub handler: Handler,
@@ -33,23 +30,10 @@ pub struct State {
 
 impl State {
     pub async fn new() -> anyhow::Result<Self> {
-        let db = sqlx::PgPool::connect(
-            &std::env::var("DATABASE_URL").context("DATABASE_URL is not set")?,
-        )
-        .await?;
-
-        sqlx::migrate!("migrations/crdb")
-            .set_locking(false)
-            .run(&db)
-            .await?;
-
-        let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?;
-
-        let staging = Staging::new();
+        let staging = Staging::new().await?;
         let handler = Handler::new(staging.clone());
 
         Ok(Self {
-            _db: db,
             consumers: Consumers::new(),
             staging,
             handler,
cuddle.yaml (48 lines changed)

@@ -6,16 +6,46 @@ vars:
   service: "nodata"
   registry: kasperhermansen
 
-  clusters:
-    clank-prod:
-      replicas: "3"
-      namespace: prod
+  database:
+    crdb: "false"
+
+  ingress:
+    - internal: "true"
+    - internal_grpc: "true"
 
-deployment:
-  registry: git@git.front.kjuulh.io:kjuulh/clank-clusters
-  env:
-    prod:
-      clusters:
-        - clank-prod
+please:
+  project:
+    owner: kjuulh
+    repository: nodata
+    branch: main
+  settings:
+    api_url: https://git.front.kjuulh.io
+  actions:
+    rust:
+
+cuddle/clusters:
+  dev:
+    replicas: 1
+    env:
+      service.host: "0.0.0.0:3001"
+      service.grpc.host: "0.0.0.0:4001"
+      storage.backend: "s3"
+      aws.endpoint.url: "https://api.minio.i.kjuulh.io"
+      aws.bucket: "nodata-dev"
+      aws.access.key.id:
+        vault: true
+      aws.secret.access.key:
+        vault: true
+
+  prod:
+    replicas: 1
+    env:
+      service.host: "0.0.0.0:3001"
+      service.grpc.host: "0.0.0.0:4001"
+      storage.backend: "s3"
+      aws.endpoint.url: "https://api.minio.i.kjuulh.io"
+      aws.bucket: "nodata-prod"
+      aws.access.key.id:
+        vault: true
+      aws.secret.access.key:
+        vault: true