feat: add post3 s3 proxy for postgresql
Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
37
crates/post3-server/Cargo.toml
Normal file
37
crates/post3-server/Cargo.toml
Normal file
@@ -0,0 +1,37 @@
|
||||
# Crate manifest for the post3 S3-compatible server binary.
# Versions/editions and most dependency versions are inherited from the
# workspace root via `.workspace = true`.
[package]
name = "post3-server"
version.workspace = true
edition.workspace = true

[dependencies]
# Core storage backends (Postgres / filesystem) live in the sibling `post3` crate.
post3.workspace = true

anyhow.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
clap.workspace = true
dotenvy.workspace = true
uuid.workspace = true
bytes.workspace = true
# HTTP layer
axum.workspace = true
tower.workspace = true
tower-http.workspace = true
# Component/lifecycle runner used by the serve command
notmad.workspace = true
tokio-util.workspace = true
sqlx.workspace = true
chrono.workspace = true
# XML (de)serialization for S3 request/response bodies
quick-xml.workspace = true
# MD5 + hex for S3 ETags
md-5.workspace = true
hex.workspace = true
serde.workspace = true

[dev-dependencies]
# Real AWS SDK clients are used in integration tests against this server.
aws-config = "1"
aws-sdk-s3 = "1"
aws-credential-types = "1"
aws-types = "1"
tokio = { workspace = true, features = ["test-util"] }
tower.workspace = true
tracing-subscriber.workspace = true
tempfile.workspace = true
|
||||
58
crates/post3-server/src/cli.rs
Normal file
58
crates/post3-server/src/cli.rs
Normal file
@@ -0,0 +1,58 @@
|
||||
pub mod serve;
|
||||
|
||||
use anyhow::Context;
|
||||
use clap::{Parser, Subcommand};
|
||||
use post3::{FilesystemBackend, PostgresBackend};
|
||||
use sqlx::PgPool;
|
||||
|
||||
use crate::state::State;
|
||||
|
||||
/// Top-level CLI definition for the `post3-server` binary.
#[derive(Parser)]
#[command(name = "post3-server", about = "S3-compatible storage server")]
struct App {
    /// Which subcommand to run (currently only `serve`).
    #[command(subcommand)]
    command: Commands,
}
|
||||
|
||||
/// Available subcommands.
#[derive(Subcommand)]
enum Commands {
    /// Start the S3-compatible HTTP server.
    Serve(serve::ServeCommand),
}
|
||||
|
||||
/// Parse CLI arguments, construct the selected storage backend, and run the
/// chosen subcommand until it exits.
///
/// Backend selection:
/// - `--backend pg`: requires `DATABASE_URL`; connects a `PgPool` and applies
///   the `post3` crate's migrations before serving.
/// - `--backend fs`: requires `--data-dir`; the directory is created if missing.
///
/// # Errors
/// Returns an error if required configuration is missing, the database is
/// unreachable, migrations fail, the data directory cannot be created, or the
/// server itself fails.
pub async fn execute() -> anyhow::Result<()> {
    let app = App::parse();

    match app.command {
        Commands::Serve(cmd) => match cmd.backend {
            serve::BackendType::Pg => {
                let database_url =
                    std::env::var("DATABASE_URL").context("DATABASE_URL not set")?;
                let pool = PgPool::connect(&database_url).await?;

                // Migrations live in the sibling `post3` crate; the path is
                // relative to this crate's manifest (compile-time macro).
                // Locking is disabled so startup works on pooled/pgbouncer-style
                // connections that can't take advisory locks.
                sqlx::migrate!("../post3/migrations/")
                    .set_locking(false)
                    .run(&pool)
                    .await?;

                tracing::info!("database migrations applied");

                let backend = PostgresBackend::new(pool);
                let state = State { store: backend };
                cmd.run(&state).await
            }
            serve::BackendType::Fs => {
                let data_dir = cmd
                    .data_dir
                    .as_ref()
                    .context("--data-dir is required when using --backend fs")?;

                std::fs::create_dir_all(data_dir)?;
                tracing::info!(path = %data_dir.display(), "using filesystem backend");

                let backend = FilesystemBackend::new(data_dir);
                let state = State { store: backend };
                cmd.run(&state).await
            }
        },
    }
}
|
||||
44
crates/post3-server/src/cli/serve.rs
Normal file
44
crates/post3-server/src/cli/serve.rs
Normal file
@@ -0,0 +1,44 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use clap::{Parser, ValueEnum};
|
||||
use post3::StorageBackend;
|
||||
|
||||
use crate::s3::S3Server;
|
||||
use crate::state::State;
|
||||
|
||||
/// Storage backend selectable via `--backend` on the serve command.
#[derive(Clone, ValueEnum)]
pub enum BackendType {
    /// PostgreSQL backend (requires DATABASE_URL)
    Pg,
    /// Local filesystem backend
    Fs,
}
|
||||
|
||||
/// Arguments for the `serve` subcommand.
#[derive(Parser)]
pub struct ServeCommand {
    /// Address to bind the S3 HTTP server to; also settable via POST3_HOST.
    #[arg(long, env = "POST3_HOST", default_value = "127.0.0.1:9000")]
    pub host: SocketAddr,

    /// Storage backend to use
    #[arg(long, default_value = "pg")]
    pub backend: BackendType,

    /// Data directory for filesystem backend
    #[arg(long)]
    pub data_dir: Option<PathBuf>,
}
|
||||
|
||||
impl ServeCommand {
    /// Register the S3 HTTP server with the notmad component runner and block
    /// until the runner exits (e.g. on shutdown signal or component failure).
    ///
    /// # Errors
    /// Propagates any error reported by the notmad runner.
    pub async fn run<B: StorageBackend>(&self, state: &State<B>) -> anyhow::Result<()> {
        notmad::Mad::builder()
            .add(S3Server {
                host: self.host,
                // State is cloned into the server component; presumably a cheap
                // handle clone (e.g. pool/Arc) — confirm in `state` module.
                state: state.clone(),
            })
            .run()
            .await?;

        Ok(())
    }
}
|
||||
2
crates/post3-server/src/lib.rs
Normal file
2
crates/post3-server/src/lib.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
pub mod s3;
|
||||
pub mod state;
|
||||
18
crates/post3-server/src/main.rs
Normal file
18
crates/post3-server/src/main.rs
Normal file
@@ -0,0 +1,18 @@
|
||||
mod cli;
|
||||
pub mod s3;
|
||||
pub mod state;
|
||||
|
||||
/// Binary entry point: load `.env` (if present), initialize tracing, and run
/// the CLI.
///
/// The tracing filter honors RUST_LOG and additionally defaults this crate and
/// the `post3` crate to `debug`.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Missing .env is not an error — environment may be set externally.
    dotenvy::dotenv().ok();

    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::from_default_env()
                .add_directive("post3_server=debug".parse()?)
                .add_directive("post3=debug".parse()?),
        )
        .init();

    cli::execute().await
}
|
||||
55
crates/post3-server/src/s3/extractors.rs
Normal file
55
crates/post3-server/src/s3/extractors.rs
Normal file
@@ -0,0 +1,55 @@
|
||||
use serde::Deserialize;
|
||||
|
||||
/// Query params for GET /{bucket} — dispatches between ListObjectsV2, ListMultipartUploads,
/// ListObjectVersions, and GetBucketLocation.
///
/// S3 signals the sub-operation by the mere *presence* of a query key
/// (e.g. `?uploads`), so dispatchers should check `is_some()` rather than the value.
#[derive(Debug, Default, Deserialize)]
pub struct BucketGetQuery {
    /// Presence of `?uploads` signals ListMultipartUploads
    pub uploads: Option<String>,
    /// Presence of `?versions` signals ListObjectVersions
    pub versions: Option<String>,
    /// Presence of `?location` signals GetBucketLocation
    pub location: Option<String>,
    /// `list-type=2` is how S3 clients request ListObjectsV2
    #[serde(rename = "list-type")]
    pub list_type: Option<i32>,
    pub prefix: Option<String>,
    #[serde(rename = "max-keys")]
    pub max_keys: Option<i64>,
    /// ListObjectsV2 pagination token
    #[serde(rename = "continuation-token")]
    pub continuation_token: Option<String>,
    #[serde(rename = "start-after")]
    pub start_after: Option<String>,
    /// ListObjects v1 pagination marker
    pub marker: Option<String>,
    pub delimiter: Option<String>,
    #[serde(rename = "encoding-type")]
    pub encoding_type: Option<String>,
    /// ListMultipartUploads pagination: resume after this object key
    #[serde(rename = "key-marker")]
    pub key_marker: Option<String>,
    /// ListMultipartUploads pagination: resume after this upload id
    #[serde(rename = "upload-id-marker")]
    pub upload_id_marker: Option<String>,
    #[serde(rename = "max-uploads")]
    pub max_uploads: Option<i32>,
}
|
||||
|
||||
/// Query params for POST /{bucket} — dispatches between DeleteObjects and other ops.
#[derive(Debug, Default, Deserialize)]
pub struct BucketPostQuery {
    /// Presence of `?delete` signals DeleteObjects
    pub delete: Option<String>,
}
|
||||
|
||||
/// Query params for /{bucket}/{*key} dispatchers (PUT, GET, DELETE, POST).
///
/// `uploadId`/`partNumber` presence steers requests to the multipart handlers;
/// see `s3::handlers::objects` dispatch functions.
#[derive(Debug, Default, Deserialize)]
pub struct ObjectKeyQuery {
    /// Multipart upload id; presence selects multipart variants of PUT/GET/DELETE/POST.
    #[serde(rename = "uploadId")]
    pub upload_id: Option<String>,
    /// Part number for UploadPart (PUT with `uploadId`).
    #[serde(rename = "partNumber")]
    pub part_number: Option<i32>,
    /// Presence of `?uploads` signals CreateMultipartUpload (POST only)
    pub uploads: Option<String>,
    /// ListParts page size
    #[serde(rename = "max-parts")]
    pub max_parts: Option<i32>,
    /// ListParts pagination: resume after this part number
    #[serde(rename = "part-number-marker")]
    pub part_number_marker: Option<i32>,
}
|
||||
187
crates/post3-server/src/s3/handlers/buckets.rs
Normal file
187
crates/post3-server/src/s3/handlers/buckets.rs
Normal file
@@ -0,0 +1,187 @@
|
||||
use axum::{
|
||||
extract::{Path, State},
|
||||
http::StatusCode,
|
||||
response::IntoResponse,
|
||||
};
|
||||
use post3::{Post3Error, StorageBackend};
|
||||
|
||||
use crate::s3::responses;
|
||||
use crate::state::State as AppState;
|
||||
|
||||
/// Validate a bucket name against the S3 bucket-naming rules enforced by this
/// server: 3–63 bytes; only lowercase ASCII letters, digits, hyphens, and
/// periods; must start and end with a letter or digit; must not contain two
/// adjacent periods; must not be formatted like an IPv4 address.
///
/// Returns `true` when the name is acceptable.
fn is_valid_bucket_name(name: &str) -> bool {
    let bytes = name.as_bytes();
    if !(3..=63).contains(&bytes.len()) {
        return false;
    }
    // Must contain only lowercase letters, numbers, hyphens, and periods.
    if !bytes
        .iter()
        .all(|&b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'-' || b == b'.')
    {
        return false;
    }
    // Must start and end with a letter or number.
    let edge_ok = |b: u8| b.is_ascii_lowercase() || b.is_ascii_digit();
    if !edge_ok(bytes[0]) || !edge_ok(bytes[bytes.len() - 1]) {
        return false;
    }
    // AWS forbids two adjacent periods (e.g. "a..b"); previously accepted here.
    if name.contains("..") {
        return false;
    }
    // Must not be formatted as an IP address (four dot-separated u8 octets).
    let parts = name.split('.');
    if parts.clone().count() == 4 && parts.clone().all(|part| part.parse::<u8>().is_ok()) {
        return false;
    }
    true
}
|
||||
|
||||
/// PUT /{bucket} — CreateBucket.
///
/// Validates the name, then delegates to the backend. Response mapping:
/// - invalid name            -> 400 `InvalidBucketName`
/// - created                 -> 200 with `Location: /{bucket}`
/// - bucket already exists   -> 409 `BucketAlreadyOwnedByYou` (single-owner server)
/// - any other backend error -> 500 `InternalError`
pub async fn create_bucket<B: StorageBackend>(
    State(state): State<AppState<B>>,
    Path(bucket): Path<String>,
) -> impl IntoResponse {
    if !is_valid_bucket_name(&bucket) {
        return (
            StatusCode::BAD_REQUEST,
            [("Content-Type", "application/xml".to_string())],
            responses::error_xml(
                "InvalidBucketName",
                "The specified bucket is not valid.",
                &bucket,
            ),
        )
            .into_response();
    }

    match state.store.create_bucket(&bucket).await {
        Ok(_) => (
            StatusCode::OK,
            [
                ("Location", format!("/{bucket}")),
                (
                    "x-amz-request-id",
                    uuid::Uuid::new_v4().to_string(),
                ),
            ],
        )
            .into_response(),
        Err(Post3Error::BucketAlreadyExists(_)) => (
            StatusCode::CONFLICT,
            [("Content-Type", "application/xml".to_string())],
            responses::error_xml(
                "BucketAlreadyOwnedByYou",
                "Your previous request to create the named bucket succeeded and you already own it.",
                &bucket,
            ),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("create_bucket error: {e}");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                [("Content-Type", "application/xml".to_string())],
                responses::error_xml("InternalError", &e.to_string(), &bucket),
            )
                .into_response()
        }
    }
}
|
||||
|
||||
/// HEAD /{bucket} — HeadBucket.
///
/// 200 with a hard-coded `x-amz-bucket-region: us-east-1` when the bucket
/// exists, 404 (no body, per HEAD semantics) when it does not, 500 on backend
/// error.
pub async fn head_bucket<B: StorageBackend>(
    State(state): State<AppState<B>>,
    Path(bucket): Path<String>,
) -> impl IntoResponse {
    match state.store.head_bucket(&bucket).await {
        Ok(Some(_)) => (
            StatusCode::OK,
            [
                ("x-amz-request-id", uuid::Uuid::new_v4().to_string()),
                // This server does not model regions; always report us-east-1.
                ("x-amz-bucket-region", "us-east-1".to_string()),
            ],
        )
            .into_response(),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            [("x-amz-request-id", uuid::Uuid::new_v4().to_string())],
        )
            .into_response(),
        Err(e) => {
            tracing::error!("head_bucket error: {e}");
            StatusCode::INTERNAL_SERVER_ERROR.into_response()
        }
    }
}
|
||||
|
||||
/// DELETE /{bucket} — DeleteBucket.
///
/// Response mapping:
/// - deleted         -> 204 No Content
/// - bucket missing  -> 404 `NoSuchBucket`
/// - bucket nonempty -> 409 `BucketNotEmpty`
/// - other errors    -> 500 `InternalError`
pub async fn delete_bucket<B: StorageBackend>(
    State(state): State<AppState<B>>,
    Path(bucket): Path<String>,
) -> impl IntoResponse {
    match state.store.delete_bucket(&bucket).await {
        Ok(()) => (
            StatusCode::NO_CONTENT,
            [("x-amz-request-id", uuid::Uuid::new_v4().to_string())],
        )
            .into_response(),
        Err(Post3Error::BucketNotFound(_)) => (
            StatusCode::NOT_FOUND,
            [("Content-Type", "application/xml".to_string())],
            responses::error_xml(
                "NoSuchBucket",
                "The specified bucket does not exist",
                &bucket,
            ),
        )
            .into_response(),
        Err(Post3Error::BucketNotEmpty(_)) => (
            StatusCode::CONFLICT,
            [("Content-Type", "application/xml".to_string())],
            responses::error_xml(
                "BucketNotEmpty",
                "The bucket you tried to delete is not empty",
                &bucket,
            ),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("delete_bucket error: {e}");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                [("Content-Type", "application/xml".to_string())],
                responses::error_xml("InternalError", &e.to_string(), &bucket),
            )
                .into_response()
        }
    }
}
|
||||
|
||||
/// GET / — ListBuckets.
///
/// Returns the full bucket list as S3 XML; 500 `InternalError` (resource "/")
/// on backend failure.
pub async fn list_buckets<B: StorageBackend>(
    State(state): State<AppState<B>>,
) -> impl IntoResponse {
    match state.store.list_buckets().await {
        Ok(buckets) => (
            StatusCode::OK,
            [
                ("Content-Type", "application/xml".to_string()),
                (
                    "x-amz-request-id",
                    uuid::Uuid::new_v4().to_string(),
                ),
            ],
            responses::list_buckets_xml(&buckets),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("list_buckets error: {e}");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                [("Content-Type", "application/xml".to_string())],
                responses::error_xml("InternalError", &e.to_string(), "/"),
            )
                .into_response()
        }
    }
}
|
||||
3
crates/post3-server/src/s3/handlers/mod.rs
Normal file
3
crates/post3-server/src/s3/handlers/mod.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
pub mod buckets;
|
||||
pub mod multipart;
|
||||
pub mod objects;
|
||||
509
crates/post3-server/src/s3/handlers/multipart.rs
Normal file
509
crates/post3-server/src/s3/handlers/multipart.rs
Normal file
@@ -0,0 +1,509 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use axum::{
|
||||
extract::{Path, Query, State},
|
||||
http::{HeaderMap, HeaderValue, StatusCode},
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use post3::{Post3Error, StorageBackend};
|
||||
|
||||
use crate::s3::extractors::{BucketGetQuery, ObjectKeyQuery};
|
||||
use crate::s3::responses;
|
||||
use crate::state::State as AppState;
|
||||
|
||||
/// POST /{bucket}/{*key}?uploads — CreateMultipartUpload.
///
/// Captures the request Content-Type and all `x-amz-meta-*` headers as user
/// metadata, asks the backend for a new upload id, and returns the
/// InitiateMultipartUploadResult XML. 404 `NoSuchBucket` if the bucket is
/// missing, 500 `InternalError` otherwise.
pub async fn create_multipart_upload<B: StorageBackend>(
    State(state): State<AppState<B>>,
    Path((bucket, key)): Path<(String, String)>,
    headers: HeaderMap,
) -> Response {
    let content_type = headers
        .get("content-type")
        .and_then(|v| v.to_str().ok())
        .map(|s| s.to_string());

    // Collect user metadata from x-amz-meta-* headers (non-UTF8 values skipped).
    let mut metadata = HashMap::new();
    for (name, value) in headers.iter() {
        let name_str = name.as_str();
        if let Some(meta_key) = name_str.strip_prefix("x-amz-meta-") {
            if let Ok(v) = value.to_str() {
                metadata.insert(meta_key.to_string(), v.to_string());
            }
        }
    }

    match state
        .store
        .create_multipart_upload(&bucket, &key, content_type.as_deref(), metadata)
        .await
    {
        Ok(result) => {
            let mut response_headers = HeaderMap::new();
            response_headers
                .insert("Content-Type", HeaderValue::from_static("application/xml"));
            response_headers.insert(
                "x-amz-request-id",
                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
            );
            (
                StatusCode::OK,
                response_headers,
                responses::initiate_multipart_upload_xml(
                    &result.bucket,
                    &result.key,
                    &result.upload_id,
                ),
            )
                .into_response()
        }
        Err(Post3Error::BucketNotFound(b)) => (
            StatusCode::NOT_FOUND,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "NoSuchBucket",
                "The specified bucket does not exist",
                &b,
            ),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("create_multipart_upload error: {e}");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                [("Content-Type", "application/xml")],
                responses::error_xml(
                    "InternalError",
                    &e.to_string(),
                    &format!("/{bucket}/{key}"),
                ),
            )
                .into_response()
        }
    }
}
|
||||
|
||||
/// PUT /{bucket}/{*key}?uploadId=…&partNumber=… — UploadPart.
///
/// Both `uploadId` and `partNumber` are required; missing either yields a 400
/// `InvalidRequest`. On success the stored part's ETag is returned in the
/// `ETag` header. 404 `NoSuchUpload`/`NoSuchBucket` and 500 `InternalError`
/// map the corresponding backend errors.
pub async fn upload_part<B: StorageBackend>(
    State(state): State<AppState<B>>,
    Path((bucket, key)): Path<(String, String)>,
    Query(query): Query<ObjectKeyQuery>,
    body: Bytes,
) -> Response {
    let upload_id = match &query.upload_id {
        Some(id) => id.clone(),
        None => {
            return (
                StatusCode::BAD_REQUEST,
                [("Content-Type", "application/xml")],
                responses::error_xml(
                    "InvalidRequest",
                    "Missing uploadId parameter",
                    &format!("/{bucket}/{key}"),
                ),
            )
                .into_response()
        }
    };

    let part_number = match query.part_number {
        Some(n) => n,
        None => {
            return (
                StatusCode::BAD_REQUEST,
                [("Content-Type", "application/xml")],
                responses::error_xml(
                    "InvalidRequest",
                    "Missing partNumber parameter",
                    &format!("/{bucket}/{key}"),
                ),
            )
                .into_response()
        }
    };

    match state
        .store
        .upload_part(&bucket, &key, &upload_id, part_number, body)
        .await
    {
        Ok(result) => {
            let mut headers = HeaderMap::new();
            // Backend ETags are expected to be valid header values.
            headers.insert("ETag", result.etag.parse().unwrap());
            headers.insert(
                "x-amz-request-id",
                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
            );
            (StatusCode::OK, headers).into_response()
        }
        Err(Post3Error::UploadNotFound(id)) => (
            StatusCode::NOT_FOUND,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "NoSuchUpload",
                "The specified multipart upload does not exist",
                &id,
            ),
        )
            .into_response(),
        Err(Post3Error::BucketNotFound(b)) => (
            StatusCode::NOT_FOUND,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "NoSuchBucket",
                "The specified bucket does not exist",
                &b,
            ),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("upload_part error: {e}");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                [("Content-Type", "application/xml")],
                responses::error_xml(
                    "InternalError",
                    &e.to_string(),
                    &format!("/{bucket}/{key}"),
                ),
            )
                .into_response()
        }
    }
}
|
||||
|
||||
/// POST /{bucket}/{*key}?uploadId=… — CompleteMultipartUpload.
///
/// Parses the CompleteMultipartUpload XML body into (part number, ETag) pairs
/// and asks the backend to assemble the final object. Error mapping follows
/// the S3 error codes:
/// - missing `uploadId`           -> 400 `InvalidRequest`
/// - unparseable body             -> 400 `MalformedXML`
/// - unknown upload id            -> 404 `NoSuchUpload`
/// - missing part / ETag mismatch -> 400 `InvalidPart`
/// - parts out of order           -> 400 `InvalidPartOrder`
/// - undersized non-final part    -> 400 `EntityTooSmall`
/// - missing bucket               -> 404 `NoSuchBucket`
/// - anything else                -> 500 `InternalError`
pub async fn complete_multipart_upload<B: StorageBackend>(
    State(state): State<AppState<B>>,
    Path((bucket, key)): Path<(String, String)>,
    Query(query): Query<ObjectKeyQuery>,
    body: Bytes,
) -> Response {
    let upload_id = match &query.upload_id {
        Some(id) => id.clone(),
        None => {
            return (
                StatusCode::BAD_REQUEST,
                [("Content-Type", "application/xml")],
                responses::error_xml(
                    "InvalidRequest",
                    "Missing uploadId parameter",
                    &format!("/{bucket}/{key}"),
                ),
            )
                .into_response()
        }
    };

    let part_etags = match responses::parse_complete_multipart_xml(&body) {
        Ok(parts) => parts,
        Err(msg) => {
            return (
                StatusCode::BAD_REQUEST,
                [("Content-Type", "application/xml")],
                responses::error_xml("MalformedXML", &msg, &format!("/{bucket}/{key}")),
            )
                .into_response()
        }
    };

    match state
        .store
        .complete_multipart_upload(&bucket, &key, &upload_id, part_etags)
        .await
    {
        Ok(result) => {
            // S3 returns the object's path-style location in the result body.
            let location = format!("/{}/{}", bucket, key);
            let mut headers = HeaderMap::new();
            headers
                .insert("Content-Type", HeaderValue::from_static("application/xml"));
            headers.insert(
                "x-amz-request-id",
                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
            );
            (
                StatusCode::OK,
                headers,
                responses::complete_multipart_upload_xml(
                    &location,
                    &result.bucket,
                    &result.key,
                    &result.etag,
                ),
            )
                .into_response()
        }
        Err(Post3Error::UploadNotFound(id)) => (
            StatusCode::NOT_FOUND,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "NoSuchUpload",
                "The specified multipart upload does not exist",
                &id,
            ),
        )
            .into_response(),
        Err(Post3Error::InvalidPart {
            upload_id: _,
            part_number,
        }) => (
            StatusCode::BAD_REQUEST,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "InvalidPart",
                &format!("Part {part_number} not found or not uploaded"),
                &format!("/{bucket}/{key}"),
            ),
        )
            .into_response(),
        Err(Post3Error::ETagMismatch {
            part_number,
            expected,
            got,
        }) => (
            StatusCode::BAD_REQUEST,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "InvalidPart",
                &format!(
                    "ETag mismatch for part {part_number}: expected {expected}, got {got}"
                ),
                &format!("/{bucket}/{key}"),
            ),
        )
            .into_response(),
        Err(Post3Error::InvalidPartOrder) => (
            StatusCode::BAD_REQUEST,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "InvalidPartOrder",
                "Parts must be in ascending order",
                &format!("/{bucket}/{key}"),
            ),
        )
            .into_response(),
        Err(Post3Error::EntityTooSmall {
            part_number,
            size,
        }) => (
            StatusCode::BAD_REQUEST,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "EntityTooSmall",
                &format!(
                    "Your proposed upload is smaller than the minimum allowed size. Part {part_number} has size {size}."
                ),
                &format!("/{bucket}/{key}"),
            ),
        )
            .into_response(),
        Err(Post3Error::BucketNotFound(b)) => (
            StatusCode::NOT_FOUND,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "NoSuchBucket",
                "The specified bucket does not exist",
                &b,
            ),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("complete_multipart_upload error: {e}");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                [("Content-Type", "application/xml")],
                responses::error_xml(
                    "InternalError",
                    &e.to_string(),
                    &format!("/{bucket}/{key}"),
                ),
            )
                .into_response()
        }
    }
}
|
||||
|
||||
/// DELETE /{bucket}/{*key}?uploadId=… — AbortMultipartUpload.
///
/// 204 on success, 400 `InvalidRequest` when `uploadId` is absent, 404
/// `NoSuchUpload` for unknown uploads, 500 `InternalError` otherwise.
pub async fn abort_multipart_upload<B: StorageBackend>(
    State(state): State<AppState<B>>,
    Path((bucket, key)): Path<(String, String)>,
    Query(query): Query<ObjectKeyQuery>,
) -> Response {
    let upload_id = match &query.upload_id {
        Some(id) => id.clone(),
        None => {
            return (
                StatusCode::BAD_REQUEST,
                [("Content-Type", "application/xml")],
                responses::error_xml(
                    "InvalidRequest",
                    "Missing uploadId parameter",
                    &format!("/{bucket}/{key}"),
                ),
            )
                .into_response()
        }
    };

    match state
        .store
        .abort_multipart_upload(&bucket, &key, &upload_id)
        .await
    {
        Ok(()) => {
            let mut headers = HeaderMap::new();
            headers.insert(
                "x-amz-request-id",
                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
            );
            (StatusCode::NO_CONTENT, headers).into_response()
        }
        Err(Post3Error::UploadNotFound(id)) => (
            StatusCode::NOT_FOUND,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "NoSuchUpload",
                "The specified multipart upload does not exist",
                &id,
            ),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("abort_multipart_upload error: {e}");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                [("Content-Type", "application/xml")],
                responses::error_xml(
                    "InternalError",
                    &e.to_string(),
                    &format!("/{bucket}/{key}"),
                ),
            )
                .into_response()
        }
    }
}
|
||||
|
||||
/// GET /{bucket}/{*key}?uploadId=… — ListParts.
///
/// Supports `max-parts` (default 1000, mirrored into the response XML) and
/// `part-number-marker` pagination. 400 `InvalidRequest` when `uploadId` is
/// absent, 404 `NoSuchUpload` for unknown uploads, 500 `InternalError`
/// otherwise.
pub async fn list_parts<B: StorageBackend>(
    State(state): State<AppState<B>>,
    Path((bucket, key)): Path<(String, String)>,
    Query(query): Query<ObjectKeyQuery>,
) -> Response {
    let upload_id = match &query.upload_id {
        Some(id) => id.clone(),
        None => {
            return (
                StatusCode::BAD_REQUEST,
                [("Content-Type", "application/xml")],
                responses::error_xml(
                    "InvalidRequest",
                    "Missing uploadId parameter",
                    &format!("/{bucket}/{key}"),
                ),
            )
                .into_response()
        }
    };

    match state
        .store
        .list_parts(
            &bucket,
            &key,
            &upload_id,
            query.max_parts,
            query.part_number_marker,
        )
        .await
    {
        Ok(result) => {
            // Default page size echoed into the XML response.
            let max_parts = query.max_parts.unwrap_or(1000);
            let mut headers = HeaderMap::new();
            headers
                .insert("Content-Type", HeaderValue::from_static("application/xml"));
            headers.insert(
                "x-amz-request-id",
                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
            );
            (
                StatusCode::OK,
                headers,
                responses::list_parts_xml(&result, max_parts),
            )
                .into_response()
        }
        Err(Post3Error::UploadNotFound(id)) => (
            StatusCode::NOT_FOUND,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "NoSuchUpload",
                "The specified multipart upload does not exist",
                &id,
            ),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("list_parts error: {e}");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                [("Content-Type", "application/xml")],
                responses::error_xml(
                    "InternalError",
                    &e.to_string(),
                    &format!("/{bucket}/{key}"),
                ),
            )
                .into_response()
        }
    }
}
|
||||
|
||||
/// GET /{bucket}?uploads — ListMultipartUploads.
///
/// Supports `prefix`, `key-marker`/`upload-id-marker` pagination, and
/// `max-uploads` (default 1000, mirrored into the XML). 404 `NoSuchBucket`
/// for a missing bucket, 500 `InternalError` otherwise.
pub async fn list_multipart_uploads<B: StorageBackend>(
    State(state): State<AppState<B>>,
    Path(bucket): Path<String>,
    Query(query): Query<BucketGetQuery>,
) -> Response {
    match state
        .store
        .list_multipart_uploads(
            &bucket,
            query.prefix.as_deref(),
            query.key_marker.as_deref(),
            query.upload_id_marker.as_deref(),
            query.max_uploads,
        )
        .await
    {
        Ok(result) => {
            let max_uploads = query.max_uploads.unwrap_or(1000);
            let mut headers = HeaderMap::new();
            headers
                .insert("Content-Type", HeaderValue::from_static("application/xml"));
            headers.insert(
                "x-amz-request-id",
                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
            );
            (
                StatusCode::OK,
                headers,
                responses::list_multipart_uploads_xml(&result, max_uploads),
            )
                .into_response()
        }
        Err(Post3Error::BucketNotFound(b)) => (
            StatusCode::NOT_FOUND,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "NoSuchBucket",
                "The specified bucket does not exist",
                &b,
            ),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("list_multipart_uploads error: {e}");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                [("Content-Type", "application/xml")],
                responses::error_xml("InternalError", &e.to_string(), &bucket),
            )
                .into_response()
        }
    }
}
|
||||
598
crates/post3-server/src/s3/handlers/objects.rs
Normal file
598
crates/post3-server/src/s3/handlers/objects.rs
Normal file
@@ -0,0 +1,598 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use axum::{
|
||||
body::Body,
|
||||
extract::{Path, Query, State},
|
||||
http::{header::HeaderName, HeaderMap, HeaderValue, StatusCode},
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use post3::{Post3Error, StorageBackend};
|
||||
|
||||
use crate::s3::extractors::{BucketGetQuery, ObjectKeyQuery};
|
||||
use crate::s3::handlers::multipart;
|
||||
use crate::s3::responses;
|
||||
use crate::state::State as AppState;
|
||||
|
||||
// --- Dispatch functions ---
|
||||
|
||||
/// PUT /{bucket}/{*key} — dispatches to upload_part or put_object based on query params.
|
||||
pub async fn put_dispatch<B: StorageBackend>(
|
||||
state: State<AppState<B>>,
|
||||
path: Path<(String, String)>,
|
||||
query: Query<ObjectKeyQuery>,
|
||||
headers: HeaderMap,
|
||||
body: Bytes,
|
||||
) -> Response {
|
||||
if query.upload_id.is_some() && query.part_number.is_some() {
|
||||
multipart::upload_part(state, path, query, body).await
|
||||
} else {
|
||||
put_object(state, path, headers, body).await
|
||||
}
|
||||
}
|
||||
|
||||
/// GET /{bucket}/{*key} — dispatches to list_parts or get_object based on query params.
|
||||
pub async fn get_dispatch<B: StorageBackend>(
|
||||
state: State<AppState<B>>,
|
||||
path: Path<(String, String)>,
|
||||
query: Query<ObjectKeyQuery>,
|
||||
) -> Response {
|
||||
if query.upload_id.is_some() {
|
||||
multipart::list_parts(state, path, query).await
|
||||
} else {
|
||||
get_object(state, path).await
|
||||
}
|
||||
}
|
||||
|
||||
/// DELETE /{bucket}/{*key} — dispatches to abort_multipart_upload or delete_object.
|
||||
pub async fn delete_dispatch<B: StorageBackend>(
|
||||
state: State<AppState<B>>,
|
||||
path: Path<(String, String)>,
|
||||
query: Query<ObjectKeyQuery>,
|
||||
) -> Response {
|
||||
if query.upload_id.is_some() {
|
||||
multipart::abort_multipart_upload(state, path, query).await
|
||||
} else {
|
||||
delete_object(state, path).await
|
||||
}
|
||||
}
|
||||
|
||||
/// POST /{bucket}/{*key} — dispatches to create_multipart_upload or complete_multipart_upload.
|
||||
pub async fn post_dispatch<B: StorageBackend>(
|
||||
state: State<AppState<B>>,
|
||||
path: Path<(String, String)>,
|
||||
query: Query<ObjectKeyQuery>,
|
||||
headers: HeaderMap,
|
||||
body: Bytes,
|
||||
) -> Response {
|
||||
if query.uploads.is_some() {
|
||||
multipart::create_multipart_upload(state, path, headers).await
|
||||
} else if query.upload_id.is_some() {
|
||||
multipart::complete_multipart_upload(state, path, query, body).await
|
||||
} else {
|
||||
(
|
||||
StatusCode::BAD_REQUEST,
|
||||
[("Content-Type", "application/xml")],
|
||||
responses::error_xml(
|
||||
"InvalidRequest",
|
||||
"POST requires ?uploads or ?uploadId parameter",
|
||||
&format!("/{}/{}", path.0 .0, path.0 .1),
|
||||
),
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
}
|
||||
|
||||
// --- Object handlers ---
|
||||
|
||||
/// PUT /{bucket}/{*key} — PutObject.
///
/// Stores the whole request body as the object, recording the request
/// Content-Type and all `x-amz-meta-*` headers as user metadata. On success
/// the object's ETag is returned as a response header. 404 `NoSuchBucket` when
/// the bucket is missing, 500 `InternalError` otherwise.
pub async fn put_object<B: StorageBackend>(
    State(state): State<AppState<B>>,
    Path((bucket, key)): Path<(String, String)>,
    headers: HeaderMap,
    body: Bytes,
) -> Response {
    let content_type = headers
        .get("content-type")
        .and_then(|v| v.to_str().ok())
        .map(|s| s.to_string());

    // Extract x-amz-meta-* user metadata (non-UTF8 header values skipped).
    let mut metadata = HashMap::new();
    for (name, value) in headers.iter() {
        let name_str = name.as_str();
        if let Some(meta_key) = name_str.strip_prefix("x-amz-meta-") {
            if let Ok(v) = value.to_str() {
                metadata.insert(meta_key.to_string(), v.to_string());
            }
        }
    }

    match state
        .store
        .put_object(&bucket, &key, content_type.as_deref(), metadata, body)
        .await
    {
        Ok(result) => {
            let mut response_headers = HeaderMap::new();
            // Backend ETags are expected to be valid header values.
            response_headers.insert("ETag", result.etag.parse().unwrap());
            response_headers.insert(
                "x-amz-request-id",
                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
            );
            (StatusCode::OK, response_headers).into_response()
        }
        Err(Post3Error::BucketNotFound(b)) => (
            StatusCode::NOT_FOUND,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "NoSuchBucket",
                "The specified bucket does not exist",
                &b,
            ),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("put_object error: {e}");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                [("Content-Type", "application/xml")],
                responses::error_xml(
                    "InternalError",
                    &e.to_string(),
                    &format!("/{bucket}/{key}"),
                ),
            )
                .into_response()
        }
    }
}
|
||||
|
||||
/// GET /{bucket}/{*key} — returns the object body with its metadata headers
/// (Content-Type, Content-Length, ETag, Last-Modified, x-amz-meta-*).
pub async fn get_object<B: StorageBackend>(
    State(state): State<AppState<B>>,
    Path((bucket, key)): Path<(String, String)>,
) -> Response {
    match state.store.get_object(&bucket, &key).await {
        Ok(result) => {
            let mut headers = HeaderMap::new();
            headers.insert(
                "Content-Type",
                HeaderValue::from_str(&result.metadata.content_type).unwrap(),
            );
            headers.insert(
                "Content-Length",
                HeaderValue::from_str(&result.metadata.size.to_string()).unwrap(),
            );
            headers.insert("ETag", HeaderValue::from_str(&result.metadata.etag).unwrap());
            // HTTP-date format for Last-Modified (RFC 7231 style).
            headers.insert(
                "Last-Modified",
                HeaderValue::from_str(
                    &result
                        .metadata
                        .last_modified
                        .format("%a, %d %b %Y %H:%M:%S GMT")
                        .to_string(),
                )
                .unwrap(),
            );
            headers.insert(
                "x-amz-request-id",
                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
            );

            // Return user metadata as x-amz-meta-* headers
            // (entries that are not valid header names/values are silently skipped).
            for (k, v) in &result.user_metadata {
                let header_name = format!("x-amz-meta-{k}");
                if let (Ok(name), Ok(val)) = (
                    header_name.parse::<HeaderName>(),
                    HeaderValue::from_str(v),
                ) {
                    headers.insert(name, val);
                }
            }

            (StatusCode::OK, headers, Body::from(result.body)).into_response()
        }
        Err(Post3Error::BucketNotFound(b)) => (
            StatusCode::NOT_FOUND,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "NoSuchBucket",
                "The specified bucket does not exist",
                &b,
            ),
        )
            .into_response(),
        Err(Post3Error::ObjectNotFound { bucket: b, key: k }) => (
            StatusCode::NOT_FOUND,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "NoSuchKey",
                "The specified key does not exist.",
                &format!("/{b}/{k}"),
            ),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("get_object error: {e}");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                [("Content-Type", "application/xml")],
                responses::error_xml(
                    "InternalError",
                    &e.to_string(),
                    &format!("/{bucket}/{key}"),
                ),
            )
                .into_response()
        }
    }
}
|
||||
|
||||
pub async fn head_object<B: StorageBackend>(
|
||||
State(state): State<AppState<B>>,
|
||||
Path((bucket, key)): Path<(String, String)>,
|
||||
) -> Response {
|
||||
match state.store.head_object(&bucket, &key).await {
|
||||
Ok(Some(result)) => {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(
|
||||
"Content-Type",
|
||||
HeaderValue::from_str(&result.object.content_type).unwrap(),
|
||||
);
|
||||
headers.insert(
|
||||
"Content-Length",
|
||||
HeaderValue::from_str(&result.object.size.to_string()).unwrap(),
|
||||
);
|
||||
headers.insert("ETag", HeaderValue::from_str(&result.object.etag).unwrap());
|
||||
headers.insert(
|
||||
"Last-Modified",
|
||||
HeaderValue::from_str(
|
||||
&result
|
||||
.object
|
||||
.last_modified
|
||||
.format("%a, %d %b %Y %H:%M:%S GMT")
|
||||
.to_string(),
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
headers.insert(
|
||||
"x-amz-request-id",
|
||||
HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
|
||||
);
|
||||
|
||||
for (k, v) in &result.user_metadata {
|
||||
let header_name = format!("x-amz-meta-{k}");
|
||||
if let (Ok(name), Ok(val)) = (
|
||||
header_name.parse::<HeaderName>(),
|
||||
HeaderValue::from_str(v),
|
||||
) {
|
||||
headers.insert(name, val);
|
||||
}
|
||||
}
|
||||
|
||||
(StatusCode::OK, headers).into_response()
|
||||
}
|
||||
Ok(None) => {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(
|
||||
"x-amz-request-id",
|
||||
HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
|
||||
);
|
||||
(StatusCode::NOT_FOUND, headers).into_response()
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("head_object error: {e}");
|
||||
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_object<B: StorageBackend>(
|
||||
State(state): State<AppState<B>>,
|
||||
Path((bucket, key)): Path<(String, String)>,
|
||||
) -> Response {
|
||||
match state.store.delete_object(&bucket, &key).await {
|
||||
Ok(()) => {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(
|
||||
"x-amz-request-id",
|
||||
HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
|
||||
);
|
||||
(StatusCode::NO_CONTENT, headers).into_response()
|
||||
}
|
||||
Err(Post3Error::BucketNotFound(b)) => (
|
||||
StatusCode::NOT_FOUND,
|
||||
[("Content-Type", "application/xml")],
|
||||
responses::error_xml(
|
||||
"NoSuchBucket",
|
||||
"The specified bucket does not exist",
|
||||
&b,
|
||||
),
|
||||
)
|
||||
.into_response(),
|
||||
Err(e) => {
|
||||
tracing::error!("delete_object error: {e}");
|
||||
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles GET /{bucket} — dispatches to ListMultipartUploads, ListObjectVersions,
/// GetBucketLocation, or ListObjects (v1/v2).
///
/// Dispatch is by query parameter: `?uploads`, `?location`, `?versions`,
/// otherwise a plain listing, with `list-type=2` selecting the v2 response shape.
pub async fn list_or_get<B: StorageBackend>(
    State(state): State<AppState<B>>,
    Path(bucket): Path<String>,
    Query(query): Query<BucketGetQuery>,
) -> Response {
    // ?uploads → ListMultipartUploads
    if query.uploads.is_some() {
        return multipart::list_multipart_uploads(
            State(state),
            Path(bucket),
            Query(query),
        )
        .await;
    }

    // ?location → GetBucketLocation
    if query.location.is_some() {
        return get_bucket_location(State(state), Path(bucket)).await;
    }

    // ?versions → ListObjectVersions
    if query.versions.is_some() {
        return list_object_versions(State(state), Path(bucket), Query(query)).await;
    }

    // Default: ListObjects (v1 or v2)
    let is_v2 = query.list_type == Some(2);
    // Both protocol versions funnel into the same backend call; only the
    // pagination cursor parameter differs between them.
    let continuation_token = if is_v2 {
        // v2: use continuation-token if present, else start-after
        query
            .continuation_token
            .as_deref()
            .or(query.start_after.as_deref())
    } else {
        query.marker.as_deref()
    };

    // Treat empty delimiter as absent (S3 spec: empty delimiter = no delimiter)
    let delimiter = query
        .delimiter
        .as_deref()
        .filter(|d| !d.is_empty());

    match state
        .store
        .list_objects_v2(
            &bucket,
            query.prefix.as_deref(),
            continuation_token,
            query.max_keys,
            delimiter,
        )
        .await
    {
        Ok(result) => {
            let max_keys = query.max_keys.unwrap_or(1000);
            let mut headers = HeaderMap::new();
            headers.insert("Content-Type", HeaderValue::from_static("application/xml"));
            headers.insert(
                "x-amz-request-id",
                HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
            );

            // Render the same backend result in whichever XML dialect was requested.
            let xml = if is_v2 {
                responses::list_objects_v2_xml(
                    &bucket,
                    &result,
                    max_keys,
                    query.continuation_token.as_deref(),
                    query.start_after.as_deref(),
                )
            } else {
                responses::list_objects_v1_xml(
                    &bucket,
                    &result,
                    max_keys,
                    query.marker.as_deref(),
                )
            };

            (StatusCode::OK, headers, xml).into_response()
        }
        Err(Post3Error::BucketNotFound(b)) => (
            StatusCode::NOT_FOUND,
            [("Content-Type", "application/xml")],
            responses::error_xml(
                "NoSuchBucket",
                "The specified bucket does not exist",
                &b,
            ),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("list_objects error: {e}");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                [("Content-Type", "application/xml")],
                responses::error_xml("InternalError", &e.to_string(), &bucket),
            )
                .into_response()
        }
    }
}
|
||||
|
||||
/// GET /{bucket}?versions — ListObjectVersions (stub: returns all as version "null").
|
||||
async fn list_object_versions<B: StorageBackend>(
|
||||
State(state): State<AppState<B>>,
|
||||
Path(bucket): Path<String>,
|
||||
Query(query): Query<BucketGetQuery>,
|
||||
) -> Response {
|
||||
let delimiter = query.delimiter.as_deref().filter(|d| !d.is_empty());
|
||||
match state
|
||||
.store
|
||||
.list_objects_v2(
|
||||
&bucket,
|
||||
query.prefix.as_deref(),
|
||||
query.key_marker.as_deref(),
|
||||
query.max_keys,
|
||||
delimiter,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => {
|
||||
let max_keys = query.max_keys.unwrap_or(1000);
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("Content-Type", HeaderValue::from_static("application/xml"));
|
||||
headers.insert(
|
||||
"x-amz-request-id",
|
||||
HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
|
||||
);
|
||||
(
|
||||
StatusCode::OK,
|
||||
headers,
|
||||
responses::list_object_versions_xml(
|
||||
&bucket,
|
||||
&result,
|
||||
max_keys,
|
||||
query.key_marker.as_deref(),
|
||||
),
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
Err(Post3Error::BucketNotFound(b)) => (
|
||||
StatusCode::NOT_FOUND,
|
||||
[("Content-Type", "application/xml")],
|
||||
responses::error_xml("NoSuchBucket", "The specified bucket does not exist", &b),
|
||||
)
|
||||
.into_response(),
|
||||
Err(e) => {
|
||||
tracing::error!("list_object_versions error: {e}");
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
[("Content-Type", "application/xml")],
|
||||
responses::error_xml("InternalError", &e.to_string(), &bucket),
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// GET /{bucket}?location — GetBucketLocation.
|
||||
async fn get_bucket_location<B: StorageBackend>(
|
||||
State(state): State<AppState<B>>,
|
||||
Path(bucket): Path<String>,
|
||||
) -> Response {
|
||||
match state.store.head_bucket(&bucket).await {
|
||||
Ok(Some(_)) => {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("Content-Type", HeaderValue::from_static("application/xml"));
|
||||
headers.insert(
|
||||
"x-amz-request-id",
|
||||
HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
|
||||
);
|
||||
(StatusCode::OK, headers, responses::get_bucket_location_xml()).into_response()
|
||||
}
|
||||
Ok(None) => (
|
||||
StatusCode::NOT_FOUND,
|
||||
[("Content-Type", "application/xml")],
|
||||
responses::error_xml(
|
||||
"NoSuchBucket",
|
||||
"The specified bucket does not exist",
|
||||
&bucket,
|
||||
),
|
||||
)
|
||||
.into_response(),
|
||||
Err(e) => {
|
||||
tracing::error!("get_bucket_location error: {e}");
|
||||
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// POST /{bucket} — dispatches to DeleteObjects based on ?delete query param.
|
||||
pub async fn bucket_post_dispatch<B: StorageBackend>(
|
||||
state: State<AppState<B>>,
|
||||
path: Path<String>,
|
||||
query: Query<crate::s3::extractors::BucketPostQuery>,
|
||||
body: Bytes,
|
||||
) -> Response {
|
||||
if query.delete.is_some() {
|
||||
delete_objects(state, path, body).await
|
||||
} else {
|
||||
(
|
||||
StatusCode::BAD_REQUEST,
|
||||
[("Content-Type", "application/xml")],
|
||||
responses::error_xml(
|
||||
"InvalidRequest",
|
||||
"POST on bucket requires ?delete parameter",
|
||||
&format!("/{}", path.0),
|
||||
),
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
}
|
||||
|
||||
/// POST /{bucket}?delete — DeleteObjects (batch delete).
|
||||
async fn delete_objects<B: StorageBackend>(
|
||||
State(state): State<AppState<B>>,
|
||||
Path(bucket): Path<String>,
|
||||
body: Bytes,
|
||||
) -> Response {
|
||||
let (keys, quiet) = match responses::parse_delete_objects_xml(&body) {
|
||||
Ok(result) => result,
|
||||
Err(msg) => {
|
||||
return (
|
||||
StatusCode::BAD_REQUEST,
|
||||
[("Content-Type", "application/xml")],
|
||||
responses::error_xml("MalformedXML", &msg, &format!("/{bucket}")),
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
};
|
||||
|
||||
// S3 limits DeleteObjects to 1000 keys
|
||||
if keys.len() > 1000 {
|
||||
return (
|
||||
StatusCode::BAD_REQUEST,
|
||||
[("Content-Type", "application/xml")],
|
||||
responses::error_xml(
|
||||
"MalformedXML",
|
||||
"The number of keys in a DeleteObjects request cannot exceed 1000",
|
||||
&format!("/{bucket}"),
|
||||
),
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
|
||||
let mut deleted = Vec::new();
|
||||
let mut errors: Vec<(String, String, String)> = Vec::new();
|
||||
|
||||
for key in keys {
|
||||
match state.store.delete_object(&bucket, &key).await {
|
||||
Ok(()) => {
|
||||
if !quiet {
|
||||
deleted.push(key);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
errors.push((key, "InternalError".to_string(), e.to_string()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("Content-Type", HeaderValue::from_static("application/xml"));
|
||||
headers.insert(
|
||||
"x-amz-request-id",
|
||||
HeaderValue::from_str(&uuid::Uuid::new_v4().to_string()).unwrap(),
|
||||
);
|
||||
|
||||
(
|
||||
StatusCode::OK,
|
||||
headers,
|
||||
responses::delete_objects_result_xml(&deleted, &errors),
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
42
crates/post3-server/src/s3/mod.rs
Normal file
42
crates/post3-server/src/s3/mod.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
pub mod extractors;
|
||||
pub mod handlers;
|
||||
pub mod responses;
|
||||
pub mod router;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use notmad::{Component, ComponentInfo, MadError};
|
||||
use post3::StorageBackend;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::state::State;
|
||||
|
||||
/// Axum-based S3-compatible HTTP frontend, run as a notmad [`Component`].
pub struct S3Server<B: StorageBackend> {
    // Socket address the TCP listener binds to in `run`.
    pub host: SocketAddr,
    // Shared application state handed to the router (cloned per server start).
    pub state: State<B>,
}
|
||||
|
||||
impl<B: StorageBackend> Component for S3Server<B> {
|
||||
fn info(&self) -> ComponentInfo {
|
||||
"post3/s3".into()
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
|
||||
let app = router::build_router(self.state.clone());
|
||||
|
||||
tracing::info!("post3 s3-compatible server listening on {}", self.host);
|
||||
let listener = TcpListener::bind(&self.host).await.map_err(|e| {
|
||||
MadError::Inner(anyhow::anyhow!("failed to bind: {e}"))
|
||||
})?;
|
||||
|
||||
axum::serve(listener, app.into_make_service())
|
||||
.with_graceful_shutdown(async move {
|
||||
cancellation_token.cancelled().await;
|
||||
})
|
||||
.await
|
||||
.map_err(|e| MadError::Inner(anyhow::anyhow!("server error: {e}")))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
538
crates/post3-server/src/s3/responses.rs
Normal file
538
crates/post3-server/src/s3/responses.rs
Normal file
@@ -0,0 +1,538 @@
|
||||
use post3::models::{BucketInfo, ListMultipartUploadsResult, ListObjectsResult, ListPartsResult};
|
||||
use serde::Deserialize;
|
||||
|
||||
pub fn list_buckets_xml(buckets: &[BucketInfo]) -> String {
|
||||
let mut xml = String::from(
|
||||
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
<ListAllMyBucketsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
|
||||
<Owner><ID>post3</ID><DisplayName>post3</DisplayName></Owner>\
|
||||
<Buckets>",
|
||||
);
|
||||
|
||||
for b in buckets {
|
||||
xml.push_str("<Bucket><Name>");
|
||||
xml.push_str(&xml_escape(&b.name));
|
||||
xml.push_str("</Name><CreationDate>");
|
||||
xml.push_str(&b.created_at.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string());
|
||||
xml.push_str("</CreationDate></Bucket>");
|
||||
}
|
||||
|
||||
xml.push_str("</Buckets></ListAllMyBucketsResult>");
|
||||
xml
|
||||
}
|
||||
|
||||
/// Renders a ListObjectsV2 `<ListBucketResult>` body.
///
/// `continuation_token` and `start_after` are the caller's own request
/// parameters, echoed back; pagination state (truncation, next token,
/// key count, common prefixes) comes from `result`.
pub fn list_objects_v2_xml(
    bucket_name: &str,
    result: &ListObjectsResult,
    max_keys: i64,
    continuation_token: Option<&str>,
    start_after: Option<&str>,
) -> String {
    let mut xml = String::from(
        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
         <ListBucketResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">",
    );

    xml.push_str("<Name>");
    xml.push_str(&xml_escape(bucket_name));
    xml.push_str("</Name>");

    // <Prefix> is always emitted, empty when no prefix was given.
    xml.push_str("<Prefix>");
    if let Some(ref pfx) = result.prefix {
        xml.push_str(&xml_escape(pfx));
    }
    xml.push_str("</Prefix>");

    if let Some(sa) = start_after {
        xml.push_str("<StartAfter>");
        xml.push_str(&xml_escape(sa));
        xml.push_str("</StartAfter>");
    }

    xml.push_str("<KeyCount>");
    xml.push_str(&result.key_count.to_string());
    xml.push_str("</KeyCount>");

    xml.push_str("<MaxKeys>");
    xml.push_str(&max_keys.to_string());
    xml.push_str("</MaxKeys>");

    xml.push_str("<IsTruncated>");
    xml.push_str(if result.is_truncated { "true" } else { "false" });
    xml.push_str("</IsTruncated>");

    if let Some(ref delim) = result.delimiter {
        xml.push_str("<Delimiter>");
        xml.push_str(&xml_escape(delim));
        xml.push_str("</Delimiter>");
    }

    if let Some(token) = continuation_token {
        xml.push_str("<ContinuationToken>");
        xml.push_str(&xml_escape(token));
        xml.push_str("</ContinuationToken>");
    }

    if let Some(ref token) = result.next_continuation_token {
        xml.push_str("<NextContinuationToken>");
        xml.push_str(&xml_escape(token));
        xml.push_str("</NextContinuationToken>");
    }

    // One <Contents> element per object.
    for obj in &result.objects {
        xml.push_str("<Contents>");
        xml.push_str("<Key>");
        xml.push_str(&xml_escape(&obj.key));
        xml.push_str("</Key>");
        xml.push_str("<LastModified>");
        xml.push_str(
            &obj.last_modified
                .format("%Y-%m-%dT%H:%M:%S%.3fZ")
                .to_string(),
        );
        xml.push_str("</LastModified>");
        xml.push_str("<ETag>");
        xml.push_str(&xml_escape(&obj.etag));
        xml.push_str("</ETag>");
        xml.push_str("<Size>");
        xml.push_str(&obj.size.to_string());
        xml.push_str("</Size>");
        xml.push_str("<StorageClass>STANDARD</StorageClass>");
        xml.push_str("</Contents>");
    }

    // Keys collapsed by the delimiter are reported as common prefixes.
    for cp in &result.common_prefixes {
        xml.push_str("<CommonPrefixes><Prefix>");
        xml.push_str(&xml_escape(cp));
        xml.push_str("</Prefix></CommonPrefixes>");
    }

    xml.push_str("</ListBucketResult>");
    xml
}
|
||||
|
||||
/// Renders a legacy ListObjects (v1) `<ListBucketResult>` body.
///
/// v1 paginates with `Marker`/`NextMarker` instead of continuation tokens;
/// the backend's `next_continuation_token` doubles as the v1 `NextMarker`.
pub fn list_objects_v1_xml(
    bucket_name: &str,
    result: &ListObjectsResult,
    max_keys: i64,
    marker: Option<&str>,
) -> String {
    let mut xml = String::from(
        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
         <ListBucketResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">",
    );

    xml.push_str("<Name>");
    xml.push_str(&xml_escape(bucket_name));
    xml.push_str("</Name>");

    xml.push_str("<Prefix>");
    if let Some(ref pfx) = result.prefix {
        xml.push_str(&xml_escape(pfx));
    }
    xml.push_str("</Prefix>");

    // Echo the request marker back, empty when absent.
    xml.push_str("<Marker>");
    if let Some(m) = marker {
        xml.push_str(&xml_escape(m));
    }
    xml.push_str("</Marker>");

    xml.push_str("<MaxKeys>");
    xml.push_str(&max_keys.to_string());
    xml.push_str("</MaxKeys>");

    xml.push_str("<IsTruncated>");
    xml.push_str(if result.is_truncated { "true" } else { "false" });
    xml.push_str("</IsTruncated>");

    if let Some(ref token) = result.next_continuation_token {
        xml.push_str("<NextMarker>");
        xml.push_str(&xml_escape(token));
        xml.push_str("</NextMarker>");
    }

    if let Some(ref delim) = result.delimiter {
        xml.push_str("<Delimiter>");
        xml.push_str(&xml_escape(delim));
        xml.push_str("</Delimiter>");
    }

    // v1 includes an <Owner> element per object, unlike the v2 rendering.
    for obj in &result.objects {
        xml.push_str("<Contents>");
        xml.push_str("<Key>");
        xml.push_str(&xml_escape(&obj.key));
        xml.push_str("</Key>");
        xml.push_str("<LastModified>");
        xml.push_str(
            &obj.last_modified
                .format("%Y-%m-%dT%H:%M:%S%.3fZ")
                .to_string(),
        );
        xml.push_str("</LastModified>");
        xml.push_str("<ETag>");
        xml.push_str(&xml_escape(&obj.etag));
        xml.push_str("</ETag>");
        xml.push_str("<Size>");
        xml.push_str(&obj.size.to_string());
        xml.push_str("</Size>");
        xml.push_str("<Owner><ID>post3</ID><DisplayName>post3</DisplayName></Owner>");
        xml.push_str("<StorageClass>STANDARD</StorageClass>");
        xml.push_str("</Contents>");
    }

    for cp in &result.common_prefixes {
        xml.push_str("<CommonPrefixes><Prefix>");
        xml.push_str(&xml_escape(cp));
        xml.push_str("</Prefix></CommonPrefixes>");
    }

    xml.push_str("</ListBucketResult>");
    xml
}
|
||||
|
||||
pub fn list_object_versions_xml(
|
||||
bucket_name: &str,
|
||||
result: &ListObjectsResult,
|
||||
max_keys: i64,
|
||||
key_marker: Option<&str>,
|
||||
) -> String {
|
||||
let mut xml = String::from(
|
||||
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
<ListVersionsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">",
|
||||
);
|
||||
|
||||
xml.push_str("<Name>");
|
||||
xml.push_str(&xml_escape(bucket_name));
|
||||
xml.push_str("</Name>");
|
||||
|
||||
xml.push_str("<Prefix>");
|
||||
if let Some(ref pfx) = result.prefix {
|
||||
xml.push_str(&xml_escape(pfx));
|
||||
}
|
||||
xml.push_str("</Prefix>");
|
||||
|
||||
// Echo back input markers
|
||||
xml.push_str("<KeyMarker>");
|
||||
if let Some(km) = key_marker {
|
||||
xml.push_str(&xml_escape(km));
|
||||
}
|
||||
xml.push_str("</KeyMarker>");
|
||||
xml.push_str("<VersionIdMarker/>");
|
||||
|
||||
xml.push_str("<MaxKeys>");
|
||||
xml.push_str(&max_keys.to_string());
|
||||
xml.push_str("</MaxKeys>");
|
||||
|
||||
xml.push_str("<IsTruncated>");
|
||||
xml.push_str(if result.is_truncated { "true" } else { "false" });
|
||||
xml.push_str("</IsTruncated>");
|
||||
|
||||
for obj in &result.objects {
|
||||
xml.push_str("<Version>");
|
||||
xml.push_str("<Key>");
|
||||
xml.push_str(&xml_escape(&obj.key));
|
||||
xml.push_str("</Key>");
|
||||
xml.push_str("<VersionId>null</VersionId>");
|
||||
xml.push_str("<IsLatest>true</IsLatest>");
|
||||
xml.push_str("<LastModified>");
|
||||
xml.push_str(
|
||||
&obj.last_modified
|
||||
.format("%Y-%m-%dT%H:%M:%S%.3fZ")
|
||||
.to_string(),
|
||||
);
|
||||
xml.push_str("</LastModified>");
|
||||
xml.push_str("<ETag>");
|
||||
xml.push_str(&xml_escape(&obj.etag));
|
||||
xml.push_str("</ETag>");
|
||||
xml.push_str("<Size>");
|
||||
xml.push_str(&obj.size.to_string());
|
||||
xml.push_str("</Size>");
|
||||
xml.push_str("<StorageClass>STANDARD</StorageClass>");
|
||||
xml.push_str("<Owner><ID>post3</ID><DisplayName>post3</DisplayName></Owner>");
|
||||
xml.push_str("</Version>");
|
||||
}
|
||||
|
||||
// Include NextKeyMarker/NextVersionIdMarker when truncated for pagination
|
||||
if result.is_truncated {
|
||||
if let Some(last_obj) = result.objects.last() {
|
||||
xml.push_str("<NextKeyMarker>");
|
||||
xml.push_str(&xml_escape(&last_obj.key));
|
||||
xml.push_str("</NextKeyMarker>");
|
||||
xml.push_str("<NextVersionIdMarker>null</NextVersionIdMarker>");
|
||||
}
|
||||
}
|
||||
|
||||
xml.push_str("</ListVersionsResult>");
|
||||
xml
|
||||
}
|
||||
|
||||
/// Renders a GetBucketLocation response. The empty `<LocationConstraint/>`
/// element is how S3 reports the default region.
pub fn get_bucket_location_xml() -> String {
    let mut xml = String::with_capacity(128);
    xml.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
    xml.push_str("<LocationConstraint xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\"/>");
    xml
}
|
||||
|
||||
// --- DeleteObjects ---
|
||||
|
||||
/// Deserialized `<Delete>` request body for the DeleteObjects operation.
#[derive(Debug, Deserialize)]
#[serde(rename = "Delete")]
struct DeleteObjectsRequest {
    // One <Object> element per key to delete.
    #[serde(rename = "Object")]
    objects: Vec<DeleteObjectEntry>,
    // <Quiet>true</Quiet> asks for success entries to be omitted from the result.
    #[serde(rename = "Quiet", default)]
    quiet: Option<bool>,
}

/// A single `<Object>` entry inside a `<Delete>` request.
#[derive(Debug, Deserialize)]
struct DeleteObjectEntry {
    #[serde(rename = "Key")]
    key: String,
}
|
||||
|
||||
pub fn parse_delete_objects_xml(body: &[u8]) -> Result<(Vec<String>, bool), String> {
|
||||
let request: DeleteObjectsRequest =
|
||||
quick_xml::de::from_reader(body).map_err(|e| format!("invalid XML: {e}"))?;
|
||||
let quiet = request.quiet.unwrap_or(false);
|
||||
let keys = request.objects.into_iter().map(|o| o.key).collect();
|
||||
Ok((keys, quiet))
|
||||
}
|
||||
|
||||
pub fn delete_objects_result_xml(deleted: &[String], errors: &[(String, String, String)]) -> String {
|
||||
let mut xml = String::from(
|
||||
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
<DeleteResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">",
|
||||
);
|
||||
|
||||
for key in deleted {
|
||||
xml.push_str("<Deleted><Key>");
|
||||
xml.push_str(&xml_escape(key));
|
||||
xml.push_str("</Key></Deleted>");
|
||||
}
|
||||
|
||||
for (key, code, message) in errors {
|
||||
xml.push_str("<Error><Key>");
|
||||
xml.push_str(&xml_escape(key));
|
||||
xml.push_str("</Key><Code>");
|
||||
xml.push_str(&xml_escape(code));
|
||||
xml.push_str("</Code><Message>");
|
||||
xml.push_str(&xml_escape(message));
|
||||
xml.push_str("</Message></Error>");
|
||||
}
|
||||
|
||||
xml.push_str("</DeleteResult>");
|
||||
xml
|
||||
}
|
||||
|
||||
pub fn error_xml(code: &str, message: &str, resource: &str) -> String {
|
||||
let request_id = uuid::Uuid::new_v4().to_string();
|
||||
format!(
|
||||
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
<Error>\
|
||||
<Code>{code}</Code>\
|
||||
<Message>{message}</Message>\
|
||||
<Resource>{resource}</Resource>\
|
||||
<RequestId>{request_id}</RequestId>\
|
||||
</Error>",
|
||||
code = xml_escape(code),
|
||||
message = xml_escape(message),
|
||||
resource = xml_escape(resource),
|
||||
request_id = request_id,
|
||||
)
|
||||
}
|
||||
|
||||
// --- Multipart upload responses ---
|
||||
|
||||
pub fn initiate_multipart_upload_xml(bucket: &str, key: &str, upload_id: &str) -> String {
|
||||
format!(
|
||||
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
<InitiateMultipartUploadResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
|
||||
<Bucket>{bucket}</Bucket>\
|
||||
<Key>{key}</Key>\
|
||||
<UploadId>{upload_id}</UploadId>\
|
||||
</InitiateMultipartUploadResult>",
|
||||
bucket = xml_escape(bucket),
|
||||
key = xml_escape(key),
|
||||
upload_id = xml_escape(upload_id),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn complete_multipart_upload_xml(
|
||||
location: &str,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
etag: &str,
|
||||
) -> String {
|
||||
format!(
|
||||
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
<CompleteMultipartUploadResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
|
||||
<Location>{location}</Location>\
|
||||
<Bucket>{bucket}</Bucket>\
|
||||
<Key>{key}</Key>\
|
||||
<ETag>{etag}</ETag>\
|
||||
</CompleteMultipartUploadResult>",
|
||||
location = xml_escape(location),
|
||||
bucket = xml_escape(bucket),
|
||||
key = xml_escape(key),
|
||||
etag = xml_escape(etag),
|
||||
)
|
||||
}
|
||||
|
||||
/// Renders a `<ListPartsResult>` body for the ListParts operation.
///
/// `max_parts` is the request's limit, echoed back; truncation state and
/// the next part-number marker come from `result`.
pub fn list_parts_xml(result: &ListPartsResult, max_parts: i32) -> String {
    let mut xml = String::from(
        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
         <ListPartsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">",
    );

    xml.push_str("<Bucket>");
    xml.push_str(&xml_escape(&result.bucket));
    xml.push_str("</Bucket>");

    xml.push_str("<Key>");
    xml.push_str(&xml_escape(&result.key));
    xml.push_str("</Key>");

    xml.push_str("<UploadId>");
    xml.push_str(&xml_escape(&result.upload_id));
    xml.push_str("</UploadId>");

    xml.push_str("<MaxParts>");
    xml.push_str(&max_parts.to_string());
    xml.push_str("</MaxParts>");

    xml.push_str("<IsTruncated>");
    xml.push_str(if result.is_truncated { "true" } else { "false" });
    xml.push_str("</IsTruncated>");

    // Only present when there are more parts to fetch.
    if let Some(marker) = result.next_part_number_marker {
        xml.push_str("<NextPartNumberMarker>");
        xml.push_str(&marker.to_string());
        xml.push_str("</NextPartNumberMarker>");
    }

    // One <Part> element per uploaded part; LastModified is the part's
    // creation time.
    for part in &result.parts {
        xml.push_str("<Part>");
        xml.push_str("<PartNumber>");
        xml.push_str(&part.part_number.to_string());
        xml.push_str("</PartNumber>");
        xml.push_str("<LastModified>");
        xml.push_str(
            &part
                .created_at
                .format("%Y-%m-%dT%H:%M:%S%.3fZ")
                .to_string(),
        );
        xml.push_str("</LastModified>");
        xml.push_str("<ETag>");
        xml.push_str(&xml_escape(&part.etag));
        xml.push_str("</ETag>");
        xml.push_str("<Size>");
        xml.push_str(&part.size.to_string());
        xml.push_str("</Size>");
        xml.push_str("</Part>");
    }

    xml.push_str("</ListPartsResult>");
    xml
}
|
||||
|
||||
/// Renders a `<ListMultipartUploadsResult>` body for the
/// ListMultipartUploads operation.
///
/// `max_uploads` is the request's limit, echoed back; truncation state and
/// the next key/upload-id markers come from `result`.
pub fn list_multipart_uploads_xml(
    result: &ListMultipartUploadsResult,
    max_uploads: i32,
) -> String {
    let mut xml = String::from(
        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
         <ListMultipartUploadsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">",
    );

    xml.push_str("<Bucket>");
    xml.push_str(&xml_escape(&result.bucket));
    xml.push_str("</Bucket>");

    // <Prefix> is always emitted, empty when no prefix was given.
    xml.push_str("<Prefix>");
    if let Some(ref pfx) = result.prefix {
        xml.push_str(&xml_escape(pfx));
    }
    xml.push_str("</Prefix>");

    xml.push_str("<MaxUploads>");
    xml.push_str(&max_uploads.to_string());
    xml.push_str("</MaxUploads>");

    xml.push_str("<IsTruncated>");
    xml.push_str(if result.is_truncated {
        "true"
    } else {
        "false"
    });
    xml.push_str("</IsTruncated>");

    // Pagination markers, present only when supplied by the backend.
    if let Some(ref marker) = result.next_key_marker {
        xml.push_str("<NextKeyMarker>");
        xml.push_str(&xml_escape(marker));
        xml.push_str("</NextKeyMarker>");
    }
    if let Some(ref marker) = result.next_upload_id_marker {
        xml.push_str("<NextUploadIdMarker>");
        xml.push_str(&xml_escape(marker));
        xml.push_str("</NextUploadIdMarker>");
    }

    // One <Upload> element per in-progress multipart upload.
    for upload in &result.uploads {
        xml.push_str("<Upload>");
        xml.push_str("<Key>");
        xml.push_str(&xml_escape(&upload.key));
        xml.push_str("</Key>");
        xml.push_str("<UploadId>");
        xml.push_str(&xml_escape(&upload.upload_id));
        xml.push_str("</UploadId>");
        xml.push_str("<Initiated>");
        xml.push_str(
            &upload
                .initiated
                .format("%Y-%m-%dT%H:%M:%S%.3fZ")
                .to_string(),
        );
        xml.push_str("</Initiated>");
        xml.push_str("</Upload>");
    }

    xml.push_str("</ListMultipartUploadsResult>");
    xml
}
|
||||
|
||||
// --- XML request parsing for CompleteMultipartUpload ---
|
||||
|
||||
/// Deserialized `<CompleteMultipartUpload>` XML request body that a
/// client sends to finish a multipart upload.
#[derive(Debug, Deserialize)]
#[serde(rename = "CompleteMultipartUpload")]
struct CompleteMultipartUploadRequest {
    // One entry per `<Part>` element, in document order.
    #[serde(rename = "Part")]
    parts: Vec<CompletePart>,
}
|
||||
|
||||
/// A single `<Part>` element inside a CompleteMultipartUpload request.
#[derive(Debug, Deserialize)]
struct CompletePart {
    // Part index as assigned by the client when the part was uploaded.
    #[serde(rename = "PartNumber")]
    part_number: i32,
    // ETag the server returned for that part's UploadPart call.
    #[serde(rename = "ETag")]
    etag: String,
}
|
||||
|
||||
pub fn parse_complete_multipart_xml(body: &[u8]) -> Result<Vec<(i32, String)>, String> {
|
||||
let request: CompleteMultipartUploadRequest =
|
||||
quick_xml::de::from_reader(body).map_err(|e| format!("invalid XML: {e}"))?;
|
||||
|
||||
Ok(request
|
||||
.parts
|
||||
.into_iter()
|
||||
.map(|p| (p.part_number, p.etag))
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Escapes the five XML special characters so `s` can be embedded safely
/// in element content or attribute values.
///
/// As written, each `replace` substituted a character with itself (the
/// entity references were lost), making the function a no-op: bucket
/// names or keys containing `<`, `&`, or quotes would produce malformed
/// response XML. Restore the standard predefined entities.
///
/// `&` must be replaced first so the ampersands introduced by the other
/// substitutions are not double-escaped.
fn xml_escape(s: &str) -> String {
    s.replace('&', "&amp;")
        .replace('<', "&lt;")
        .replace('>', "&gt;")
        .replace('"', "&quot;")
        .replace('\'', "&apos;")
}
|
||||
48
crates/post3-server/src/s3/router.rs
Normal file
48
crates/post3-server/src/s3/router.rs
Normal file
@@ -0,0 +1,48 @@
|
||||
use axum::{
|
||||
extract::{DefaultBodyLimit, Request},
|
||||
http::StatusCode,
|
||||
response::IntoResponse,
|
||||
routing::{delete, get, head, post, put},
|
||||
Router,
|
||||
};
|
||||
use post3::StorageBackend;
|
||||
use tower_http::trace::TraceLayer;
|
||||
|
||||
use super::handlers::{buckets, objects};
|
||||
use crate::state::State;
|
||||
|
||||
/// Builds the axum router implementing the S3 REST surface over the
/// given storage backend.
///
/// Bucket routes are registered both with and without a trailing slash
/// because SDKs normalize paths differently. Object routes use the
/// `{*key}` wildcard so keys containing `/` (e.g. "a/b/c") still match.
pub fn build_router<B: StorageBackend>(state: State<B>) -> Router {
    Router::new()
        // Service-level
        .route("/", get(buckets::list_buckets::<B>))
        // Bucket-level (with and without trailing slash for SDK compat)
        .route("/{bucket}", put(buckets::create_bucket::<B>))
        .route("/{bucket}/", put(buckets::create_bucket::<B>))
        .route("/{bucket}", head(buckets::head_bucket::<B>))
        .route("/{bucket}/", head(buckets::head_bucket::<B>))
        .route("/{bucket}", delete(buckets::delete_bucket::<B>))
        .route("/{bucket}/", delete(buckets::delete_bucket::<B>))
        .route("/{bucket}", get(objects::list_or_get::<B>))
        .route("/{bucket}/", get(objects::list_or_get::<B>))
        .route("/{bucket}", post(objects::bucket_post_dispatch::<B>))
        .route("/{bucket}/", post(objects::bucket_post_dispatch::<B>))
        // Object-level (wildcard key for nested paths like "a/b/c")
        .route("/{bucket}/{*key}", put(objects::put_dispatch::<B>))
        .route("/{bucket}/{*key}", get(objects::get_dispatch::<B>))
        .route("/{bucket}/{*key}", head(objects::head_object::<B>))
        .route("/{bucket}/{*key}", delete(objects::delete_dispatch::<B>))
        .route("/{bucket}/{*key}", post(objects::post_dispatch::<B>))
        // Log-and-404 anything the routes above did not match.
        .fallback(fallback)
        // Allow request bodies up to S3's single-PUT object maximum.
        .layer(DefaultBodyLimit::max(5 * 1024 * 1024 * 1024)) // 5 GiB
        .layer(TraceLayer::new_for_http())
        .with_state(state)
}
|
||||
|
||||
async fn fallback(req: Request) -> impl IntoResponse {
|
||||
tracing::warn!(
|
||||
method = %req.method(),
|
||||
uri = %req.uri(),
|
||||
"unmatched request"
|
||||
);
|
||||
StatusCode::NOT_FOUND
|
||||
}
|
||||
6
crates/post3-server/src/state.rs
Normal file
6
crates/post3-server/src/state.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
use post3::StorageBackend;
|
||||
|
||||
/// Shared application state handed to every request handler.
///
/// Generic over the backend so the same router code serves any
/// `StorageBackend` implementation.
#[derive(Clone)]
pub struct State<B: StorageBackend> {
    // The storage backend all S3 handlers read from and write to.
    pub store: B,
}
|
||||
106
crates/post3-server/tests/common/mod.rs
Normal file
106
crates/post3-server/tests/common/mod.rs
Normal file
@@ -0,0 +1,106 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use aws_credential_types::Credentials;
|
||||
use aws_sdk_s3::Client;
|
||||
use post3::PostgresBackend;
|
||||
use sqlx::PgPool;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
static TRACING: std::sync::Once = std::sync::Once::new();
|
||||
|
||||
fn init_tracing() {
|
||||
TRACING.call_once(|| {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
tracing_subscriber::EnvFilter::from_default_env()
|
||||
.add_directive("post3_server=debug".parse().unwrap())
|
||||
.add_directive("tower_http=debug".parse().unwrap()),
|
||||
)
|
||||
.with_test_writer()
|
||||
.init();
|
||||
});
|
||||
}
|
||||
|
||||
/// A running post3 server backed by PostgreSQL, plus an S3 SDK client
/// pointed at it. Built per test; call `shutdown` when finished.
pub struct TestServer {
    // Local address the server is listening on (ephemeral port).
    pub addr: SocketAddr,
    // SDK client configured against `addr` with path-style addressing.
    pub client: Client,
    // Cancelling this token triggers graceful shutdown of the server task.
    cancel: CancellationToken,
    // Held so the pool can be closed cleanly during shutdown.
    pool: PgPool,
}
|
||||
|
||||
impl TestServer {
    /// Boots the server against the PostgreSQL database named by
    /// `DATABASE_URL` (falling back to the local dev instance), runs
    /// migrations, wipes every table for a clean slate, and returns a
    /// ready-to-use S3 client bound to an ephemeral port.
    pub async fn start() -> Self {
        init_tracing();

        let db_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| {
            "postgresql://devuser:devpassword@localhost:5435/post3_dev".into()
        });

        let pool = sqlx::pool::PoolOptions::new()
            .max_connections(5)
            .connect(&db_url)
            .await
            .unwrap();

        // Run migrations
        sqlx::migrate!("../post3/migrations/")
            .set_locking(false)
            .run(&pool)
            .await
            .unwrap();

        // Clean slate. NOTE(review): the deletion order looks chosen to
        // respect foreign-key references (children before parents) —
        // confirm against the schema before reordering.
        sqlx::query("DELETE FROM upload_parts").execute(&pool).await.unwrap();
        sqlx::query("DELETE FROM multipart_upload_metadata").execute(&pool).await.unwrap();
        sqlx::query("DELETE FROM multipart_uploads").execute(&pool).await.unwrap();
        sqlx::query("DELETE FROM blocks").execute(&pool).await.unwrap();
        sqlx::query("DELETE FROM object_metadata").execute(&pool).await.unwrap();
        sqlx::query("DELETE FROM objects").execute(&pool).await.unwrap();
        sqlx::query("DELETE FROM buckets").execute(&pool).await.unwrap();

        let backend = PostgresBackend::new(pool.clone());
        let state = post3_server::state::State { store: backend };

        // Port 0 => the OS picks a free port; capture the actual address.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        let cancel = CancellationToken::new();
        let cancel_clone = cancel.clone();

        let router = post3_server::s3::router::build_router(state);
        tokio::spawn(async move {
            axum::serve(listener, router.into_make_service())
                .with_graceful_shutdown(async move {
                    cancel_clone.cancelled().await;
                })
                .await
                .unwrap();
        });

        // Dummy credentials: the SDK requires a provider to be configured;
        // presumably the server ignores them — TODO confirm.
        let creds = Credentials::new("test", "test", None, None, "test");
        let config = aws_sdk_s3::Config::builder()
            .behavior_version_latest()
            .region(aws_types::region::Region::new("us-east-1"))
            .endpoint_url(format!("http://{}", addr))
            .credentials_provider(creds)
            // Path-style addressing: bucket in the path, not a subdomain.
            .force_path_style(true)
            .build();

        let client = Client::from_conf(config);

        Self {
            addr,
            client,
            cancel,
            pool,
        }
    }

    /// Cancels the server task, waits briefly so graceful shutdown can
    /// complete, then closes the connection pool.
    pub async fn shutdown(self) {
        self.cancel.cancel();
        // Give the server task a moment to wind down
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        self.pool.close().await;
    }
}
|
||||
390
crates/post3-server/tests/fs_integration.rs
Normal file
390
crates/post3-server/tests/fs_integration.rs
Normal file
@@ -0,0 +1,390 @@
|
||||
//! Integration tests using FilesystemBackend (no PostgreSQL required).
|
||||
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use aws_credential_types::Credentials;
|
||||
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
|
||||
use aws_sdk_s3::Client;
|
||||
use post3::FilesystemBackend;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
/// A post3 server backed by a throwaway temp directory on disk, plus an
/// S3 SDK client pointed at it. No PostgreSQL required.
struct FsTestServer {
    // SDK client configured against the server's ephemeral address.
    client: Client,
    // Cancelling this token shuts the spawned server task down.
    cancel: CancellationToken,
    // Held so the backing directory lives as long as the server; the
    // directory is removed when this is dropped at test end.
    _tmpdir: tempfile::TempDir,
}
|
||||
|
||||
impl FsTestServer {
|
||||
async fn start() -> Self {
|
||||
let tmpdir = tempfile::tempdir().unwrap();
|
||||
let backend = FilesystemBackend::new(tmpdir.path());
|
||||
let state = post3_server::state::State { store: backend };
|
||||
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr: SocketAddr = listener.local_addr().unwrap();
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let cancel_clone = cancel.clone();
|
||||
|
||||
let router = post3_server::s3::router::build_router(state);
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, router.into_make_service())
|
||||
.with_graceful_shutdown(async move {
|
||||
cancel_clone.cancelled().await;
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
let creds = Credentials::new("test", "test", None, None, "test");
|
||||
let config = aws_sdk_s3::Config::builder()
|
||||
.behavior_version_latest()
|
||||
.region(aws_types::region::Region::new("us-east-1"))
|
||||
.endpoint_url(format!("http://{}", addr))
|
||||
.credentials_provider(creds)
|
||||
.force_path_style(true)
|
||||
.build();
|
||||
|
||||
let client = Client::from_conf(config);
|
||||
|
||||
Self {
|
||||
client,
|
||||
cancel,
|
||||
_tmpdir: tmpdir,
|
||||
}
|
||||
}
|
||||
|
||||
async fn shutdown(self) {
|
||||
self.cancel.cancel();
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// --- Tests ---
|
||||
|
||||
/// Create / head / list / delete round-trip for buckets.
#[tokio::test]
async fn test_fs_bucket_crud() {
    let server = FsTestServer::start().await;
    let c = &server.client;

    // Create
    c.create_bucket().bucket("my-bucket").send().await.unwrap();

    // Head
    c.head_bucket().bucket("my-bucket").send().await.unwrap();

    // List
    let resp = c.list_buckets().send().await.unwrap();
    let names: Vec<_> = resp.buckets().iter().filter_map(|b| b.name()).collect();
    assert!(names.contains(&"my-bucket"));

    // Delete
    c.delete_bucket().bucket("my-bucket").send().await.unwrap();

    // Verify gone
    let result = c.head_bucket().bucket("my-bucket").send().await;
    assert!(result.is_err());

    server.shutdown().await;
}

/// Basic object lifecycle: put, get, head, delete, then verify 404.
#[tokio::test]
async fn test_fs_put_get_delete() {
    let server = FsTestServer::start().await;
    let c = &server.client;

    c.create_bucket().bucket("test").send().await.unwrap();

    // Put
    c.put_object()
        .bucket("test")
        .key("hello.txt")
        .content_type("text/plain")
        .body(aws_sdk_s3::primitives::ByteStream::from_static(b"hello world"))
        .send()
        .await
        .unwrap();

    // Get
    let resp = c.get_object().bucket("test").key("hello.txt").send().await.unwrap();
    let body = resp.body.collect().await.unwrap().into_bytes();
    assert_eq!(body.as_ref(), b"hello world");

    // Head: length and content type round-trip
    let head = c.head_object().bucket("test").key("hello.txt").send().await.unwrap();
    assert_eq!(head.content_length(), Some(11));
    assert_eq!(head.content_type(), Some("text/plain"));

    // Delete
    c.delete_object().bucket("test").key("hello.txt").send().await.unwrap();

    // Verify gone
    let result = c.get_object().bucket("test").key("hello.txt").send().await;
    assert!(result.is_err());

    // Cleanup
    c.delete_bucket().bucket("test").send().await.unwrap();
    server.shutdown().await;
}

/// ListObjectsV2 with and without a prefix filter.
#[tokio::test]
async fn test_fs_list_objects() {
    let server = FsTestServer::start().await;
    let c = &server.client;

    c.create_bucket().bucket("test").send().await.unwrap();

    for i in 0..5 {
        c.put_object()
            .bucket("test")
            .key(format!("item-{i:02}"))
            .body(aws_sdk_s3::primitives::ByteStream::from_static(b"data"))
            .send()
            .await
            .unwrap();
    }

    // List all
    let resp = c.list_objects_v2().bucket("test").send().await.unwrap();
    assert_eq!(resp.key_count(), Some(5));

    // List with prefix
    let resp = c
        .list_objects_v2()
        .bucket("test")
        .prefix("item-03")
        .send()
        .await
        .unwrap();
    assert_eq!(resp.key_count(), Some(1));

    // Cleanup
    for i in 0..5 {
        c.delete_object()
            .bucket("test")
            .key(format!("item-{i:02}"))
            .send()
            .await
            .unwrap();
    }
    c.delete_bucket().bucket("test").send().await.unwrap();
    server.shutdown().await;
}

/// User-defined metadata survives a put/head round-trip.
#[tokio::test]
async fn test_fs_user_metadata() {
    let server = FsTestServer::start().await;
    let c = &server.client;

    c.create_bucket().bucket("test").send().await.unwrap();

    c.put_object()
        .bucket("test")
        .key("meta.txt")
        .metadata("author", "test-user")
        .metadata("version", "1")
        .body(aws_sdk_s3::primitives::ByteStream::from_static(b"data"))
        .send()
        .await
        .unwrap();

    let head = c.head_object().bucket("test").key("meta.txt").send().await.unwrap();

    let meta = head.metadata().unwrap();
    assert_eq!(meta.get("author").unwrap(), "test-user");
    assert_eq!(meta.get("version").unwrap(), "1");

    // Cleanup
    c.delete_object().bucket("test").key("meta.txt").send().await.unwrap();
    c.delete_bucket().bucket("test").send().await.unwrap();
    server.shutdown().await;
}

/// Two-part multipart upload: compound ETag plus byte-exact reassembly.
#[tokio::test]
async fn test_fs_multipart_upload() {
    let server = FsTestServer::start().await;
    let c = &server.client;

    c.create_bucket().bucket("test").send().await.unwrap();

    // Create multipart upload
    let create = c
        .create_multipart_upload()
        .bucket("test")
        .key("big.bin")
        .send()
        .await
        .unwrap();
    let upload_id = create.upload_id().unwrap();

    // Upload parts (non-last parts must be >= 5 MB per S3 spec)
    let min_part = 5 * 1024 * 1024;
    let part1 = c
        .upload_part()
        .bucket("test")
        .key("big.bin")
        .upload_id(upload_id)
        .part_number(1)
        .body(aws_sdk_s3::primitives::ByteStream::from(vec![0xAAu8; min_part]))
        .send()
        .await
        .unwrap();

    let part2 = c
        .upload_part()
        .bucket("test")
        .key("big.bin")
        .upload_id(upload_id)
        .part_number(2)
        .body(aws_sdk_s3::primitives::ByteStream::from(vec![0xBBu8; 1024]))
        .send()
        .await
        .unwrap();

    // Complete
    let completed = CompletedMultipartUpload::builder()
        .parts(
            CompletedPart::builder()
                .part_number(1)
                .e_tag(part1.e_tag().unwrap())
                .build(),
        )
        .parts(
            CompletedPart::builder()
                .part_number(2)
                .e_tag(part2.e_tag().unwrap())
                .build(),
        )
        .build();

    let complete_resp = c
        .complete_multipart_upload()
        .bucket("test")
        .key("big.bin")
        .upload_id(upload_id)
        .multipart_upload(completed)
        .send()
        .await
        .unwrap();

    // Verify compound ETag
    let etag = complete_resp.e_tag().unwrap();
    assert!(etag.contains("-2"), "Expected compound ETag, got: {etag}");

    // Verify data
    let resp = c.get_object().bucket("test").key("big.bin").send().await.unwrap();
    let body = resp.body.collect().await.unwrap().into_bytes();
    assert_eq!(body.len(), min_part + 1024);
    assert!(body[..min_part].iter().all(|b| *b == 0xAA));
    assert!(body[min_part..].iter().all(|b| *b == 0xBB));

    // Cleanup
    c.delete_object().bucket("test").key("big.bin").send().await.unwrap();
    c.delete_bucket().bucket("test").send().await.unwrap();
    server.shutdown().await;
}

/// Aborting a multipart upload must leave no object behind.
#[tokio::test]
async fn test_fs_abort_multipart() {
    let server = FsTestServer::start().await;
    let c = &server.client;

    c.create_bucket().bucket("test").send().await.unwrap();

    let create = c
        .create_multipart_upload()
        .bucket("test")
        .key("aborted.bin")
        .send()
        .await
        .unwrap();
    let upload_id = create.upload_id().unwrap();

    // Upload a part
    c.upload_part()
        .bucket("test")
        .key("aborted.bin")
        .upload_id(upload_id)
        .part_number(1)
        .body(aws_sdk_s3::primitives::ByteStream::from(vec![0u8; 100]))
        .send()
        .await
        .unwrap();

    // Abort
    c.abort_multipart_upload()
        .bucket("test")
        .key("aborted.bin")
        .upload_id(upload_id)
        .send()
        .await
        .unwrap();

    // Verify no object was created
    let result = c.get_object().bucket("test").key("aborted.bin").send().await;
    assert!(result.is_err());

    c.delete_bucket().bucket("test").send().await.unwrap();
    server.shutdown().await;
}
|
||||
871
crates/post3-server/tests/s3_integration.rs
Normal file
871
crates/post3-server/tests/s3_integration.rs
Normal file
@@ -0,0 +1,871 @@
|
||||
mod common;
|
||||
|
||||
use aws_sdk_s3::primitives::ByteStream;
|
||||
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
|
||||
use common::TestServer;
|
||||
|
||||
/// Creating a bucket makes it show up in ListBuckets.
#[tokio::test]
async fn test_create_and_list_buckets() {
    let server = TestServer::start().await;

    server
        .client
        .create_bucket()
        .bucket("test-bucket")
        .send()
        .await
        .unwrap();

    let resp = server.client.list_buckets().send().await.unwrap();
    let names: Vec<_> = resp.buckets().iter().filter_map(|b| b.name()).collect();
    assert!(names.contains(&"test-bucket"));

    server.shutdown().await;
}

/// HeadBucket succeeds for an existing bucket and fails for a missing one.
#[tokio::test]
async fn test_head_bucket() {
    let server = TestServer::start().await;

    server.client.create_bucket().bucket("hb-test").send().await.unwrap();

    server.client.head_bucket().bucket("hb-test").send().await.unwrap();

    let err = server
        .client
        .head_bucket()
        .bucket("no-such-bucket")
        .send()
        .await;
    assert!(err.is_err());

    server.shutdown().await;
}

/// A deleted bucket no longer answers HeadBucket.
#[tokio::test]
async fn test_delete_bucket() {
    let server = TestServer::start().await;

    server.client.create_bucket().bucket("to-delete").send().await.unwrap();

    server.client.delete_bucket().bucket("to-delete").send().await.unwrap();

    let err = server.client.head_bucket().bucket("to-delete").send().await;
    assert!(err.is_err());

    server.shutdown().await;
}

/// Put/get round-trip preserves both the bytes and the content type.
#[tokio::test]
async fn test_put_and_get_object() {
    let server = TestServer::start().await;

    server.client.create_bucket().bucket("data").send().await.unwrap();

    let body = ByteStream::from_static(b"hello world");
    server
        .client
        .put_object()
        .bucket("data")
        .key("greeting.txt")
        .content_type("text/plain")
        .body(body)
        .send()
        .await
        .unwrap();

    let resp = server
        .client
        .get_object()
        .bucket("data")
        .key("greeting.txt")
        .send()
        .await
        .unwrap();

    // Read the content type before `resp.body` is moved by collect().
    let content_type = resp.content_type().map(|s| s.to_string());
    let bytes = resp.body.collect().await.unwrap().into_bytes();
    assert_eq!(bytes.as_ref(), b"hello world");
    assert_eq!(content_type.as_deref(), Some("text/plain"));

    server.shutdown().await;
}

/// A 3 MiB object survives the round-trip intact.
#[tokio::test]
async fn test_put_large_object_chunked() {
    let server = TestServer::start().await;

    server.client.create_bucket().bucket("large").send().await.unwrap();

    // 3 MiB object => should be split into 3 blocks at 1 MiB each
    let data = vec![0x42u8; 3 * 1024 * 1024];
    let body = ByteStream::from(data.clone());
    server
        .client
        .put_object()
        .bucket("large")
        .key("big-file.bin")
        .body(body)
        .send()
        .await
        .unwrap();

    let resp = server
        .client
        .get_object()
        .bucket("large")
        .key("big-file.bin")
        .send()
        .await
        .unwrap();

    let bytes = resp.body.collect().await.unwrap().into_bytes();
    assert_eq!(bytes.len(), 3 * 1024 * 1024);
    assert_eq!(bytes.as_ref(), data.as_slice());

    server.shutdown().await;
}

/// HeadObject reports the stored content length.
#[tokio::test]
async fn test_head_object() {
    let server = TestServer::start().await;

    server.client.create_bucket().bucket("meta").send().await.unwrap();

    let body = ByteStream::from_static(b"test");
    server
        .client
        .put_object()
        .bucket("meta")
        .key("file.txt")
        .body(body)
        .send()
        .await
        .unwrap();

    let resp = server
        .client
        .head_object()
        .bucket("meta")
        .key("file.txt")
        .send()
        .await
        .unwrap();

    assert_eq!(resp.content_length(), Some(4));

    server.shutdown().await;
}

/// GetObject fails after the object has been deleted.
#[tokio::test]
async fn test_delete_object() {
    let server = TestServer::start().await;

    server.client.create_bucket().bucket("del").send().await.unwrap();

    let body = ByteStream::from_static(b"bye");
    server
        .client
        .put_object()
        .bucket("del")
        .key("gone.txt")
        .body(body)
        .send()
        .await
        .unwrap();

    server
        .client
        .delete_object()
        .bucket("del")
        .key("gone.txt")
        .send()
        .await
        .unwrap();

    let err = server
        .client
        .get_object()
        .bucket("del")
        .key("gone.txt")
        .send()
        .await;
    assert!(err.is_err());

    server.shutdown().await;
}

/// ListObjectsV2 with a prefix returns exactly the matching keys.
#[tokio::test]
async fn test_list_objects_v2() {
    let server = TestServer::start().await;

    server.client.create_bucket().bucket("list-test").send().await.unwrap();

    for i in 0..5 {
        let body = ByteStream::from_static(b"x");
        server
            .client
            .put_object()
            .bucket("list-test")
            .key(format!("prefix/file-{i}.txt"))
            .body(body)
            .send()
            .await
            .unwrap();
    }

    let resp = server
        .client
        .list_objects_v2()
        .bucket("list-test")
        .prefix("prefix/")
        .send()
        .await
        .unwrap();

    assert_eq!(resp.key_count(), Some(5));

    server.shutdown().await;
}

/// Re-putting a key replaces the previous content, even with a different length.
#[tokio::test]
async fn test_overwrite_object() {
    let server = TestServer::start().await;

    server.client.create_bucket().bucket("ow").send().await.unwrap();

    let body1 = ByteStream::from_static(b"version1");
    server
        .client
        .put_object()
        .bucket("ow")
        .key("file.txt")
        .body(body1)
        .send()
        .await
        .unwrap();

    let body2 = ByteStream::from_static(b"version2-longer");
    server
        .client
        .put_object()
        .bucket("ow")
        .key("file.txt")
        .body(body2)
        .send()
        .await
        .unwrap();

    let resp = server
        .client
        .get_object()
        .bucket("ow")
        .key("file.txt")
        .send()
        .await
        .unwrap();

    let bytes = resp.body.collect().await.unwrap().into_bytes();
    assert_eq!(bytes.as_ref(), b"version2-longer");

    server.shutdown().await;
}

/// User metadata round-trips through put_object/head_object.
#[tokio::test]
async fn test_user_metadata_roundtrip() {
    let server = TestServer::start().await;

    server.client.create_bucket().bucket("meta-test").send().await.unwrap();

    let body = ByteStream::from_static(b"with metadata");
    server
        .client
        .put_object()
        .bucket("meta-test")
        .key("doc.txt")
        .body(body)
        .metadata("author", "test-user")
        .metadata("version", "42")
        .send()
        .await
        .unwrap();

    let resp = server
        .client
        .head_object()
        .bucket("meta-test")
        .key("doc.txt")
        .send()
        .await
        .unwrap();

    let meta = resp.metadata().unwrap();
    assert_eq!(meta.get("author").map(|s| s.as_str()), Some("test-user"));
    assert_eq!(meta.get("version").map(|s| s.as_str()), Some("42"));

    server.shutdown().await;
}

// --- Multipart upload tests ---

/// Three-part multipart upload: compound ETag and byte-exact reassembly.
#[tokio::test]
async fn test_multipart_upload_basic() {
    let server = TestServer::start().await;

    server.client.create_bucket().bucket("mp-basic").send().await.unwrap();

    // Create multipart upload
    let create_resp = server
        .client
        .create_multipart_upload()
        .bucket("mp-basic")
        .key("large-file.bin")
        .send()
        .await
        .unwrap();
    let upload_id = create_resp.upload_id().unwrap().to_string();

    // Upload 3 parts (non-last parts must be >= 5 MB per S3 spec)
    let min_part = 5 * 1024 * 1024;
    let part1_data = vec![0x11u8; min_part];
    let part2_data = vec![0x22u8; min_part];
    let part3_data = vec![0x33u8; 1024 * 1024];

    let p1 = server
        .client
        .upload_part()
        .bucket("mp-basic")
        .key("large-file.bin")
        .upload_id(&upload_id)
        .part_number(1)
        .body(ByteStream::from(part1_data.clone()))
        .send()
        .await
        .unwrap();

    let p2 = server
        .client
        .upload_part()
        .bucket("mp-basic")
        .key("large-file.bin")
        .upload_id(&upload_id)
        .part_number(2)
        .body(ByteStream::from(part2_data.clone()))
        .send()
        .await
        .unwrap();

    let p3 = server
        .client
        .upload_part()
        .bucket("mp-basic")
        .key("large-file.bin")
        .upload_id(&upload_id)
        .part_number(3)
        .body(ByteStream::from(part3_data.clone()))
        .send()
        .await
        .unwrap();

    // Complete multipart upload
    let completed = CompletedMultipartUpload::builder()
        .parts(
            CompletedPart::builder()
                .part_number(1)
                .e_tag(p1.e_tag().unwrap())
                .build(),
        )
        .parts(
            CompletedPart::builder()
                .part_number(2)
                .e_tag(p2.e_tag().unwrap())
                .build(),
        )
        .parts(
            CompletedPart::builder()
                .part_number(3)
                .e_tag(p3.e_tag().unwrap())
                .build(),
        )
        .build();

    let complete_resp = server
        .client
        .complete_multipart_upload()
        .bucket("mp-basic")
        .key("large-file.bin")
        .upload_id(&upload_id)
        .multipart_upload(completed)
        .send()
        .await
        .unwrap();

    // Verify ETag is compound format (hex-3)
    let etag = complete_resp.e_tag().unwrap();
    assert!(etag.contains("-3"), "Expected compound ETag, got: {etag}");

    // Get and verify assembled data
    let get_resp = server
        .client
        .get_object()
        .bucket("mp-basic")
        .key("large-file.bin")
        .send()
        .await
        .unwrap();

    let body = get_resp.body.collect().await.unwrap().into_bytes();
    assert_eq!(body.len(), min_part * 2 + 1024 * 1024);

    let mut expected = Vec::new();
    expected.extend_from_slice(&part1_data);
    expected.extend_from_slice(&part2_data);
    expected.extend_from_slice(&part3_data);
    assert_eq!(body.as_ref(), expected.as_slice());

    server.shutdown().await;
}

/// Aborting an upload leaves no object behind.
#[tokio::test]
async fn test_abort_multipart_upload() {
    let server = TestServer::start().await;

    server.client.create_bucket().bucket("mp-abort").send().await.unwrap();

    let create_resp = server
        .client
        .create_multipart_upload()
        .bucket("mp-abort")
        .key("aborted.bin")
        .send()
        .await
        .unwrap();
    let upload_id = create_resp.upload_id().unwrap().to_string();

    // Upload a part
    server
        .client
        .upload_part()
        .bucket("mp-abort")
        .key("aborted.bin")
        .upload_id(&upload_id)
        .part_number(1)
        .body(ByteStream::from(vec![0xAAu8; 1024]))
        .send()
        .await
        .unwrap();

    // Abort
    server
        .client
        .abort_multipart_upload()
        .bucket("mp-abort")
        .key("aborted.bin")
        .upload_id(&upload_id)
        .send()
        .await
        .unwrap();

    // Verify object doesn't exist
    let err = server
        .client
        .get_object()
        .bucket("mp-abort")
        .key("aborted.bin")
        .send()
        .await;
    assert!(err.is_err());

    server.shutdown().await;
}

/// ListParts reports every uploaded part with its number and size.
#[tokio::test]
async fn test_list_parts() {
    let server = TestServer::start().await;

    server
        .client
        .create_bucket()
        .bucket("mp-list-parts")
        .send()
        .await
        .unwrap();

    let create_resp = server
        .client
        .create_multipart_upload()
        .bucket("mp-list-parts")
        .key("parts.bin")
        .send()
        .await
        .unwrap();
    let upload_id = create_resp.upload_id().unwrap().to_string();

    // Upload 3 parts
    for i in 1..=3 {
        server
            .client
            .upload_part()
            .bucket("mp-list-parts")
            .key("parts.bin")
            .upload_id(&upload_id)
            .part_number(i)
            .body(ByteStream::from(vec![i as u8; 1024 * 100]))
            .send()
            .await
            .unwrap();
    }

    // List parts
    let list_resp = server
        .client
        .list_parts()
        .bucket("mp-list-parts")
        .key("parts.bin")
        .upload_id(&upload_id)
        .send()
        .await
        .unwrap();

    let parts = list_resp.parts();
    assert_eq!(parts.len(), 3);
    assert_eq!(parts[0].part_number(), Some(1));
    assert_eq!(parts[1].part_number(), Some(2));
    assert_eq!(parts[2].part_number(), Some(3));
    for p in parts {
        assert_eq!(p.size(), Some(1024 * 100));
    }

    // Cleanup
    server
        .client
        .abort_multipart_upload()
        .bucket("mp-list-parts")
        .key("parts.bin")
        .upload_id(&upload_id)
        .send()
        .await
        .unwrap();

    server.shutdown().await;
}

/// ListMultipartUploads shows every in-progress upload for a bucket.
#[tokio::test]
async fn test_list_multipart_uploads() {
    let server = TestServer::start().await;

    server
        .client
        .create_bucket()
        .bucket("mp-list-uploads")
        .send()
        .await
        .unwrap();

    // Create two uploads
    let u1 = server
        .client
        .create_multipart_upload()
        .bucket("mp-list-uploads")
        .key("file-a.bin")
        .send()
        .await
        .unwrap();
    let u1_id = u1.upload_id().unwrap().to_string();

    let u2 = server
        .client
        .create_multipart_upload()
        .bucket("mp-list-uploads")
        .key("file-b.bin")
        .send()
        .await
        .unwrap();
    let u2_id = u2.upload_id().unwrap().to_string();

    // List multipart uploads
    let list_resp = server
        .client
        .list_multipart_uploads()
        .bucket("mp-list-uploads")
        .send()
        .await
        .unwrap();

    let uploads = list_resp.uploads();
    assert_eq!(uploads.len(), 2);

    let keys: Vec<&str> = uploads.iter().filter_map(|u| u.key()).collect();
    assert!(keys.contains(&"file-a.bin"));
    assert!(keys.contains(&"file-b.bin"));

    // Cleanup
    server
        .client
        .abort_multipart_upload()
        .bucket("mp-list-uploads")
        .key("file-a.bin")
        .upload_id(&u1_id)
        .send()
        .await
        .unwrap();
    server
        .client
        .abort_multipart_upload()
        .bucket("mp-list-uploads")
        .key("file-b.bin")
        .upload_id(&u2_id)
        .send()
        .await
        .unwrap();

    server.shutdown().await;
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_overwrite_part() {
|
||||
let server = TestServer::start().await;
|
||||
|
||||
server
|
||||
.client
|
||||
.create_bucket()
|
||||
.bucket("mp-overwrite")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let create_resp = server
|
||||
.client
|
||||
.create_multipart_upload()
|
||||
.bucket("mp-overwrite")
|
||||
.key("ow.bin")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
let upload_id = create_resp.upload_id().unwrap().to_string();
|
||||
|
||||
// Upload part 1 with data A
|
||||
server
|
||||
.client
|
||||
.upload_part()
|
||||
.bucket("mp-overwrite")
|
||||
.key("ow.bin")
|
||||
.upload_id(&upload_id)
|
||||
.part_number(1)
|
||||
.body(ByteStream::from(vec![0xAAu8; 1024]))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Re-upload part 1 with data B
|
||||
let p1 = server
|
||||
.client
|
||||
.upload_part()
|
||||
.bucket("mp-overwrite")
|
||||
.key("ow.bin")
|
||||
.upload_id(&upload_id)
|
||||
.part_number(1)
|
||||
.body(ByteStream::from(vec![0xBBu8; 1024]))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Complete with the latest etag
|
||||
let completed = CompletedMultipartUpload::builder()
|
||||
.parts(
|
||||
CompletedPart::builder()
|
||||
.part_number(1)
|
||||
.e_tag(p1.e_tag().unwrap())
|
||||
.build(),
|
||||
)
|
||||
.build();
|
||||
|
||||
server
|
||||
.client
|
||||
.complete_multipart_upload()
|
||||
.bucket("mp-overwrite")
|
||||
.key("ow.bin")
|
||||
.upload_id(&upload_id)
|
||||
.multipart_upload(completed)
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Verify data B
|
||||
let get_resp = server
|
||||
.client
|
||||
.get_object()
|
||||
.bucket("mp-overwrite")
|
||||
.key("ow.bin")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let body = get_resp.body.collect().await.unwrap().into_bytes();
|
||||
assert_eq!(body.as_ref(), vec![0xBBu8; 1024].as_slice());
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multipart_with_metadata() {
|
||||
let server = TestServer::start().await;
|
||||
|
||||
server
|
||||
.client
|
||||
.create_bucket()
|
||||
.bucket("mp-meta")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Create multipart upload with metadata
|
||||
let create_resp = server
|
||||
.client
|
||||
.create_multipart_upload()
|
||||
.bucket("mp-meta")
|
||||
.key("meta-file.bin")
|
||||
.metadata("author", "test-user")
|
||||
.metadata("version", "7")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
let upload_id = create_resp.upload_id().unwrap().to_string();
|
||||
|
||||
// Upload one part
|
||||
let p1 = server
|
||||
.client
|
||||
.upload_part()
|
||||
.bucket("mp-meta")
|
||||
.key("meta-file.bin")
|
||||
.upload_id(&upload_id)
|
||||
.part_number(1)
|
||||
.body(ByteStream::from(vec![0xFFu8; 512]))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Complete
|
||||
let completed = CompletedMultipartUpload::builder()
|
||||
.parts(
|
||||
CompletedPart::builder()
|
||||
.part_number(1)
|
||||
.e_tag(p1.e_tag().unwrap())
|
||||
.build(),
|
||||
)
|
||||
.build();
|
||||
|
||||
server
|
||||
.client
|
||||
.complete_multipart_upload()
|
||||
.bucket("mp-meta")
|
||||
.key("meta-file.bin")
|
||||
.upload_id(&upload_id)
|
||||
.multipart_upload(completed)
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Head object — verify metadata came through
|
||||
let head = server
|
||||
.client
|
||||
.head_object()
|
||||
.bucket("mp-meta")
|
||||
.key("meta-file.bin")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let meta = head.metadata().unwrap();
|
||||
assert_eq!(meta.get("author").map(|s| s.as_str()), Some("test-user"));
|
||||
assert_eq!(meta.get("version").map(|s| s.as_str()), Some("7"));
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
Reference in New Issue
Block a user