@@ -5,6 +5,8 @@ edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
crunch-envelope.workspace = true
|
||||
crunch-in-memory = { workspace = true, optional = true }
|
||||
crunch-traits.workspace = true
|
||||
|
||||
anyhow.workspace = true
|
||||
tracing.workspace = true
|
||||
@@ -16,4 +18,9 @@ uuid.workspace = true
|
||||
futures.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tracing-subscriber.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
|
||||
[features]
|
||||
default = ["in-memory", "traits"]
|
||||
traits = []
|
||||
in-memory = ["dep:crunch-in-memory"]
|
@@ -1,17 +1,17 @@
|
||||
use crunch::{Deserializer, Event, EventInfo, OutboxHandler, Persistence, Publisher, Serializer};
|
||||
use crunch::errors::*;
|
||||
|
||||
struct SomeEvent {
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl Serializer for SomeEvent {
|
||||
fn serialize(&self) -> Result<Vec<u8>, crunch::SerializeError> {
|
||||
impl crunch::traits::Serializer for SomeEvent {
|
||||
fn serialize(&self) -> Result<Vec<u8>, SerializeError> {
|
||||
Ok(b"field=name".to_vec())
|
||||
}
|
||||
}
|
||||
|
||||
impl Deserializer for SomeEvent {
|
||||
fn deserialize(_raw: Vec<u8>) -> Result<Self, crunch::DeserializeError>
|
||||
impl crunch::traits::Deserializer for SomeEvent {
|
||||
fn deserialize(_raw: Vec<u8>) -> Result<Self, DeserializeError>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
@@ -21,11 +21,12 @@ impl Deserializer for SomeEvent {
|
||||
}
|
||||
}
|
||||
|
||||
impl Event for SomeEvent {
|
||||
fn event_info(&self) -> EventInfo {
|
||||
EventInfo {
|
||||
impl crunch::traits::Event for SomeEvent {
|
||||
fn event_info(&self) -> crunch::traits::EventInfo {
|
||||
crunch::traits::EventInfo {
|
||||
domain: "some-domain",
|
||||
entity_type: "some-entity",
|
||||
event_name: "some-event",
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -34,9 +35,10 @@ impl Event for SomeEvent {
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let in_memory = Persistence::in_memory();
|
||||
OutboxHandler::new(in_memory.clone()).spawn();
|
||||
let publisher = Publisher::new(in_memory);
|
||||
let in_memory = crunch::Persistence::in_memory();
|
||||
let transport = crunch::Transport::in_memory();
|
||||
crunch::OutboxHandler::new(in_memory.clone(), transport.clone()).spawn();
|
||||
let publisher = crunch::Publisher::new(in_memory);
|
||||
|
||||
publisher
|
||||
.publish(SomeEvent {
|
||||
|
@@ -1,28 +0,0 @@
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum SerializeError {
|
||||
#[error("failed to serialize")]
|
||||
FailedToSerialize(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum DeserializeError {
|
||||
#[error("failed to serialize")]
|
||||
FailedToDeserialize(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum PublishError {
|
||||
#[error("failed to serialize")]
|
||||
SerializeError(#[source] SerializeError),
|
||||
|
||||
#[error("failed to commit to database")]
|
||||
DbError(#[source] anyhow::Error),
|
||||
|
||||
#[error("transaction failed")]
|
||||
DbTxError(#[source] anyhow::Error),
|
||||
|
||||
#[error("failed to connect to database")]
|
||||
ConnectionError(#[source] anyhow::Error),
|
||||
}
|
@@ -1,10 +1,13 @@
|
||||
use std::{collections::VecDeque, ops::Deref, sync::Arc};
|
||||
use std::{
|
||||
collections::{BTreeMap, VecDeque},
|
||||
ops::Deref,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use crunch_traits::{errors::PersistenceError, EventInfo};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::{traits, EventInfo};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
|
||||
enum MsgState {
|
||||
Pending,
|
||||
@@ -21,20 +24,22 @@ struct Msg {
|
||||
|
||||
pub struct InMemoryPersistence {
|
||||
outbox: Arc<RwLock<VecDeque<Msg>>>,
|
||||
store: Arc<RwLock<BTreeMap<String, Msg>>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl traits::Persistence for InMemoryPersistence {
|
||||
impl crunch_traits::Persistence for InMemoryPersistence {
|
||||
async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()> {
|
||||
let msg = crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content);
|
||||
|
||||
let mut outbox = self.outbox.write().await;
|
||||
outbox.push_back(Msg {
|
||||
let msg = Msg {
|
||||
id: uuid::Uuid::new_v4().to_string(),
|
||||
info: event_info.clone(),
|
||||
msg,
|
||||
state: MsgState::Pending,
|
||||
});
|
||||
};
|
||||
let mut outbox = self.outbox.write().await;
|
||||
outbox.push_back(msg.clone());
|
||||
self.store.write().await.insert(msg.id.clone(), msg);
|
||||
|
||||
tracing::info!(
|
||||
event_info = event_info.to_string(),
|
||||
@@ -49,25 +54,52 @@ impl traits::Persistence for InMemoryPersistence {
|
||||
let mut outbox = self.outbox.write().await;
|
||||
outbox.pop_front().map(|i| i.id)
|
||||
}
|
||||
|
||||
async fn get(&self, event_id: &str) -> Result<Option<(EventInfo, Vec<u8>)>, PersistenceError> {
|
||||
Ok(self
|
||||
.store
|
||||
.read()
|
||||
.await
|
||||
.get(event_id)
|
||||
.filter(|m| m.state == MsgState::Pending)
|
||||
.map(|m| m.clone())
|
||||
.map(|m| (m.info, m.msg)))
|
||||
}
|
||||
|
||||
async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> {
|
||||
match self.store.write().await.get_mut(event_id) {
|
||||
Some(msg) => msg.state = MsgState::Published,
|
||||
None => {
|
||||
return Err(PersistenceError::UpdatePublished(anyhow::anyhow!(
|
||||
"event was not found on id: {}",
|
||||
event_id
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Persistence {
|
||||
inner: Arc<dyn traits::Persistence + Send + Sync + 'static>,
|
||||
inner: Arc<dyn crunch_traits::Persistence + Send + Sync + 'static>,
|
||||
}
|
||||
|
||||
impl Persistence {
|
||||
#[cfg(feature = "in-memory")]
|
||||
pub fn in_memory() -> Self {
|
||||
Self {
|
||||
inner: Arc::new(InMemoryPersistence {
|
||||
outbox: Arc::default(),
|
||||
inner: std::sync::Arc::new(InMemoryPersistence {
|
||||
outbox: std::sync::Arc::default(),
|
||||
store: std::sync::Arc::default(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for Persistence {
|
||||
type Target = Arc<dyn traits::Persistence + Send + Sync + 'static>;
|
||||
type Target = Arc<dyn crunch_traits::Persistence + Send + Sync + 'static>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
|
@@ -1,70 +1,18 @@
|
||||
mod errors;
|
||||
mod impls;
|
||||
mod traits;
|
||||
mod outbox;
|
||||
mod publisher;
|
||||
mod transport;
|
||||
|
||||
#[cfg(feature = "traits")]
|
||||
pub mod traits {
|
||||
pub use crunch_traits::{Deserializer, Event, EventInfo, Persistence, Serializer, Transport};
|
||||
}
|
||||
|
||||
pub mod errors {
|
||||
pub use crunch_traits::errors::*;
|
||||
}
|
||||
|
||||
pub use errors::*;
|
||||
pub use impls::Persistence;
|
||||
pub use outbox::OutboxHandler;
|
||||
pub use traits::{Deserializer, Event, EventInfo, Serializer};
|
||||
|
||||
mod outbox {
|
||||
use crate::Persistence;
|
||||
|
||||
pub struct OutboxHandler {
|
||||
persistence: Persistence,
|
||||
}
|
||||
|
||||
impl OutboxHandler {
|
||||
pub fn new(persistence: Persistence) -> Self {
|
||||
Self { persistence }
|
||||
}
|
||||
|
||||
pub fn spawn(&mut self) {
|
||||
let p = self.persistence.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match p.next().await {
|
||||
Some(item) => {
|
||||
tracing::info!("got item: {}", item);
|
||||
}
|
||||
None => {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Publisher {
|
||||
persistence: Persistence,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl Publisher {
|
||||
pub fn new(persistence: Persistence) -> Self {
|
||||
Self { persistence }
|
||||
}
|
||||
|
||||
pub async fn publish<T>(&self, event: T) -> Result<(), PublishError>
|
||||
where
|
||||
T: Event,
|
||||
{
|
||||
let content = event.serialize().map_err(PublishError::SerializeError)?;
|
||||
|
||||
self.persistence
|
||||
.insert(&event.event_info(), content)
|
||||
.await
|
||||
.map_err(PublishError::DbError)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
pub async fn publish_tx<T>(&self, event: T) -> Result<(), PublishError>
|
||||
where
|
||||
T: Event,
|
||||
{
|
||||
// TODO: add transaction support later
|
||||
self.publish(event).await
|
||||
}
|
||||
}
|
||||
pub use publisher::Publisher;
|
||||
pub use transport::Transport;
|
||||
|
52
crates/crunch/src/outbox.rs
Normal file
52
crates/crunch/src/outbox.rs
Normal file
@@ -0,0 +1,52 @@
|
||||
use crate::{Persistence, Transport};
|
||||
|
||||
pub struct OutboxHandler {
|
||||
persistence: Persistence,
|
||||
transport: Transport,
|
||||
}
|
||||
|
||||
impl OutboxHandler {
|
||||
pub fn new(persistence: Persistence, transport: Transport) -> Self {
|
||||
Self {
|
||||
persistence,
|
||||
transport,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn(&mut self) {
|
||||
let p = self.persistence.clone();
|
||||
let t = self.transport.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match handle_messages(&p, &t).await {
|
||||
Err(e) => {
|
||||
tracing::error!("failed to handle message: {}", e);
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
}
|
||||
Ok(None) => {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_messages(p: &Persistence, t: &Transport) -> anyhow::Result<Option<()>> {
|
||||
match p.next().await {
|
||||
Some(item) => match p.get(&item).await? {
|
||||
Some((info, content)) => {
|
||||
t.publish(&info, content).await?;
|
||||
p.update_published(&item).await?;
|
||||
tracing::info!("published item: {}", item);
|
||||
}
|
||||
None => {
|
||||
tracing::info!("did not find any events for item: {}", item);
|
||||
}
|
||||
},
|
||||
None => return Ok(None),
|
||||
}
|
||||
|
||||
Ok(Some(()))
|
||||
}
|
35
crates/crunch/src/publisher.rs
Normal file
35
crates/crunch/src/publisher.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
use crunch_traits::{errors::PublishError, Event};
|
||||
|
||||
use crate::Persistence;
|
||||
|
||||
pub struct Publisher {
|
||||
persistence: Persistence,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl Publisher {
|
||||
pub fn new(persistence: Persistence) -> Self {
|
||||
Self { persistence }
|
||||
}
|
||||
|
||||
pub async fn publish<T>(&self, event: T) -> Result<(), PublishError>
|
||||
where
|
||||
T: Event,
|
||||
{
|
||||
let content = event.serialize().map_err(PublishError::SerializeError)?;
|
||||
|
||||
self.persistence
|
||||
.insert(&event.event_info(), content)
|
||||
.await
|
||||
.map_err(PublishError::DbError)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
pub async fn publish_tx<T>(&self, event: T) -> Result<(), PublishError>
|
||||
where
|
||||
T: Event,
|
||||
{
|
||||
// TODO: add transaction support later
|
||||
self.publish(event).await
|
||||
}
|
||||
}
|
@@ -1,40 +0,0 @@
|
||||
use std::fmt::Display;
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::{DeserializeError, SerializeError};
|
||||
|
||||
#[async_trait]
|
||||
pub trait Persistence {
|
||||
async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()>;
|
||||
async fn next(&self) -> Option<String>;
|
||||
}
|
||||
|
||||
pub trait Serializer {
|
||||
fn serialize(&self) -> Result<Vec<u8>, SerializeError>;
|
||||
}
|
||||
|
||||
pub trait Deserializer {
|
||||
fn deserialize(raw: Vec<u8>) -> Result<Self, DeserializeError>
|
||||
where
|
||||
Self: Sized;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct EventInfo {
|
||||
pub domain: &'static str,
|
||||
pub entity_type: &'static str,
|
||||
}
|
||||
|
||||
impl Display for EventInfo {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(&format!(
|
||||
"domain: {}, entity_type: {}",
|
||||
self.domain, self.entity_type
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Event: Serializer + Deserializer {
|
||||
fn event_info(&self) -> EventInfo;
|
||||
}
|
31
crates/crunch/src/transport.rs
Normal file
31
crates/crunch/src/transport.rs
Normal file
@@ -0,0 +1,31 @@
|
||||
use crunch_traits::DynTransport;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Transport(DynTransport);
|
||||
|
||||
impl Transport {
|
||||
pub fn new(transport: DynTransport) -> Self {
|
||||
Self(transport)
|
||||
}
|
||||
|
||||
#[cfg(feature = "in-memory")]
|
||||
pub fn in_memory() -> Self {
|
||||
Self(std::sync::Arc::new(
|
||||
crunch_in_memory::InMemoryTransport::default(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DynTransport> for Transport {
|
||||
fn from(value: DynTransport) -> Self {
|
||||
Self::new(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for Transport {
|
||||
type Target = DynTransport;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user