diff --git a/Cargo.lock b/Cargo.lock index e81d29a..7a3554d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,17 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +[[package]] +name = "async-trait" +version = "0.1.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -76,8 +87,11 @@ name = "crunch" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", + "thiserror", "tokio", "tracing", + "tracing-subscriber", ] [[package]] @@ -92,6 +106,12 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + [[package]] name = "libc" version = "0.2.148" @@ -140,6 +160,16 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -165,6 +195,12 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.1" @@ -233,6 +269,15 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sharded-slab" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +dependencies = [ + "lazy_static", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -269,6 +314,36 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thiserror" +version = "1.0.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "thread_local" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "tokio" version = "1.32.0" @@ -330,6 +405,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", ] [[package]] @@ -338,12 +439,40 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 944a5ee..4bf5fa1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,3 +8,6 @@ crunch = { path = "crates/crunch" } anyhow = { version = "1.0.71" } tokio = { version = "1", features = ["full"] } tracing = { version = "0.1", features = ["log"] } +tracing-subscriber = "0.3.17" +thiserror = {version = "1.0.48"} +async-trait = "0.1.73" diff --git a/crates/crunch/Cargo.toml b/crates/crunch/Cargo.toml index 53ff6ca..c2fd671 100644 --- a/crates/crunch/Cargo.toml +++ b/crates/crunch/Cargo.toml @@ -7,3 +7,8 @@ edition = "2021" anyhow.workspace = true tracing.workspace = true tokio.workspace = true +thiserror.workspace = true +async-trait.workspace = true + +[dev-dependencies] +tracing-subscriber.workspace = true \ No newline at end of file diff --git a/crates/crunch/examples/basic.rs b/crates/crunch/examples/basic.rs new file mode 100644 index 0000000..4395eb2 --- /dev/null +++ b/crates/crunch/examples/basic.rs @@ -0,0 +1,47 @@ +use crunch::{Deserializer, Event, EventInfo, Persistence, Publisher, Serializer}; + +struct SomeEvent { + name: String, +} + +impl Serializer for SomeEvent { + fn serialize(&self) -> Result, crunch::SerializeError> { + Ok(b"field=name".to_vec()) + } +} + +impl Deserializer for SomeEvent { + fn deserialize(raw: Vec) -> Result + where + Self: Sized, + { + Ok(Self { + name: "something".into(), + }) + } +} + +impl Event for SomeEvent { + fn event_info(&self) -> EventInfo { + EventInfo { + domain: "some-domain", + entity_type: "some-entity", + } + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let in_memory = Persistence::in_memory(); + let publisher = Publisher::new(in_memory); + + publisher + .publish(SomeEvent { + name: "something".into(), + }) + .await?; + + Ok(()) +} diff --git a/crates/crunch/src/lib.rs b/crates/crunch/src/lib.rs index 8b13789..f748220 100644 --- a/crates/crunch/src/lib.rs +++ b/crates/crunch/src/lib.rs @@ -1 +1,146 @@ +mod traits { + use std::fmt::Display; + use async_trait::async_trait; + + use crate::{DeserializeError, SerializeError}; + + #[async_trait] + pub trait Persistence { + async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()>; + } + + pub trait Serializer { + fn serialize(&self) -> Result, SerializeError>; + } + + pub trait Deserializer { + fn deserialize(raw: Vec) -> Result + where + Self: Sized; + } + + #[derive(Debug, Clone, Copy)] + pub struct EventInfo { + pub domain: &'static str, + pub entity_type: &'static str, + } + + impl Display for EventInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&format!( + "domain: {}, entity_type: {}", + self.domain, self.entity_type + )) + } + } + + pub trait Event: Serializer + Deserializer { + fn event_info(&self) -> EventInfo; + } +} + +mod impls { + use std::{ops::Deref, sync::Arc}; + + use async_trait::async_trait; + + use crate::{traits, EventInfo}; + + pub struct InMemoryPersistence {} + #[async_trait] + impl traits::Persistence for InMemoryPersistence { + async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()> { + tracing::info!( + event_info = event_info.to_string(), + content_len = content.len(), + "inserted event" + ); + + Ok(()) + } + } + + pub struct Persistence(Arc); + + impl Persistence { + pub fn in_memory() -> Self { + Self(Arc::new(InMemoryPersistence {})) + } + } + + impl Deref for Persistence { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.0 + } + } +} + +mod errors { + use thiserror::Error; + + #[derive(Error, Debug)] + pub enum SerializeError { + #[error("failed to serialize")] + FailedToSerialize(anyhow::Error), + } + + #[derive(Error, Debug)] + pub enum DeserializeError { + #[error("failed to serialize")] + FailedToDeserialize(anyhow::Error), + } + + #[derive(Error, Debug)] + pub enum PublishError { + #[error("failed to serialize")] + SerializeError(#[source] SerializeError), + + #[error("failed to commit to database")] + DbError(#[source] anyhow::Error), + + #[error("transaction failed")] + DbTxError(#[source] anyhow::Error), + + #[error("failed to connect to database")] + ConnectionError(#[source] anyhow::Error), + } +} + +pub use errors::*; +pub use impls::Persistence; +pub use traits::{Deserializer, Event, EventInfo, Serializer}; + +pub struct Publisher { + persistence: Persistence, +} + +#[allow(dead_code)] +impl Publisher { + pub fn new(persistence: Persistence) -> Self { + Self { persistence } + } + + pub async fn publish(&self, event: T) -> Result<(), PublishError> + where + T: Event, + { + let content = event.serialize().map_err(PublishError::SerializeError)?; + + self.persistence + .insert(&event.event_info(), content) + .await + .map_err(PublishError::DbError)?; + + Ok(()) + } + pub async fn publish_tx(&self, event: T) -> Result<(), PublishError> + where + T: Event, + { + // TODO: add transaction support later + self.publish(event).await + } +}