9
crates/forage-db/src/lib.rs
Normal file
9
crates/forage-db/src/lib.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
mod sessions;
|
||||
|
||||
pub use sessions::PgSessionStore;
|
||||
pub use sqlx::PgPool;
|
||||
|
||||
/// Run all pending migrations.
|
||||
pub async fn migrate(pool: &PgPool) -> Result<(), sqlx::migrate::MigrateError> {
|
||||
sqlx::migrate!("src/migrations").run(pool).await
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
CREATE TABLE IF NOT EXISTS sessions (
|
||||
session_id TEXT PRIMARY KEY,
|
||||
access_token TEXT NOT NULL,
|
||||
refresh_token TEXT NOT NULL,
|
||||
access_expires_at TIMESTAMPTZ NOT NULL,
|
||||
user_id TEXT,
|
||||
username TEXT,
|
||||
user_emails JSONB,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
last_seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
CREATE INDEX idx_sessions_last_seen ON sessions (last_seen_at);
|
||||
@@ -0,0 +1 @@
|
||||
ALTER TABLE sessions ADD COLUMN csrf_token TEXT NOT NULL DEFAULT '';
|
||||
163
crates/forage-db/src/sessions.rs
Normal file
163
crates/forage-db/src/sessions.rs
Normal file
@@ -0,0 +1,163 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use forage_core::auth::UserEmail;
|
||||
use forage_core::session::{CachedUser, SessionData, SessionError, SessionId, SessionStore};
|
||||
use sqlx::PgPool;
|
||||
|
||||
/// PostgreSQL-backed session store for horizontal scaling.
|
||||
pub struct PgSessionStore {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PgSessionStore {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
|
||||
/// Remove sessions inactive for longer than `max_inactive_days`.
|
||||
pub async fn reap_expired(&self, max_inactive_days: i64) -> Result<u64, SessionError> {
|
||||
let cutoff = Utc::now() - chrono::Duration::days(max_inactive_days);
|
||||
let result = sqlx::query("DELETE FROM sessions WHERE last_seen_at < $1")
|
||||
.bind(cutoff)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| SessionError::Store(e.to_string()))?;
|
||||
Ok(result.rows_affected())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl SessionStore for PgSessionStore {
|
||||
async fn create(&self, data: SessionData) -> Result<SessionId, SessionError> {
|
||||
let id = SessionId::generate();
|
||||
let (user_id, username, emails_json) = match &data.user {
|
||||
Some(u) => (
|
||||
Some(u.user_id.clone()),
|
||||
Some(u.username.clone()),
|
||||
Some(
|
||||
serde_json::to_value(&u.emails)
|
||||
.map_err(|e| SessionError::Store(e.to_string()))?,
|
||||
),
|
||||
),
|
||||
None => (None, None, None),
|
||||
};
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO sessions (session_id, access_token, refresh_token, access_expires_at, user_id, username, user_emails, csrf_token, created_at, last_seen_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
|
||||
)
|
||||
.bind(id.as_str())
|
||||
.bind(&data.access_token)
|
||||
.bind(&data.refresh_token)
|
||||
.bind(data.access_expires_at)
|
||||
.bind(&user_id)
|
||||
.bind(&username)
|
||||
.bind(&emails_json)
|
||||
.bind(&data.csrf_token)
|
||||
.bind(data.created_at)
|
||||
.bind(data.last_seen_at)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| SessionError::Store(e.to_string()))?;
|
||||
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
async fn get(&self, id: &SessionId) -> Result<Option<SessionData>, SessionError> {
|
||||
let row: Option<SessionRow> = sqlx::query_as(
|
||||
"SELECT access_token, refresh_token, access_expires_at, user_id, username, user_emails, csrf_token, created_at, last_seen_at
|
||||
FROM sessions WHERE session_id = $1",
|
||||
)
|
||||
.bind(id.as_str())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| SessionError::Store(e.to_string()))?;
|
||||
|
||||
Ok(row.map(|r| r.into_session_data()))
|
||||
}
|
||||
|
||||
async fn update(&self, id: &SessionId, data: SessionData) -> Result<(), SessionError> {
|
||||
let (user_id, username, emails_json) = match &data.user {
|
||||
Some(u) => (
|
||||
Some(u.user_id.clone()),
|
||||
Some(u.username.clone()),
|
||||
Some(
|
||||
serde_json::to_value(&u.emails)
|
||||
.map_err(|e| SessionError::Store(e.to_string()))?,
|
||||
),
|
||||
),
|
||||
None => (None, None, None),
|
||||
};
|
||||
|
||||
sqlx::query(
|
||||
"UPDATE sessions SET access_token = $1, refresh_token = $2, access_expires_at = $3, user_id = $4, username = $5, user_emails = $6, csrf_token = $7, last_seen_at = $8
|
||||
WHERE session_id = $9",
|
||||
)
|
||||
.bind(&data.access_token)
|
||||
.bind(&data.refresh_token)
|
||||
.bind(data.access_expires_at)
|
||||
.bind(&user_id)
|
||||
.bind(&username)
|
||||
.bind(&emails_json)
|
||||
.bind(&data.csrf_token)
|
||||
.bind(data.last_seen_at)
|
||||
.bind(id.as_str())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| SessionError::Store(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, id: &SessionId) -> Result<(), SessionError> {
|
||||
sqlx::query("DELETE FROM sessions WHERE session_id = $1")
|
||||
.bind(id.as_str())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| SessionError::Store(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct SessionRow {
|
||||
access_token: String,
|
||||
refresh_token: String,
|
||||
access_expires_at: DateTime<Utc>,
|
||||
user_id: Option<String>,
|
||||
username: Option<String>,
|
||||
user_emails: Option<serde_json::Value>,
|
||||
csrf_token: String,
|
||||
created_at: DateTime<Utc>,
|
||||
last_seen_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl SessionRow {
|
||||
fn into_session_data(self) -> SessionData {
|
||||
let user = match (self.user_id, self.username) {
|
||||
(Some(user_id), Some(username)) => {
|
||||
let emails: Vec<UserEmail> = self
|
||||
.user_emails
|
||||
.and_then(|v| serde_json::from_value(v).ok())
|
||||
.unwrap_or_default();
|
||||
Some(CachedUser {
|
||||
user_id,
|
||||
username,
|
||||
emails,
|
||||
orgs: vec![],
|
||||
})
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
SessionData {
|
||||
access_token: self.access_token,
|
||||
refresh_token: self.refresh_token,
|
||||
access_expires_at: self.access_expires_at,
|
||||
user,
|
||||
csrf_token: self.csrf_token,
|
||||
created_at: self.created_at,
|
||||
last_seen_at: self.last_seen_at,
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user