From 98550ace16d93ed786e085a134408cb251881ee8 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sat, 23 Sep 2023 18:21:39 +0200 Subject: [PATCH] feat: add subscriptions Signed-off-by: kjuulh --- Cargo.lock | 25 ++++++ Cargo.toml | 2 + crates/crunch-envelope/build.rs | 8 +- .../src/generated/crunch.envelope.rs | 18 +++++ crates/crunch-envelope/src/generated/mod.rs | 3 + crates/crunch-envelope/src/lib.rs | 1 + crates/crunch-envelope/src/proto_envelope.rs | 13 ++- crates/crunch-in-memory/Cargo.toml | 15 ++++ crates/crunch-in-memory/src/lib.rs | 81 +++++++++++++++++++ crates/crunch-traits/Cargo.toml | 13 +++ crates/crunch-traits/src/errors.rs | 43 ++++++++++ .../traits.rs => crunch-traits/src/lib.rs} | 12 ++- crates/crunch-traits/src/transport.rs | 12 +++ crates/crunch/Cargo.toml | 9 ++- crates/crunch/examples/basic.rs | 24 +++--- crates/crunch/src/errors.rs | 28 ------- crates/crunch/src/impls.rs | 56 ++++++++++--- crates/crunch/src/lib.rs | 80 ++++-------------- crates/crunch/src/outbox.rs | 52 ++++++++++++ crates/crunch/src/publisher.rs | 35 ++++++++ crates/crunch/src/transport.rs | 31 +++++++ 21 files changed, 431 insertions(+), 130 deletions(-) create mode 100644 crates/crunch-envelope/src/generated/crunch.envelope.rs create mode 100644 crates/crunch-envelope/src/generated/mod.rs create mode 100644 crates/crunch-in-memory/Cargo.toml create mode 100644 crates/crunch-in-memory/src/lib.rs create mode 100644 crates/crunch-traits/Cargo.toml create mode 100644 crates/crunch-traits/src/errors.rs rename crates/{crunch/src/traits.rs => crunch-traits/src/lib.rs} (68%) create mode 100644 crates/crunch-traits/src/transport.rs delete mode 100644 crates/crunch/src/errors.rs create mode 100644 crates/crunch/src/outbox.rs create mode 100644 crates/crunch/src/publisher.rs create mode 100644 crates/crunch/src/transport.rs diff --git a/Cargo.lock b/Cargo.lock index 3fc9127..7f8641d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -297,6 +297,8 @@ dependencies = [ "anyhow", "async-trait", "crunch-envelope", + "crunch-in-memory", + "crunch-traits", "futures", "thiserror", "tokio", @@ -323,6 +325,29 @@ dependencies = [ "thiserror", ] +[[package]] +name = "crunch-in-memory" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "crunch-traits", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "crunch-traits" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "thiserror", + "tokio", + "uuid", +] + [[package]] name = "either" version = "1.9.0" diff --git a/Cargo.toml b/Cargo.toml index 2f69601..579e5ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,9 @@ resolver = "2" [workspace.dependencies] crunch = { path = "crates/crunch" } +crunch-traits = { path = "crates/crunch-traits" } crunch-envelope = { path = "crates/crunch-envelope" } +crunch-in-memory = { path = "crates/crunch-in-memory" } anyhow = { version = "1.0.71" } tokio = { version = "1", features = ["full"] } diff --git a/crates/crunch-envelope/build.rs b/crates/crunch-envelope/build.rs index 985d540..aef506d 100644 --- a/crates/crunch-envelope/build.rs +++ b/crates/crunch-envelope/build.rs @@ -8,5 +8,11 @@ fn main() { .run() .unwrap(); - prost_build::compile_protos(&["src/envelope.proto"], &["src/"]).unwrap(); + std::fs::create_dir_all("src/generated").unwrap(); + let mut config = prost_build::Config::default(); + config.out_dir("src/generated/"); + + config + .compile_protos(&["src/envelope.proto"], &["src/"]) + .unwrap(); } diff --git a/crates/crunch-envelope/src/generated/crunch.envelope.rs b/crates/crunch-envelope/src/generated/crunch.envelope.rs new file mode 100644 index 0000000..ac4fe3b --- /dev/null +++ b/crates/crunch-envelope/src/generated/crunch.envelope.rs @@ -0,0 +1,18 @@ +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Envelope { + #[prost(message, optional, tag="1")] + pub metadata: ::std::option::Option, + #[prost(bytes, tag="2")] + pub content: std::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Metadata { + #[prost(string, tag="1")] + pub domain: std::string::String, + #[prost(string, tag="2")] + pub entity: std::string::String, + #[prost(uint64, tag="3")] + pub timestamp: u64, + #[prost(uint64, tag="4")] + pub sequence: u64, +} diff --git a/crates/crunch-envelope/src/generated/mod.rs b/crates/crunch-envelope/src/generated/mod.rs new file mode 100644 index 0000000..fa232de --- /dev/null +++ b/crates/crunch-envelope/src/generated/mod.rs @@ -0,0 +1,3 @@ +pub mod crunch { + include!("crunch.envelope.rs"); +} diff --git a/crates/crunch-envelope/src/lib.rs b/crates/crunch-envelope/src/lib.rs index 9d51c75..6e1fcbe 100644 --- a/crates/crunch-envelope/src/lib.rs +++ b/crates/crunch-envelope/src/lib.rs @@ -3,6 +3,7 @@ mod envelope_capnp; #[cfg(feature = "json")] mod json_envelope; +mod generated; #[cfg(feature = "proto")] mod proto_envelope; diff --git a/crates/crunch-envelope/src/proto_envelope.rs b/crates/crunch-envelope/src/proto_envelope.rs index c49a610..12ee478 100644 --- a/crates/crunch-envelope/src/proto_envelope.rs +++ b/crates/crunch-envelope/src/proto_envelope.rs @@ -1,14 +1,11 @@ -pub mod envelope { - include!(concat!(env!("OUT_DIR"), "/crunch.envelope.rs")); -} - use prost::Message; +use crate::generated::crunch::*; use crate::EnvelopeError; pub fn wrap<'a>(domain: &'a str, entity: &'a str, content: &'a [u8]) -> Vec { - let out = envelope::Envelope { - metadata: Some(envelope::Metadata { + let out = Envelope { + metadata: Some(Metadata { domain: domain.to_string(), entity: entity.to_string(), timestamp: 0, @@ -20,8 +17,8 @@ pub fn wrap<'a>(domain: &'a str, entity: &'a str, content: &'a [u8]) -> Vec out.encode_to_vec() } -pub fn unwrap<'a>(message: &'a [u8]) -> Result<(Vec, envelope::Metadata), EnvelopeError> { - let out = envelope::Envelope::decode(message).map_err(EnvelopeError::ProtoError)?; +pub fn unwrap<'a>(message: &'a [u8]) -> Result<(Vec, Metadata), EnvelopeError> { + let out = Envelope::decode(message).map_err(EnvelopeError::ProtoError)?; Ok(( out.content, diff --git a/crates/crunch-in-memory/Cargo.toml b/crates/crunch-in-memory/Cargo.toml new file mode 100644 index 0000000..26e664d --- /dev/null +++ b/crates/crunch-in-memory/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "crunch-in-memory" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +crunch-traits.workspace = true + +anyhow.workspace = true +tracing.workspace = true +tokio.workspace = true +thiserror.workspace = true +async-trait.workspace = true diff --git a/crates/crunch-in-memory/src/lib.rs b/crates/crunch-in-memory/src/lib.rs new file mode 100644 index 0000000..6ed85d8 --- /dev/null +++ b/crates/crunch-in-memory/src/lib.rs @@ -0,0 +1,81 @@ +use std::collections::BTreeMap; + +use async_trait::async_trait; +use crunch_traits::{errors::TransportError, EventInfo, Transport}; +use tokio::sync::broadcast::{Receiver, Sender}; + +#[derive(Clone)] +struct TransportEnvelope { + info: EventInfo, + content: Vec, +} + +pub struct InMemoryTransport { + events: tokio::sync::RwLock>>, +} + +impl InMemoryTransport { + pub fn new() -> Self { + Self { + events: tokio::sync::RwLock::default(), + } + } +} + +impl Default for InMemoryTransport { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl Transport for InMemoryTransport { + async fn publish( + &self, + event_info: &EventInfo, + content: Vec, + ) -> Result<(), TransportError> { + let transport_key = event_info.transport_name(); + + // Possibly create a register handle instead, as this requires a write and then read. It may not matter for in memory though + { + let mut events = self.events.write().await; + if let None = events.get(&transport_key) { + let (sender, mut receiver) = tokio::sync::broadcast::channel(100); + events.insert(transport_key.clone(), sender); + tokio::spawn(async move { + while let Ok(item) = receiver.recv().await { + tracing::info!("default receiver: {}", item.info.transport_name()); + } + }); + } + } + + let events = self.events.read().await; + let sender = events + .get(&transport_key) + .expect("transport to be available, as we just created it"); + sender + .send(TransportEnvelope { + info: event_info.clone(), + content, + }) + .map_err(|e| anyhow::anyhow!(e.to_string())) + .map_err(TransportError::Err)?; + + Ok(()) + } +} + +trait EventInfoExt { + fn transport_name(&self) -> String; +} + +impl EventInfoExt for EventInfo { + fn transport_name(&self) -> String { + format!( + "crunch.{}.{}.{}", + self.domain, self.entity_type, self.event_name + ) + } +} diff --git a/crates/crunch-traits/Cargo.toml b/crates/crunch-traits/Cargo.toml new file mode 100644 index 0000000..8ab0a74 --- /dev/null +++ b/crates/crunch-traits/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "crunch-traits" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow.workspace = true +tokio.workspace = true +thiserror.workspace = true +async-trait.workspace = true +uuid.workspace = true diff --git a/crates/crunch-traits/src/errors.rs b/crates/crunch-traits/src/errors.rs new file mode 100644 index 0000000..10f1c0d --- /dev/null +++ b/crates/crunch-traits/src/errors.rs @@ -0,0 +1,43 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum SerializeError { + #[error("failed to serialize {0}")] + FailedToSerialize(anyhow::Error), +} + +#[derive(Error, Debug)] +pub enum DeserializeError { + #[error("failed to serialize {0}")] + FailedToDeserialize(anyhow::Error), +} + +#[derive(Error, Debug)] +pub enum PublishError { + #[error("failed to serialize {0}")] + SerializeError(#[source] SerializeError), + + #[error("failed to commit to database {0}")] + DbError(#[source] anyhow::Error), + + #[error("transaction failed {0}")] + DbTxError(#[source] anyhow::Error), + + #[error("failed to connect to database {0}")] + ConnectionError(#[source] anyhow::Error), +} + +#[derive(Error, Debug)] +pub enum TransportError { + #[error("to publish to transport {0}")] + Err(anyhow::Error), +} + +#[derive(Error, Debug)] +pub enum PersistenceError { + #[error("failed to get item {0}")] + GetErr(anyhow::Error), + + #[error("failed to publish item {0}")] + UpdatePublished(anyhow::Error), +} diff --git a/crates/crunch/src/traits.rs b/crates/crunch-traits/src/lib.rs similarity index 68% rename from crates/crunch/src/traits.rs rename to crates/crunch-traits/src/lib.rs index 1247b13..d2d9759 100644 --- a/crates/crunch/src/traits.rs +++ b/crates/crunch-traits/src/lib.rs @@ -1,13 +1,14 @@ -use std::fmt::Display; +use std::{fmt::Display, sync::Arc}; use async_trait::async_trait; - -use crate::{DeserializeError, SerializeError}; +use errors::{DeserializeError, PersistenceError, SerializeError, TransportError}; #[async_trait] pub trait Persistence { async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()>; async fn next(&self) -> Option; + async fn get(&self, event_id: &str) -> Result)>, PersistenceError>; + async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError>; } pub trait Serializer { @@ -24,6 +25,7 @@ pub trait Deserializer { pub struct EventInfo { pub domain: &'static str, pub entity_type: &'static str, + pub event_name: &'static str, } impl Display for EventInfo { @@ -38,3 +40,7 @@ impl Display for EventInfo { pub trait Event: Serializer + Deserializer { fn event_info(&self) -> EventInfo; } + +pub mod errors; +mod transport; +pub use transport::*; diff --git a/crates/crunch-traits/src/transport.rs b/crates/crunch-traits/src/transport.rs new file mode 100644 index 0000000..d0ec1a6 --- /dev/null +++ b/crates/crunch-traits/src/transport.rs @@ -0,0 +1,12 @@ +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::{errors::TransportError, EventInfo}; + +#[async_trait] +pub trait Transport { + async fn publish(&self, event_info: &EventInfo, content: Vec) + -> Result<(), TransportError>; +} +pub type DynTransport = Arc; diff --git a/crates/crunch/Cargo.toml b/crates/crunch/Cargo.toml index 7539027..8111044 100644 --- a/crates/crunch/Cargo.toml +++ b/crates/crunch/Cargo.toml @@ -5,6 +5,8 @@ edition = "2021" [dependencies] crunch-envelope.workspace = true +crunch-in-memory = { workspace = true, optional = true } +crunch-traits.workspace = true anyhow.workspace = true tracing.workspace = true @@ -16,4 +18,9 @@ uuid.workspace = true futures.workspace = true [dev-dependencies] -tracing-subscriber.workspace = true \ No newline at end of file +tracing-subscriber.workspace = true + +[features] +default = ["in-memory", "traits"] +traits = [] +in-memory = ["dep:crunch-in-memory"] \ No newline at end of file diff --git a/crates/crunch/examples/basic.rs b/crates/crunch/examples/basic.rs index b8f7ad5..663844d 100644 --- a/crates/crunch/examples/basic.rs +++ b/crates/crunch/examples/basic.rs @@ -1,17 +1,17 @@ -use crunch::{Deserializer, Event, EventInfo, OutboxHandler, Persistence, Publisher, Serializer}; +use crunch::errors::*; struct SomeEvent { name: String, } -impl Serializer for SomeEvent { - fn serialize(&self) -> Result, crunch::SerializeError> { +impl crunch::traits::Serializer for SomeEvent { + fn serialize(&self) -> Result, SerializeError> { Ok(b"field=name".to_vec()) } } -impl Deserializer for SomeEvent { - fn deserialize(_raw: Vec) -> Result +impl crunch::traits::Deserializer for SomeEvent { + fn deserialize(_raw: Vec) -> Result where Self: Sized, { @@ -21,11 +21,12 @@ impl Deserializer for SomeEvent { } } -impl Event for SomeEvent { - fn event_info(&self) -> EventInfo { - EventInfo { +impl crunch::traits::Event for SomeEvent { + fn event_info(&self) -> crunch::traits::EventInfo { + crunch::traits::EventInfo { domain: "some-domain", entity_type: "some-entity", + event_name: "some-event", } } } @@ -34,9 +35,10 @@ impl Event for SomeEvent { async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); - let in_memory = Persistence::in_memory(); - OutboxHandler::new(in_memory.clone()).spawn(); - let publisher = Publisher::new(in_memory); + let in_memory = crunch::Persistence::in_memory(); + let transport = crunch::Transport::in_memory(); + crunch::OutboxHandler::new(in_memory.clone(), transport.clone()).spawn(); + let publisher = crunch::Publisher::new(in_memory); publisher .publish(SomeEvent { diff --git a/crates/crunch/src/errors.rs b/crates/crunch/src/errors.rs deleted file mode 100644 index 202e003..0000000 --- a/crates/crunch/src/errors.rs +++ /dev/null @@ -1,28 +0,0 @@ -use thiserror::Error; - -#[derive(Error, Debug)] -pub enum SerializeError { - #[error("failed to serialize")] - FailedToSerialize(anyhow::Error), -} - -#[derive(Error, Debug)] -pub enum DeserializeError { - #[error("failed to serialize")] - FailedToDeserialize(anyhow::Error), -} - -#[derive(Error, Debug)] -pub enum PublishError { - #[error("failed to serialize")] - SerializeError(#[source] SerializeError), - - #[error("failed to commit to database")] - DbError(#[source] anyhow::Error), - - #[error("transaction failed")] - DbTxError(#[source] anyhow::Error), - - #[error("failed to connect to database")] - ConnectionError(#[source] anyhow::Error), -} diff --git a/crates/crunch/src/impls.rs b/crates/crunch/src/impls.rs index 74bc6b0..d3383e9 100644 --- a/crates/crunch/src/impls.rs +++ b/crates/crunch/src/impls.rs @@ -1,10 +1,13 @@ -use std::{collections::VecDeque, ops::Deref, sync::Arc}; +use std::{ + collections::{BTreeMap, VecDeque}, + ops::Deref, + sync::Arc, +}; use async_trait::async_trait; +use crunch_traits::{errors::PersistenceError, EventInfo}; use tokio::sync::RwLock; -use crate::{traits, EventInfo}; - #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] enum MsgState { Pending, @@ -21,20 +24,22 @@ struct Msg { pub struct InMemoryPersistence { outbox: Arc>>, + store: Arc>>, } #[async_trait] -impl traits::Persistence for InMemoryPersistence { +impl crunch_traits::Persistence for InMemoryPersistence { async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()> { let msg = crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content); - - let mut outbox = self.outbox.write().await; - outbox.push_back(Msg { + let msg = Msg { id: uuid::Uuid::new_v4().to_string(), info: event_info.clone(), msg, state: MsgState::Pending, - }); + }; + let mut outbox = self.outbox.write().await; + outbox.push_back(msg.clone()); + self.store.write().await.insert(msg.id.clone(), msg); tracing::info!( event_info = event_info.to_string(), @@ -49,25 +54,52 @@ impl traits::Persistence for InMemoryPersistence { let mut outbox = self.outbox.write().await; outbox.pop_front().map(|i| i.id) } + + async fn get(&self, event_id: &str) -> Result)>, PersistenceError> { + Ok(self + .store + .read() + .await + .get(event_id) + .filter(|m| m.state == MsgState::Pending) + .map(|m| m.clone()) + .map(|m| (m.info, m.msg))) + } + + async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> { + match self.store.write().await.get_mut(event_id) { + Some(msg) => msg.state = MsgState::Published, + None => { + return Err(PersistenceError::UpdatePublished(anyhow::anyhow!( + "event was not found on id: {}", + event_id + ))) + } + } + + Ok(()) + } } #[derive(Clone)] pub struct Persistence { - inner: Arc, + inner: Arc, } impl Persistence { + #[cfg(feature = "in-memory")] pub fn in_memory() -> Self { Self { - inner: Arc::new(InMemoryPersistence { - outbox: Arc::default(), + inner: std::sync::Arc::new(InMemoryPersistence { + outbox: std::sync::Arc::default(), + store: std::sync::Arc::default(), }), } } } impl Deref for Persistence { - type Target = Arc; + type Target = Arc; fn deref(&self) -> &Self::Target { &self.inner diff --git a/crates/crunch/src/lib.rs b/crates/crunch/src/lib.rs index 8ef4bd1..57a1f66 100644 --- a/crates/crunch/src/lib.rs +++ b/crates/crunch/src/lib.rs @@ -1,70 +1,18 @@ -mod errors; mod impls; -mod traits; +mod outbox; +mod publisher; +mod transport; + +#[cfg(feature = "traits")] +pub mod traits { + pub use crunch_traits::{Deserializer, Event, EventInfo, Persistence, Serializer, Transport}; +} + +pub mod errors { + pub use crunch_traits::errors::*; +} -pub use errors::*; pub use impls::Persistence; pub use outbox::OutboxHandler; -pub use traits::{Deserializer, Event, EventInfo, Serializer}; - -mod outbox { - use crate::Persistence; - - pub struct OutboxHandler { - persistence: Persistence, - } - - impl OutboxHandler { - pub fn new(persistence: Persistence) -> Self { - Self { persistence } - } - - pub fn spawn(&mut self) { - let p = self.persistence.clone(); - tokio::spawn(async move { - loop { - match p.next().await { - Some(item) => { - tracing::info!("got item: {}", item); - } - None => { - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - } - } - } - }); - } - } -} - -pub struct Publisher { - persistence: Persistence, -} - -#[allow(dead_code)] -impl Publisher { - pub fn new(persistence: Persistence) -> Self { - Self { persistence } - } - - pub async fn publish(&self, event: T) -> Result<(), PublishError> - where - T: Event, - { - let content = event.serialize().map_err(PublishError::SerializeError)?; - - self.persistence - .insert(&event.event_info(), content) - .await - .map_err(PublishError::DbError)?; - - Ok(()) - } - pub async fn publish_tx(&self, event: T) -> Result<(), PublishError> - where - T: Event, - { - // TODO: add transaction support later - self.publish(event).await - } -} +pub use publisher::Publisher; +pub use transport::Transport; diff --git a/crates/crunch/src/outbox.rs b/crates/crunch/src/outbox.rs new file mode 100644 index 0000000..e0c2948 --- /dev/null +++ b/crates/crunch/src/outbox.rs @@ -0,0 +1,52 @@ +use crate::{Persistence, Transport}; + +pub struct OutboxHandler { + persistence: Persistence, + transport: Transport, +} + +impl OutboxHandler { + pub fn new(persistence: Persistence, transport: Transport) -> Self { + Self { + persistence, + transport, + } + } + + pub fn spawn(&mut self) { + let p = self.persistence.clone(); + let t = self.transport.clone(); + tokio::spawn(async move { + loop { + match handle_messages(&p, &t).await { + Err(e) => { + tracing::error!("failed to handle message: {}", e); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + Ok(None) => { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + _ => (), + } + } + }); + } +} + +async fn handle_messages(p: &Persistence, t: &Transport) -> anyhow::Result> { + match p.next().await { + Some(item) => match p.get(&item).await? { + Some((info, content)) => { + t.publish(&info, content).await?; + p.update_published(&item).await?; + tracing::info!("published item: {}", item); + } + None => { + tracing::info!("did not find any events for item: {}", item); + } + }, + None => return Ok(None), + } + + Ok(Some(())) +} diff --git a/crates/crunch/src/publisher.rs b/crates/crunch/src/publisher.rs new file mode 100644 index 0000000..14e9885 --- /dev/null +++ b/crates/crunch/src/publisher.rs @@ -0,0 +1,35 @@ +use crunch_traits::{errors::PublishError, Event}; + +use crate::Persistence; + +pub struct Publisher { + persistence: Persistence, +} + +#[allow(dead_code)] +impl Publisher { + pub fn new(persistence: Persistence) -> Self { + Self { persistence } + } + + pub async fn publish(&self, event: T) -> Result<(), PublishError> + where + T: Event, + { + let content = event.serialize().map_err(PublishError::SerializeError)?; + + self.persistence + .insert(&event.event_info(), content) + .await + .map_err(PublishError::DbError)?; + + Ok(()) + } + pub async fn publish_tx(&self, event: T) -> Result<(), PublishError> + where + T: Event, + { + // TODO: add transaction support later + self.publish(event).await + } +} diff --git a/crates/crunch/src/transport.rs b/crates/crunch/src/transport.rs new file mode 100644 index 0000000..1fba2bf --- /dev/null +++ b/crates/crunch/src/transport.rs @@ -0,0 +1,31 @@ +use crunch_traits::DynTransport; + +#[derive(Clone)] +pub struct Transport(DynTransport); + +impl Transport { + pub fn new(transport: DynTransport) -> Self { + Self(transport) + } + + #[cfg(feature = "in-memory")] + pub fn in_memory() -> Self { + Self(std::sync::Arc::new( + crunch_in_memory::InMemoryTransport::default(), + )) + } +} + +impl From for Transport { + fn from(value: DynTransport) -> Self { + Self::new(value) + } +} + +impl std::ops::Deref for Transport { + type Target = DynTransport; + + fn deref(&self) -> &Self::Target { + &self.0 + } +}