29
crates/como_infrastructure/src/configs/mod.rs
Normal file
29
crates/como_infrastructure/src/configs/mod.rs
Normal file
@@ -0,0 +1,29 @@
|
||||
use clap::ValueEnum;
|
||||
use como_auth::AuthClap;
|
||||
|
||||
#[derive(clap::Parser)]
|
||||
pub struct AppConfig {
|
||||
#[clap(long, env)]
|
||||
pub database_url: String,
|
||||
#[clap(long, env, default_value = "postgres")]
|
||||
pub database_type: DatabaseType,
|
||||
#[clap(long, env)]
|
||||
pub rust_log: String,
|
||||
#[clap(long, env, default_value = "3001")]
|
||||
pub api_port: u32,
|
||||
#[clap(long, env, default_value = "true")]
|
||||
pub run_migrations: bool,
|
||||
#[clap(long, env, default_value = "false")]
|
||||
pub seed: bool,
|
||||
#[clap(long, env)]
|
||||
pub cors_origin: String,
|
||||
|
||||
#[clap(flatten)]
|
||||
pub auth: AuthClap,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, ValueEnum)]
|
||||
pub enum DatabaseType {
|
||||
Postgres,
|
||||
InMemory,
|
||||
}
|
33
crates/como_infrastructure/src/database/mod.rs
Normal file
33
crates/como_infrastructure/src/database/mod.rs
Normal file
@@ -0,0 +1,33 @@
|
||||
use anyhow::Context;
|
||||
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
|
||||
use tracing::log::info;
|
||||
|
||||
pub type ConnectionPool = Pool<Postgres>;
|
||||
|
||||
pub struct ConnectionPoolManager;
|
||||
|
||||
impl ConnectionPoolManager {
|
||||
pub async fn new_pool(
|
||||
connection_string: &str,
|
||||
run_migrations: bool,
|
||||
) -> anyhow::Result<ConnectionPool> {
|
||||
info!("initializing the database connection pool");
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(5)
|
||||
.connect(connection_string)
|
||||
.await
|
||||
.context("error while initializing the database connection pool")?;
|
||||
|
||||
if run_migrations {
|
||||
info!("migrations enabled");
|
||||
info!("migrating database");
|
||||
|
||||
sqlx::migrate!()
|
||||
.run(&pool)
|
||||
.await
|
||||
.context("error while running database migrations")?;
|
||||
}
|
||||
|
||||
Ok(pool)
|
||||
}
|
||||
}
|
5
crates/como_infrastructure/src/lib.rs
Normal file
5
crates/como_infrastructure/src/lib.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
pub mod configs;
|
||||
pub mod database;
|
||||
pub mod register;
|
||||
pub mod repositories;
|
||||
pub mod services;
|
74
crates/como_infrastructure/src/register.rs
Normal file
74
crates/como_infrastructure/src/register.rs
Normal file
@@ -0,0 +1,74 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_sqlx_session::PostgresSessionStore;
|
||||
use como_auth::{AuthService, SessionService};
|
||||
use como_core::{items::DynItemService, projects::DynProjectService, users::DynUserService};
|
||||
use tracing::log::info;
|
||||
|
||||
use crate::{
|
||||
configs::{AppConfig, DatabaseType},
|
||||
database::ConnectionPool,
|
||||
services::{
|
||||
item_service::{DefaultItemService, MemoryItemService},
|
||||
project_service::{DefaultProjectService, MemoryProjectService},
|
||||
user_service::DefaultUserService,
|
||||
},
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ServiceRegister {
|
||||
pub item_service: DynItemService,
|
||||
pub project_service: DynProjectService,
|
||||
pub user_service: DynUserService,
|
||||
pub session_store: PostgresSessionStore,
|
||||
pub auth_service: AuthService,
|
||||
}
|
||||
|
||||
impl ServiceRegister {
|
||||
pub async fn new(pool: ConnectionPool, config: Arc<AppConfig>) -> anyhow::Result<Self> {
|
||||
info!("creating services");
|
||||
|
||||
let session = SessionService::new(&config.auth).await?;
|
||||
let auth = AuthService::new(&config.auth, session).await?;
|
||||
|
||||
let s = match config.database_type {
|
||||
DatabaseType::Postgres => {
|
||||
let item_service =
|
||||
Arc::new(DefaultItemService::new(pool.clone())) as DynItemService;
|
||||
let project_service =
|
||||
Arc::new(DefaultProjectService::new(pool.clone())) as DynProjectService;
|
||||
let user_service =
|
||||
Arc::new(DefaultUserService::new(pool.clone())) as DynUserService;
|
||||
let store = PostgresSessionStore::new(&config.database_url).await?;
|
||||
store.migrate().await?;
|
||||
|
||||
Self {
|
||||
item_service,
|
||||
user_service,
|
||||
project_service,
|
||||
session_store: store,
|
||||
auth_service: auth,
|
||||
}
|
||||
}
|
||||
DatabaseType::InMemory => {
|
||||
let item_service = Arc::new(MemoryItemService::new()) as DynItemService;
|
||||
let project_service = Arc::new(MemoryProjectService::new()) as DynProjectService;
|
||||
let user_service =
|
||||
Arc::new(DefaultUserService::new(pool.clone())) as DynUserService;
|
||||
let store = PostgresSessionStore::new(&config.database_url).await?;
|
||||
store.migrate().await?;
|
||||
|
||||
Self {
|
||||
item_service,
|
||||
user_service,
|
||||
project_service,
|
||||
session_store: store,
|
||||
auth_service: auth,
|
||||
}
|
||||
}
|
||||
};
|
||||
info!("services created succesfully");
|
||||
|
||||
Ok(s)
|
||||
}
|
||||
}
|
1
crates/como_infrastructure/src/repositories/mod.rs
Normal file
1
crates/como_infrastructure/src/repositories/mod.rs
Normal file
@@ -0,0 +1 @@
|
||||
|
221
crates/como_infrastructure/src/services/item_service.rs
Normal file
221
crates/como_infrastructure/src/services/item_service.rs
Normal file
@@ -0,0 +1,221 @@
|
||||
use crate::database::ConnectionPool;
|
||||
use async_trait::async_trait;
|
||||
use como_core::items::ItemService;
|
||||
use como_domain::{
|
||||
item::{
|
||||
queries::{GetItemQuery, GetItemsQuery},
|
||||
requests::{CreateItemDto, UpdateItemDto},
|
||||
responses::CreatedItemDto,
|
||||
ItemDto,
|
||||
},
|
||||
user::ContextUserExt,
|
||||
Context,
|
||||
};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub struct DefaultItemService {
|
||||
pool: ConnectionPool,
|
||||
}
|
||||
|
||||
impl DefaultItemService {
|
||||
pub fn new(connection_pool: ConnectionPool) -> Self {
|
||||
Self {
|
||||
pool: connection_pool,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ItemService for DefaultItemService {
|
||||
async fn add_item(
|
||||
&self,
|
||||
context: &Context,
|
||||
item: CreateItemDto,
|
||||
) -> anyhow::Result<CreatedItemDto> {
|
||||
let state = serde_json::to_string(&como_domain::item::ItemState::Created {})?;
|
||||
let user_id = context.get_user_id().ok_or(anyhow::anyhow!("no user id"))?;
|
||||
|
||||
let rec = sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO items (id, title, description, state, project_id, user_id, created_at, updated_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, now(), now())
|
||||
RETURNING id, title, description, state, project_id
|
||||
"#,
|
||||
Uuid::new_v4(),
|
||||
item.title,
|
||||
item.description,
|
||||
state,
|
||||
item.project_id,
|
||||
user_id,
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(CreatedItemDto {
|
||||
id: rec.id,
|
||||
title: rec.title,
|
||||
description: rec.description,
|
||||
state: como_domain::item::ItemState::Created {},
|
||||
project_id: rec.project_id,
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_item(&self, context: &Context, query: GetItemQuery) -> anyhow::Result<ItemDto> {
|
||||
let user_id = context.get_user_id().ok_or(anyhow::anyhow!("no user id"))?;
|
||||
|
||||
let rec = sqlx::query!(
|
||||
r#"
|
||||
SELECT id, title, description, state, project_id
|
||||
FROM items
|
||||
WHERE id = $1 AND user_id = $2
|
||||
"#,
|
||||
query.item_id,
|
||||
user_id,
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(ItemDto {
|
||||
id: rec.id,
|
||||
title: rec.title,
|
||||
description: rec.description,
|
||||
state: serde_json::from_str(&rec.state)?,
|
||||
project_id: rec.project_id,
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_items(
|
||||
&self,
|
||||
context: &Context,
|
||||
query: GetItemsQuery,
|
||||
) -> anyhow::Result<Vec<ItemDto>> {
|
||||
let user_id = context.get_user_id().ok_or(anyhow::anyhow!("no user id"))?;
|
||||
|
||||
let recs = sqlx::query!(
|
||||
r#"
|
||||
SELECT id, title, description, state, project_id
|
||||
FROM items
|
||||
WHERE user_id = $1 and project_id = $2
|
||||
LIMIT 500
|
||||
"#,
|
||||
user_id,
|
||||
query.project_id,
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(recs
|
||||
.into_iter()
|
||||
.map(|rec| ItemDto {
|
||||
id: rec.id,
|
||||
title: rec.title,
|
||||
description: rec.description,
|
||||
state: serde_json::from_str(&rec.state).unwrap(),
|
||||
project_id: rec.project_id,
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn update_item(&self, context: &Context, item: UpdateItemDto) -> anyhow::Result<ItemDto> {
|
||||
let state = item.state.map(|s| serde_json::to_string(&s)).transpose()?;
|
||||
let user_id = context.get_user_id().ok_or(anyhow::anyhow!("no user id"))?;
|
||||
|
||||
let rec = sqlx::query!(
|
||||
r#"
|
||||
UPDATE items
|
||||
SET
|
||||
title = COALESCE($1, title),
|
||||
description = COALESCE($2, description),
|
||||
state = COALESCE($3, state),
|
||||
project_id = COALESCE($4, project_id),
|
||||
updated_at = now()
|
||||
WHERE id = $5 AND user_id = $6
|
||||
RETURNING id, title, description, state, project_id
|
||||
"#,
|
||||
item.title,
|
||||
item.description,
|
||||
state,
|
||||
item.project_id,
|
||||
item.id,
|
||||
user_id,
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(ItemDto {
|
||||
id: rec.id,
|
||||
title: rec.title,
|
||||
description: rec.description,
|
||||
state: serde_json::from_str(&rec.state)?,
|
||||
project_id: rec.project_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MemoryItemService {
|
||||
item_store: Arc<Mutex<HashMap<String, ItemDto>>>,
|
||||
}
|
||||
|
||||
impl MemoryItemService {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
item_store: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ItemService for MemoryItemService {
|
||||
async fn add_item(
|
||||
&self,
|
||||
_context: &Context,
|
||||
create_item: CreateItemDto,
|
||||
) -> anyhow::Result<CreatedItemDto> {
|
||||
if let Ok(mut item_store) = self.item_store.lock() {
|
||||
let item = ItemDto {
|
||||
id: Uuid::new_v4(),
|
||||
title: create_item.title,
|
||||
description: create_item.description,
|
||||
state: como_domain::item::ItemState::Created,
|
||||
project_id: create_item.project_id,
|
||||
};
|
||||
|
||||
item_store.insert(item.id.to_string(), item.clone());
|
||||
|
||||
return Ok(item);
|
||||
} else {
|
||||
Err(anyhow::anyhow!("could not unlock item_store"))
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_item(&self, _context: &Context, query: GetItemQuery) -> anyhow::Result<ItemDto> {
|
||||
if let Ok(item_store) = self.item_store.lock() {
|
||||
let item = item_store
|
||||
.get(&query.item_id.to_string())
|
||||
.ok_or(anyhow::anyhow!("could not find item"))?;
|
||||
return Ok(item.clone());
|
||||
} else {
|
||||
Err(anyhow::anyhow!("could not unlock item_store"))
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_items(
|
||||
&self,
|
||||
_context: &Context,
|
||||
_query: GetItemsQuery,
|
||||
) -> anyhow::Result<Vec<ItemDto>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn update_item(
|
||||
&self,
|
||||
_context: &Context,
|
||||
_item: UpdateItemDto,
|
||||
) -> anyhow::Result<ItemDto> {
|
||||
todo!()
|
||||
}
|
||||
}
|
3
crates/como_infrastructure/src/services/mod.rs
Normal file
3
crates/como_infrastructure/src/services/mod.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
pub mod item_service;
|
||||
pub mod project_service;
|
||||
pub mod user_service;
|
167
crates/como_infrastructure/src/services/project_service.rs
Normal file
167
crates/como_infrastructure/src/services/project_service.rs
Normal file
@@ -0,0 +1,167 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use axum::async_trait;
|
||||
use como_core::projects::ProjectService;
|
||||
use como_domain::{
|
||||
projects::{mutation::CreateProjectMutation, queries::GetProjectQuery, ProjectDto},
|
||||
user::ContextUserExt,
|
||||
Context,
|
||||
};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::database::ConnectionPool;
|
||||
|
||||
pub struct DefaultProjectService {
|
||||
pool: ConnectionPool,
|
||||
}
|
||||
|
||||
impl DefaultProjectService {
|
||||
pub fn new(connection_pool: ConnectionPool) -> Self {
|
||||
Self {
|
||||
pool: connection_pool,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ProjectService for DefaultProjectService {
|
||||
async fn get_project(
|
||||
&self,
|
||||
context: &Context,
|
||||
query: GetProjectQuery,
|
||||
) -> anyhow::Result<ProjectDto> {
|
||||
let user_id = context.get_user_id().ok_or(anyhow::anyhow!("no user id"))?;
|
||||
|
||||
let rec = sqlx::query!(
|
||||
r#"
|
||||
SELECT id, name, description, user_id
|
||||
FROM projects
|
||||
WHERE id = $1 and user_id = $2
|
||||
"#,
|
||||
query.project_id,
|
||||
&user_id
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(ProjectDto {
|
||||
id: rec.id,
|
||||
name: rec.name,
|
||||
description: rec.description,
|
||||
user_id: rec.user_id,
|
||||
})
|
||||
}
|
||||
async fn get_projects(&self, context: &Context) -> anyhow::Result<Vec<ProjectDto>> {
|
||||
let user_id = context.get_user_id().ok_or(anyhow::anyhow!("no user id"))?;
|
||||
|
||||
let recs = sqlx::query!(
|
||||
r#"
|
||||
SELECT id, name, description, user_id
|
||||
FROM projects
|
||||
WHERE user_id = $1
|
||||
LIMIT 500
|
||||
"#,
|
||||
&user_id
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(recs
|
||||
.into_iter()
|
||||
.map(|rec| ProjectDto {
|
||||
id: rec.id,
|
||||
name: rec.name,
|
||||
description: rec.description,
|
||||
user_id: rec.user_id,
|
||||
})
|
||||
.collect::<_>())
|
||||
}
|
||||
async fn create_project(
|
||||
&self,
|
||||
context: &Context,
|
||||
request: CreateProjectMutation,
|
||||
) -> anyhow::Result<ProjectDto> {
|
||||
let user_id = context.get_user_id().ok_or(anyhow::anyhow!("no user id"))?;
|
||||
|
||||
let rec = sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO projects (id, name, description, user_id, created_at, updated_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
RETURNING id
|
||||
"#,
|
||||
uuid::Uuid::new_v4(),
|
||||
request.name,
|
||||
request.description,
|
||||
&user_id,
|
||||
chrono::Utc::now().naive_utc(),
|
||||
chrono::Utc::now().naive_utc(),
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(ProjectDto {
|
||||
id: rec.id,
|
||||
name: request.name,
|
||||
description: request.description,
|
||||
user_id: user_id.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MemoryProjectService {
|
||||
project_store: Arc<Mutex<HashMap<String, ProjectDto>>>,
|
||||
}
|
||||
|
||||
impl MemoryProjectService {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
project_store: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ProjectService for MemoryProjectService {
|
||||
async fn get_project(
|
||||
&self,
|
||||
_context: &Context,
|
||||
query: GetProjectQuery,
|
||||
) -> anyhow::Result<ProjectDto> {
|
||||
let ps = self.project_store.lock().await;
|
||||
Ok(ps
|
||||
.get(&query.project_id.to_string())
|
||||
.ok_or(anyhow::anyhow!("could not find project"))?
|
||||
.clone())
|
||||
}
|
||||
async fn get_projects(&self, context: &Context) -> anyhow::Result<Vec<ProjectDto>> {
|
||||
let user_id = context.get_user_id().ok_or(anyhow::anyhow!("no user id"))?;
|
||||
|
||||
Ok(self
|
||||
.project_store
|
||||
.lock()
|
||||
.await
|
||||
.values()
|
||||
.filter(|p| p.user_id == user_id)
|
||||
.cloned()
|
||||
.collect::<_>())
|
||||
}
|
||||
|
||||
async fn create_project(
|
||||
&self,
|
||||
context: &Context,
|
||||
mutation: CreateProjectMutation,
|
||||
) -> anyhow::Result<ProjectDto> {
|
||||
let user_id = context.get_user_id().ok_or(anyhow::anyhow!("no user id"))?;
|
||||
|
||||
let mut ps = self.project_store.lock().await;
|
||||
let project = ProjectDto {
|
||||
id: uuid::Uuid::new_v4(),
|
||||
name: mutation.name,
|
||||
description: None,
|
||||
user_id,
|
||||
};
|
||||
|
||||
ps.insert(project.id.to_string(), project.clone());
|
||||
Ok(project)
|
||||
}
|
||||
}
|
95
crates/como_infrastructure/src/services/user_service.rs
Normal file
95
crates/como_infrastructure/src/services/user_service.rs
Normal file
@@ -0,0 +1,95 @@
|
||||
use argon2::{password_hash::SaltString, Argon2, PasswordHash, PasswordHasher, PasswordVerifier};
|
||||
use axum::async_trait;
|
||||
use como_core::users::UserService;
|
||||
use como_domain::Context;
|
||||
use rand_core::OsRng;
|
||||
|
||||
use crate::database::ConnectionPool;
|
||||
|
||||
pub struct DefaultUserService {
|
||||
pool: ConnectionPool,
|
||||
}
|
||||
|
||||
impl DefaultUserService {
|
||||
pub fn new(pool: ConnectionPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
|
||||
fn hash_password(&self, _context: &Context, password: String) -> anyhow::Result<String> {
|
||||
let salt = SaltString::generate(&mut OsRng);
|
||||
let argon2 = Argon2::default();
|
||||
|
||||
let password_hash = argon2
|
||||
.hash_password(password.as_bytes(), &salt)
|
||||
.map_err(|e| anyhow::anyhow!(e))?
|
||||
.to_string();
|
||||
|
||||
Ok(password_hash)
|
||||
}
|
||||
|
||||
fn validate_password(
|
||||
&self,
|
||||
_context: &Context,
|
||||
password: String,
|
||||
hashed_password: String,
|
||||
) -> anyhow::Result<bool> {
|
||||
let argon2 = Argon2::default();
|
||||
|
||||
let parsed_hash = PasswordHash::new(&hashed_password).map_err(|e| anyhow::anyhow!(e))?;
|
||||
match argon2.verify_password(password.as_bytes(), &parsed_hash) {
|
||||
Ok(..) => Ok(true),
|
||||
Err(..) => Ok(false),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl UserService for DefaultUserService {
|
||||
async fn add_user(
|
||||
&self,
|
||||
context: &Context,
|
||||
username: String,
|
||||
password: String,
|
||||
) -> anyhow::Result<String> {
|
||||
let hashed_password = self.hash_password(context, password)?;
|
||||
|
||||
let rec = sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO users (username, password_hash)
|
||||
VALUES ( $1, $2 )
|
||||
RETURNING id
|
||||
"#,
|
||||
username,
|
||||
hashed_password
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(rec.id.to_string())
|
||||
}
|
||||
|
||||
async fn validate_user(
|
||||
&self,
|
||||
context: &Context,
|
||||
username: String,
|
||||
password: String,
|
||||
) -> anyhow::Result<Option<String>> {
|
||||
let rec = sqlx::query!(
|
||||
r#"
|
||||
SELECT * from users
|
||||
where username=$1
|
||||
"#,
|
||||
username,
|
||||
)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?;
|
||||
|
||||
match rec {
|
||||
Some(user) => match self.validate_password(context, password, user.password_hash)? {
|
||||
true => Ok(Some(user.id.to_string())),
|
||||
false => Ok(None),
|
||||
},
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user