168 lines
5.2 KiB
Rust
168 lines
5.2 KiB
Rust
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
use std::time::Instant;
|
|
|
|
use sq_models::WalConfig;
|
|
use sq_sim::SimClock;
|
|
use sq_sim::fs::InMemoryFileSystem;
|
|
use sq_storage::engine::StorageEngine;
|
|
|
|
fn bench_write_throughput(payload_size: usize, msg_count: u64) {
|
|
let fs = Arc::new(InMemoryFileSystem::new());
|
|
let clock = Arc::new(SimClock::new());
|
|
let config = WalConfig {
|
|
max_segment_bytes: 256 * 1024 * 1024, // 256MB to avoid rotation overhead
|
|
max_segment_age_secs: 3600,
|
|
data_dir: PathBuf::from("/data"),
|
|
..Default::default()
|
|
};
|
|
let engine = StorageEngine::new(fs, clock, config).unwrap();
|
|
|
|
let payload = vec![b'x'; payload_size];
|
|
|
|
let start = Instant::now();
|
|
for i in 0..msg_count {
|
|
engine.append("bench", 0, None, &payload, &[], i).unwrap();
|
|
}
|
|
let elapsed = start.elapsed();
|
|
|
|
let msgs_per_sec = msg_count as f64 / elapsed.as_secs_f64();
|
|
let mb_per_sec = (msg_count as f64 * payload_size as f64) / elapsed.as_secs_f64() / 1_048_576.0;
|
|
|
|
println!(
|
|
" write {msg_count} x {payload_size}B: {:.0} msg/s, {:.1} MB/s ({:.2?})",
|
|
msgs_per_sec, mb_per_sec, elapsed
|
|
);
|
|
}
|
|
|
|
fn bench_read_throughput(payload_size: usize, msg_count: u64) {
|
|
let fs = Arc::new(InMemoryFileSystem::new());
|
|
let clock = Arc::new(SimClock::new());
|
|
let config = WalConfig {
|
|
max_segment_bytes: 256 * 1024 * 1024,
|
|
max_segment_age_secs: 3600,
|
|
data_dir: PathBuf::from("/data"),
|
|
..Default::default()
|
|
};
|
|
let engine = StorageEngine::new(fs, clock, config).unwrap();
|
|
|
|
let payload = vec![b'x'; payload_size];
|
|
for i in 0..msg_count {
|
|
engine.append("bench", 0, None, &payload, &[], i).unwrap();
|
|
}
|
|
|
|
let start = Instant::now();
|
|
let messages = engine.read("bench", 0, 0, msg_count as usize + 1).unwrap();
|
|
let elapsed = start.elapsed();
|
|
|
|
assert_eq!(messages.len(), msg_count as usize);
|
|
|
|
let msgs_per_sec = msg_count as f64 / elapsed.as_secs_f64();
|
|
let mb_per_sec = (msg_count as f64 * payload_size as f64) / elapsed.as_secs_f64() / 1_048_576.0;
|
|
|
|
println!(
|
|
" read {msg_count} x {payload_size}B: {:.0} msg/s, {:.1} MB/s ({:.2?})",
|
|
msgs_per_sec, mb_per_sec, elapsed
|
|
);
|
|
}
|
|
|
|
fn bench_compression_ratio(payload_size: usize, msg_count: usize) {
|
|
// Build a WAL segment worth of data.
|
|
let mut raw_data = Vec::new();
|
|
for i in 0..msg_count {
|
|
let payload = format!("message-{i}-{}", "x".repeat(payload_size));
|
|
raw_data.extend_from_slice(payload.as_bytes());
|
|
}
|
|
|
|
let compressed = zstd::encode_all(raw_data.as_slice(), 3).unwrap();
|
|
let ratio = raw_data.len() as f64 / compressed.len() as f64;
|
|
|
|
println!(
|
|
" compress {msg_count} x ~{payload_size}B: {} -> {} ({:.2}x ratio)",
|
|
format_bytes(raw_data.len()),
|
|
format_bytes(compressed.len()),
|
|
ratio
|
|
);
|
|
|
|
// Verify roundtrip.
|
|
let decompressed = zstd::decode_all(compressed.as_slice()).unwrap();
|
|
assert_eq!(decompressed.len(), raw_data.len());
|
|
}
|
|
|
|
fn bench_recovery(msg_count: u64) {
|
|
let fs = Arc::new(InMemoryFileSystem::new());
|
|
let clock = Arc::new(SimClock::new());
|
|
let config = WalConfig {
|
|
max_segment_bytes: 64 * 1024, // Small segments to test multi-segment recovery
|
|
max_segment_age_secs: 3600,
|
|
data_dir: PathBuf::from("/data"),
|
|
..Default::default()
|
|
};
|
|
|
|
// Write messages.
|
|
{
|
|
let engine = StorageEngine::new(fs.clone(), clock.clone(), config.clone()).unwrap();
|
|
for i in 0..msg_count {
|
|
engine
|
|
.append("bench", 0, None, format!("msg-{i}").as_bytes(), &[], i)
|
|
.unwrap();
|
|
}
|
|
}
|
|
|
|
// Recover and measure.
|
|
let start = Instant::now();
|
|
let engine = StorageEngine::new(fs, clock, config).unwrap();
|
|
engine.recover().unwrap();
|
|
let elapsed = start.elapsed();
|
|
|
|
let msgs_per_sec = msg_count as f64 / elapsed.as_secs_f64();
|
|
|
|
println!(
|
|
" recover {msg_count} msgs: {:.0} msg/s ({:.2?})",
|
|
msgs_per_sec, elapsed
|
|
);
|
|
|
|
// Verify correctness.
|
|
let messages = engine.read("bench", 0, 0, msg_count as usize + 1).unwrap();
|
|
assert_eq!(messages.len(), msg_count as usize);
|
|
}
|
|
|
|
fn format_bytes(bytes: usize) -> String {
|
|
if bytes >= 1_048_576 {
|
|
format!("{:.1}MB", bytes as f64 / 1_048_576.0)
|
|
} else if bytes >= 1024 {
|
|
format!("{:.1}KB", bytes as f64 / 1024.0)
|
|
} else {
|
|
format!("{bytes}B")
|
|
}
|
|
}
|
|
|
|
fn main() {
|
|
println!("=== SQ Storage Engine Benchmarks ===\n");
|
|
|
|
println!("Write throughput:");
|
|
bench_write_throughput(64, 100_000);
|
|
bench_write_throughput(256, 100_000);
|
|
bench_write_throughput(1024, 50_000);
|
|
bench_write_throughput(4096, 10_000);
|
|
|
|
println!("\nRead throughput:");
|
|
bench_read_throughput(64, 100_000);
|
|
bench_read_throughput(256, 100_000);
|
|
bench_read_throughput(1024, 50_000);
|
|
bench_read_throughput(4096, 10_000);
|
|
|
|
println!("\nCompression ratio:");
|
|
bench_compression_ratio(64, 10_000);
|
|
bench_compression_ratio(256, 10_000);
|
|
bench_compression_ratio(1024, 5_000);
|
|
bench_compression_ratio(4096, 1_000);
|
|
|
|
println!("\nRecovery performance:");
|
|
bench_recovery(1_000);
|
|
bench_recovery(10_000);
|
|
bench_recovery(50_000);
|
|
|
|
println!("\n=== Done ===");
|
|
}
|