feat: add capnp

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2026-02-27 12:15:35 +01:00
parent 3162971c89
commit 749ae245c7
115 changed files with 16596 additions and 31 deletions

View File

@@ -8,10 +8,20 @@ sq-models = { workspace = true }
sq-sim = { workspace = true }
anyhow = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
crc32fast = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
zstd = { workspace = true }
object_store = { workspace = true }
[dev-dependencies]
tokio = { workspace = true, features = ["full", "test-util"] }
[[bench]]
name = "throughput"
harness = false

View File

@@ -0,0 +1,167 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use sq_models::WalConfig;
use sq_sim::SimClock;
use sq_sim::fs::InMemoryFileSystem;
use sq_storage::engine::StorageEngine;
/// Benchmarks append throughput: writes `msg_count` payloads of
/// `payload_size` bytes to a single partition and prints msg/s and MB/s.
fn bench_write_throughput(payload_size: usize, msg_count: u64) {
    let filesystem = Arc::new(InMemoryFileSystem::new());
    let sim_clock = Arc::new(SimClock::new());
    // Oversized segment cap (256MB) keeps rotation out of the measurement.
    let wal_config = WalConfig {
        max_segment_bytes: 256 * 1024 * 1024,
        max_segment_age_secs: 3600,
        data_dir: PathBuf::from("/data"),
        ..Default::default()
    };
    let engine = StorageEngine::new(filesystem, sim_clock, wal_config).unwrap();
    let body = vec![b'x'; payload_size];
    let started = Instant::now();
    for ts in 0..msg_count {
        engine.append("bench", 0, None, &body, &[], ts).unwrap();
    }
    let took = started.elapsed();
    let msgs_per_sec = msg_count as f64 / took.as_secs_f64();
    let mb_per_sec = (msg_count as f64 * payload_size as f64) / took.as_secs_f64() / 1_048_576.0;
    println!(
        " write {msg_count} x {payload_size}B: {:.0} msg/s, {:.1} MB/s ({:.2?})",
        msgs_per_sec, mb_per_sec, took
    );
}
/// Benchmarks sequential read throughput: preloads `msg_count` messages of
/// `payload_size` bytes (untimed), then times one bulk read of all of them.
fn bench_read_throughput(payload_size: usize, msg_count: u64) {
    let filesystem = Arc::new(InMemoryFileSystem::new());
    let sim_clock = Arc::new(SimClock::new());
    // Oversized segment cap so rotation does not perturb the numbers.
    let wal_config = WalConfig {
        max_segment_bytes: 256 * 1024 * 1024,
        max_segment_age_secs: 3600,
        data_dir: PathBuf::from("/data"),
        ..Default::default()
    };
    let engine = StorageEngine::new(filesystem, sim_clock, wal_config).unwrap();
    let body = vec![b'x'; payload_size];
    // Preload phase (not timed).
    for ts in 0..msg_count {
        engine.append("bench", 0, None, &body, &[], ts).unwrap();
    }
    // Timed phase: a single read covering every message.
    let started = Instant::now();
    let messages = engine.read("bench", 0, 0, msg_count as usize + 1).unwrap();
    let took = started.elapsed();
    assert_eq!(messages.len(), msg_count as usize);
    let msgs_per_sec = msg_count as f64 / took.as_secs_f64();
    let mb_per_sec = (msg_count as f64 * payload_size as f64) / took.as_secs_f64() / 1_048_576.0;
    println!(
        " read {msg_count} x {payload_size}B: {:.0} msg/s, {:.1} MB/s ({:.2?})",
        msgs_per_sec, mb_per_sec, took
    );
}
/// Measures the zstd (level 3) compression ratio over a synthetic buffer of
/// `msg_count` messages of roughly `payload_size` bytes each, then verifies
/// the compressed data roundtrips losslessly.
fn bench_compression_ratio(payload_size: usize, msg_count: usize) {
    // Build a WAL segment worth of data. Hoist the loop-invariant filler and
    // preallocate: the original recomputed `"x".repeat(payload_size)` on
    // every iteration and grew the buffer repeatedly.
    let filler = "x".repeat(payload_size);
    let mut raw_data = Vec::with_capacity(msg_count * (payload_size + 16));
    for i in 0..msg_count {
        let payload = format!("message-{i}-{}", filler);
        raw_data.extend_from_slice(payload.as_bytes());
    }
    let compressed = zstd::encode_all(raw_data.as_slice(), 3).unwrap();
    let ratio = raw_data.len() as f64 / compressed.len() as f64;
    println!(
        " compress {msg_count} x ~{payload_size}B: {} -> {} ({:.2}x ratio)",
        format_bytes(raw_data.len()),
        format_bytes(compressed.len()),
        ratio
    );
    // Verify roundtrip. Comparing full contents (not just lengths, as the
    // original did) is a strictly stronger correctness check.
    let decompressed = zstd::decode_all(compressed.as_slice()).unwrap();
    assert_eq!(decompressed, raw_data);
}
/// Benchmarks WAL recovery: writes `msg_count` small messages across many
/// 64KB segments, drops the engine, then times `recover()` on a fresh one.
fn bench_recovery(msg_count: u64) {
    let filesystem = Arc::new(InMemoryFileSystem::new());
    let sim_clock = Arc::new(SimClock::new());
    // Small segments force the multi-segment recovery path.
    let wal_config = WalConfig {
        max_segment_bytes: 64 * 1024,
        max_segment_age_secs: 3600,
        data_dir: PathBuf::from("/data"),
        ..Default::default()
    };
    // Populate the WAL, then drop this engine so recovery starts cold.
    {
        let writer_engine =
            StorageEngine::new(filesystem.clone(), sim_clock.clone(), wal_config.clone()).unwrap();
        for i in 0..msg_count {
            writer_engine
                .append("bench", 0, None, format!("msg-{i}").as_bytes(), &[], i)
                .unwrap();
        }
    }
    // Timed phase: engine construction plus full recovery.
    let started = Instant::now();
    let engine = StorageEngine::new(filesystem, sim_clock, wal_config).unwrap();
    engine.recover().unwrap();
    let took = started.elapsed();
    let msgs_per_sec = msg_count as f64 / took.as_secs_f64();
    println!(
        " recover {msg_count} msgs: {:.0} msg/s ({:.2?})",
        msgs_per_sec, took
    );
    // Sanity check: recovery must expose everything that was written.
    let recovered = engine.read("bench", 0, 0, msg_count as usize + 1).unwrap();
    assert_eq!(recovered.len(), msg_count as usize);
}
/// Renders a byte count with a binary-unit suffix: MB above 1 MiB,
/// KB above 1 KiB, plain bytes otherwise.
fn format_bytes(bytes: usize) -> String {
    const KIB: usize = 1024;
    const MIB: usize = 1_048_576;
    match bytes {
        b if b >= MIB => format!("{:.1}MB", b as f64 / 1_048_576.0),
        b if b >= KIB => format!("{:.1}KB", b as f64 / 1024.0),
        _ => format!("{bytes}B"),
    }
}
/// Entry point: runs the write, read, compression, and recovery benches
/// over a matrix of payload sizes and message counts.
fn main() {
    println!("=== SQ Storage Engine Benchmarks ===\n");
    println!("Write throughput:");
    for &(size, count) in &[(64, 100_000), (256, 100_000), (1024, 50_000), (4096, 10_000)] {
        bench_write_throughput(size, count);
    }
    println!("\nRead throughput:");
    for &(size, count) in &[(64, 100_000), (256, 100_000), (1024, 50_000), (4096, 10_000)] {
        bench_read_throughput(size, count);
    }
    println!("\nCompression ratio:");
    for &(size, count) in &[(64, 10_000), (256, 10_000), (1024, 5_000), (4096, 1_000)] {
        bench_compression_ratio(size, count);
    }
    println!("\nRecovery performance:");
    for &count in &[1_000, 10_000, 50_000] {
        bench_recovery(count);
    }
    println!("\n=== Done ===");
}

View File

@@ -0,0 +1,193 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use sq_sim::fs::FileSystem;
/// Key for consumer offset tracking: (consumer_group, topic, partition).
type OffsetKey = (String, String, u32);
/// Stores committed consumer group offsets.
/// Offsets are kept in memory and periodically persisted to a file.
pub struct ConsumerOffsets<F: FileSystem> {
    // In-memory map of (group, topic, partition) -> last committed offset.
    offsets: HashMap<OffsetKey, u64>,
    // Snapshot location: "<data_dir>/consumer_offsets.json".
    persist_path: PathBuf,
    // Filesystem abstraction used for persistence (real or simulated).
    fs: Arc<F>,
}
impl<F: FileSystem> ConsumerOffsets<F> {
    /// Creates an empty offset store rooted at `data_dir`.
    pub fn new(fs: Arc<F>, data_dir: &Path) -> Self {
        Self {
            offsets: HashMap::new(),
            persist_path: data_dir.join("consumer_offsets.json"),
            fs,
        }
    }

    /// Records `offset` for `group` on `topic`/`partition`, then writes the
    /// full snapshot to disk.
    pub fn commit(
        &mut self,
        group: &str,
        topic: &str,
        partition: u32,
        offset: u64,
    ) -> anyhow::Result<()> {
        self.offsets
            .insert((group.to_string(), topic.to_string(), partition), offset);
        self.persist()
    }

    /// Returns the last committed offset for `group` on `topic`/`partition`,
    /// or `None` when nothing has been committed.
    pub fn get_committed(&self, group: &str, topic: &str, partition: u32) -> Option<u64> {
        self.offsets
            .get(&(group.to_string(), topic.to_string(), partition))
            .copied()
    }

    /// Serializes all offsets as a JSON array and fsyncs the snapshot file.
    fn persist(&self) -> anyhow::Result<()> {
        let snapshot: Vec<OffsetEntry> = self
            .offsets
            .iter()
            .map(|((group, topic, partition), offset)| OffsetEntry {
                group: group.clone(),
                topic: topic.clone(),
                partition: *partition,
                offset: *offset,
            })
            .collect();
        let encoded = serde_json::to_vec(&snapshot)?;
        // The data directory may not exist yet on the very first commit.
        if let Some(dir) = self.persist_path.parent() {
            self.fs.create_dir_all(dir)?;
        }
        let mut file = self.fs.open_write(&self.persist_path)?;
        file.write_all(&encoded)?;
        file.fsync()?;
        Ok(())
    }

    /// Rebuilds the store from the on-disk snapshot; starts empty when no
    /// snapshot file exists.
    pub fn load(fs: Arc<F>, data_dir: &Path) -> anyhow::Result<Self> {
        let persist_path = data_dir.join("consumer_offsets.json");
        if !fs.exists(&persist_path) {
            return Ok(Self {
                offsets: HashMap::new(),
                persist_path,
                fs,
            });
        }
        let mut file = fs.open_read(&persist_path)?;
        let mut raw = Vec::new();
        file.read_to_end(&mut raw)?;
        let snapshot: Vec<OffsetEntry> = serde_json::from_slice(&raw)?;
        let offsets = snapshot
            .into_iter()
            .map(|entry| ((entry.group, entry.topic, entry.partition), entry.offset))
            .collect();
        Ok(Self {
            offsets,
            persist_path,
            fs,
        })
    }
}
/// On-disk JSON representation of one committed offset; the in-memory
/// `HashMap` is flattened into a `Vec<OffsetEntry>` for persistence.
#[derive(serde::Serialize, serde::Deserialize)]
struct OffsetEntry {
    group: String,
    topic: String,
    partition: u32,
    offset: u64,
}
// Unit tests: commit/get semantics, overwrite, isolation across groups,
// topics and partitions, and persistence roundtrip via the snapshot file.
#[cfg(test)]
mod tests {
    use super::*;
    use sq_sim::fs::InMemoryFileSystem;

    // Fresh store over an in-memory filesystem rooted at /data.
    fn test_offsets() -> ConsumerOffsets<InMemoryFileSystem> {
        let fs = Arc::new(InMemoryFileSystem::new());
        ConsumerOffsets::new(fs, Path::new("/data"))
    }

    #[test]
    fn test_commit_and_get() {
        let mut offsets = test_offsets();
        offsets.commit("group-1", "orders", 0, 42).unwrap();
        assert_eq!(offsets.get_committed("group-1", "orders", 0), Some(42));
        // Other partitions/groups must remain unaffected.
        assert_eq!(offsets.get_committed("group-1", "orders", 1), None);
        assert_eq!(offsets.get_committed("group-2", "orders", 0), None);
    }

    #[test]
    fn test_commit_overwrites() {
        let mut offsets = test_offsets();
        offsets.commit("g", "t", 0, 10).unwrap();
        offsets.commit("g", "t", 0, 20).unwrap();
        // Last commit wins.
        assert_eq!(offsets.get_committed("g", "t", 0), Some(20));
    }

    #[test]
    fn test_multiple_groups() {
        let mut offsets = test_offsets();
        offsets.commit("g1", "t", 0, 100).unwrap();
        offsets.commit("g2", "t", 0, 200).unwrap();
        assert_eq!(offsets.get_committed("g1", "t", 0), Some(100));
        assert_eq!(offsets.get_committed("g2", "t", 0), Some(200));
    }

    #[test]
    fn test_persist_and_load() {
        let fs = Arc::new(InMemoryFileSystem::new());
        // First store writes the snapshot, then is dropped.
        {
            let mut offsets = ConsumerOffsets::new(fs.clone(), Path::new("/data"));
            offsets.commit("g1", "orders", 0, 42).unwrap();
            offsets.commit("g1", "events", 0, 100).unwrap();
            offsets.commit("g2", "orders", 1, 55).unwrap();
        }
        // A fresh load from the same filesystem must see every commit.
        let loaded = ConsumerOffsets::load(fs, Path::new("/data")).unwrap();
        assert_eq!(loaded.get_committed("g1", "orders", 0), Some(42));
        assert_eq!(loaded.get_committed("g1", "events", 0), Some(100));
        assert_eq!(loaded.get_committed("g2", "orders", 1), Some(55));
        assert_eq!(loaded.get_committed("g2", "orders", 0), None);
    }

    #[test]
    fn test_load_nonexistent_file() {
        // Loading without a snapshot file yields an empty store, not an error.
        let fs = Arc::new(InMemoryFileSystem::new());
        let offsets = ConsumerOffsets::load(fs, Path::new("/data")).unwrap();
        assert_eq!(offsets.get_committed("g", "t", 0), None);
    }

    #[test]
    fn test_multiple_topics_and_partitions() {
        let mut offsets = test_offsets();
        offsets.commit("g", "t1", 0, 10).unwrap();
        offsets.commit("g", "t1", 1, 20).unwrap();
        offsets.commit("g", "t2", 0, 30).unwrap();
        assert_eq!(offsets.get_committed("g", "t1", 0), Some(10));
        assert_eq!(offsets.get_committed("g", "t1", 1), Some(20));
        assert_eq!(offsets.get_committed("g", "t2", 0), Some(30));
    }
}

View File

@@ -0,0 +1,634 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, RwLock};
use sq_models::{ClosedSegment, Header, Message, TopicConfig, TopicName, WalConfig};
use sq_sim::fs::FileSystem;
use sq_sim::Clock;
use crate::consumer_offsets::ConsumerOffsets;
use crate::index::OffsetIndex;
use crate::topic_metadata::TopicMetadata;
use crate::wal::reader::WalReader;
use crate::wal::writer::{segment_dir, segment_path, WalWriter};
/// Unified storage engine wrapping WAL writers, readers, and offset index.
/// All methods take `&self` — concurrency is handled by fine-grained internal locks.
/// Different (topic, partition) writers can operate in parallel.
pub struct StorageEngine<F: FileSystem, C: Clock> {
    // Filesystem abstraction (real or simulated) shared with writers/readers.
    fs: Arc<F>,
    // Clock abstraction handed to each WalWriter — presumably used for
    // segment-age rotation given `max_segment_age_secs`; confirm in WalWriter.
    clock: Arc<C>,
    // WAL tuning knobs plus the root data directory.
    config: WalConfig,
    /// One writer per (topic, partition), each independently locked.
    writers: RwLock<HashMap<(String, u32), Arc<Mutex<WalWriter<F, C>>>>>,
    /// Offset index for fast seeks.
    index: Mutex<OffsetIndex>,
    /// Reader instance (stateless, no lock needed).
    reader: WalReader<F>,
    /// Consumer group offset tracking.
    consumer_offsets: Mutex<ConsumerOffsets<F>>,
    /// Topic metadata registry.
    topic_metadata: Mutex<TopicMetadata<F>>,
}
impl<F: FileSystem, C: Clock> StorageEngine<F, C> {
    /// Builds an engine rooted at `config.data_dir`, loading persisted
    /// consumer offsets and topic metadata from that directory.
    ///
    /// Note: this does not replay existing WAL segments — call
    /// [`Self::recover`] before serving traffic if data may already exist.
    pub fn new(fs: Arc<F>, clock: Arc<C>, config: WalConfig) -> anyhow::Result<Self> {
        fs.create_dir_all(&config.data_dir)?;
        let consumer_offsets = ConsumerOffsets::load(fs.clone(), &config.data_dir)?;
        let topic_metadata = TopicMetadata::load(fs.clone(), &config.data_dir)?;
        Ok(Self {
            reader: WalReader::new(fs.clone()),
            consumer_offsets: Mutex::new(consumer_offsets),
            topic_metadata: Mutex::new(topic_metadata),
            fs,
            clock,
            config,
            writers: RwLock::new(HashMap::new()),
            // Sparse index: one entry per 1000 offsets.
            index: Mutex::new(OffsetIndex::new(1000)),
        })
    }

    /// Append a message to the given topic-partition. Returns the assigned offset.
    pub fn append(
        &self,
        topic: &str,
        partition: u32,
        key: Option<&[u8]>,
        value: &[u8],
        headers: &[Header],
        timestamp_ms: u64,
    ) -> anyhow::Result<u64> {
        let writer_arc = self.get_or_create_writer(topic, partition)?;
        let mut writer = writer_arc.lock().unwrap();
        let old_next = writer.next_offset();
        let offset = writer.append(key, value, headers, timestamp_ms)?;
        // Register the current segment in the index (for the first write).
        // NOTE(review): `&&` binds tighter than `||`, so this parses as
        // `(offset == old_next && offset == 0) || (offset > 0 && not-indexed)`.
        // The `offset == old_next` clause is redundant when offset == 0 —
        // confirm intent and consider explicit parentheses.
        // NOTE(review): the registered path uses `offset` as the segment's
        // base offset — verify this still holds after writer rotation.
        if offset == old_next && offset == 0
            || (offset > 0 && {
                let index = self.index.lock().unwrap();
                index.segment_for_offset(topic, partition, offset).is_none()
            })
        {
            let seg =
                segment_path(&self.config.data_dir, &TopicName::from(topic), partition, offset);
            let mut index = self.index.lock().unwrap();
            index.register_segment(topic, partition, seg, offset, offset);
        }
        Ok(offset)
    }

    /// Append a batch of messages to a single topic-partition with one fsync.
    /// Returns the assigned offsets.
    pub fn append_batch(
        &self,
        topic: &str,
        partition: u32,
        messages: &[(Option<&[u8]>, &[u8], &[Header], u64)],
    ) -> anyhow::Result<Vec<u64>> {
        if messages.is_empty() {
            return Ok(vec![]);
        }
        let writer_arc = self.get_or_create_writer(topic, partition)?;
        let mut writer = writer_arc.lock().unwrap();
        let first_offset = writer.next_offset();
        let offsets = writer.append_batch(messages)?;
        // Register segment in index if this is a new segment.
        {
            let mut index = self.index.lock().unwrap();
            if index
                .segment_for_offset(topic, partition, first_offset)
                .is_none()
            {
                let seg = segment_path(
                    &self.config.data_dir,
                    &TopicName::from(topic),
                    partition,
                    first_offset,
                );
                index.register_segment(topic, partition, seg, first_offset, first_offset);
            }
        }
        Ok(offsets)
    }

    /// Force fsync on all active writer segment files.
    /// Takes the writers read lock, then each writer's mutex in turn.
    pub fn fsync_all_writers(&self) -> anyhow::Result<()> {
        let writers = self.writers.read().unwrap();
        for writer_arc in writers.values() {
            let mut writer = writer_arc.lock().unwrap();
            writer.fsync()?;
        }
        Ok(())
    }

    /// Read messages from a topic-partition starting at `from_offset`.
    /// Returns up to `limit` messages. Lock-free — reads directly from disk.
    ///
    /// Note: this lists and scans every `.wal` segment file in order; the
    /// offset index is not consulted here.
    pub fn read(
        &self,
        topic: &str,
        partition: u32,
        from_offset: u64,
        limit: usize,
    ) -> anyhow::Result<Vec<Message>> {
        let topic_name = TopicName::from(topic);
        let seg_dir = segment_dir(&self.config.data_dir, &topic_name, partition);
        if !self.fs.exists(&seg_dir) {
            return Ok(vec![]);
        }
        // List all segment files and sort them.
        // Lexicographic sort matches offset order for zero-padded names —
        // presumably guaranteed by the writer's naming scheme; confirm there.
        let mut segment_files: Vec<PathBuf> = self
            .fs
            .list_dir(&seg_dir)?
            .into_iter()
            .filter(|p| p.extension().map(|e| e == "wal").unwrap_or(false))
            .collect();
        segment_files.sort();
        let mut result = Vec::new();
        for seg_path in &segment_files {
            if result.len() >= limit {
                break;
            }
            let messages = self.reader.read_from_offset(seg_path, from_offset)?;
            for msg in messages {
                if result.len() >= limit {
                    break;
                }
                result.push(msg);
            }
        }
        Ok(result)
    }

    /// Get the latest offset for a topic-partition (the next offset to be assigned).
    ///
    /// NOTE(review): only consults in-memory writers — before `recover()` is
    /// called this returns 0 even when data exists on disk.
    pub fn latest_offset(&self, topic: &str, partition: u32) -> u64 {
        let key = (topic.to_string(), partition);
        let writers = self.writers.read().unwrap();
        writers
            .get(&key)
            .map(|w| w.lock().unwrap().next_offset())
            .unwrap_or(0)
    }

    /// Recover state from existing WAL files on disk.
    /// Scans all segment files, rebuilds the index, and sets writers to the correct offset.
    /// Must be called at startup before any concurrent access.
    pub fn recover(&self) -> anyhow::Result<()> {
        if !self.fs.exists(&self.config.data_dir) {
            return Ok(());
        }
        // Scan for topic directories (skip files like consumer_offsets.json).
        let topic_dirs: Vec<PathBuf> = self
            .fs
            .list_dir(&self.config.data_dir)?
            .into_iter()
            .filter(|p| {
                // Skip entries that have a file extension (they are metadata files, not topic dirs).
                p.extension().is_none()
            })
            .collect();
        // Hold both locks for the whole scan — safe because recover() runs
        // before any concurrent access (see doc comment above).
        let mut writers = self.writers.write().unwrap();
        let mut index = self.index.lock().unwrap();
        for topic_dir in &topic_dirs {
            let topic = topic_dir
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or("")
                .to_string();
            if topic.is_empty() {
                continue;
            }
            // Scan for partition directories (skip any non-directory entries).
            let partition_dirs: Vec<PathBuf> = match self.fs.list_dir(topic_dir) {
                Ok(entries) => entries,
                Err(_) => continue, // Skip if not a directory.
            };
            for part_dir in &partition_dirs {
                // NOTE(review): a directory name that fails to parse as u32
                // silently becomes partition 0 — confirm that is intended.
                let partition: u32 = part_dir
                    .file_name()
                    .and_then(|n| n.to_str())
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(0);
                // Scan segment files.
                let mut seg_files: Vec<PathBuf> = self
                    .fs
                    .list_dir(part_dir)?
                    .into_iter()
                    .filter(|p| p.extension().map(|e| e == "wal").unwrap_or(false))
                    .collect();
                seg_files.sort();
                let mut max_offset = 0u64;
                for seg_path in &seg_files {
                    let messages = self.reader.read_segment(seg_path)?;
                    if let (Some(first), Some(last)) = (messages.first(), messages.last()) {
                        index.register_segment(
                            &topic,
                            partition,
                            seg_path.clone(),
                            first.offset,
                            last.offset,
                        );
                        // Next offset to assign is one past the last seen.
                        max_offset = max_offset.max(last.offset + 1);
                    }
                }
                // Create a writer at the recovered offset.
                if max_offset > 0 {
                    let writer = WalWriter::new(
                        self.fs.clone(),
                        self.clock.clone(),
                        self.config.clone(),
                        TopicName::from(topic.as_str()),
                        partition,
                    )?
                    .with_next_offset(max_offset);
                    writers.insert((topic.clone(), partition), Arc::new(Mutex::new(writer)));
                }
            }
        }
        Ok(())
    }

    /// Commit a consumer group offset.
    /// Persists synchronously via `ConsumerOffsets::commit` (JSON + fsync).
    pub fn commit_offset(
        &self,
        group: &str,
        topic: &str,
        partition: u32,
        offset: u64,
    ) -> anyhow::Result<()> {
        let mut offsets = self.consumer_offsets.lock().unwrap();
        offsets.commit(group, topic, partition, offset)
    }

    /// Get the committed offset for a consumer group.
    pub fn get_committed_offset(
        &self,
        group: &str,
        topic: &str,
        partition: u32,
    ) -> Option<u64> {
        let offsets = self.consumer_offsets.lock().unwrap();
        offsets.get_committed(group, topic, partition)
    }

    /// Create a topic in the metadata registry.
    pub fn create_topic(&self, config: TopicConfig) -> anyhow::Result<()> {
        let mut metadata = self.topic_metadata.lock().unwrap();
        metadata.create_topic(config)
    }

    /// Delete a topic from the metadata registry.
    pub fn delete_topic(&self, name: &str) -> anyhow::Result<()> {
        let mut metadata = self.topic_metadata.lock().unwrap();
        metadata.delete_topic(name)
    }

    /// List all topics. Returns owned configs (cannot return references through Mutex).
    pub fn list_topics(&self) -> Vec<TopicConfig> {
        let metadata = self.topic_metadata.lock().unwrap();
        metadata.list_topics().into_iter().cloned().collect()
    }

    /// Get a specific topic's config. Returns owned config.
    pub fn get_topic(&self, name: &str) -> Option<TopicConfig> {
        let metadata = self.topic_metadata.lock().unwrap();
        metadata.get_topic(name).cloned()
    }

    /// Check if a topic exists in the metadata registry.
    pub fn topic_exists(&self, name: &str) -> bool {
        let metadata = self.topic_metadata.lock().unwrap();
        metadata.topic_exists(name)
    }

    /// Close all active segments and return them. Used by the S3 shipper.
    pub fn close_all_segments(&self) -> anyhow::Result<Vec<ClosedSegment>> {
        let writers = self.writers.read().unwrap();
        let mut closed = Vec::new();
        for writer_arc in writers.values() {
            let mut writer = writer_arc.lock().unwrap();
            // Writers with no active segment yield None and are skipped.
            if let Some(seg) = writer.close_active_segment()? {
                closed.push(seg);
            }
        }
        Ok(closed)
    }

    /// Get or create a writer for the given topic-partition.
    /// Uses read lock for the common case (writer exists), upgrades to write lock to create.
    fn get_or_create_writer(
        &self,
        topic: &str,
        partition: u32,
    ) -> anyhow::Result<Arc<Mutex<WalWriter<F, C>>>> {
        let key = (topic.to_string(), partition);
        // Fast path: read lock (most common).
        {
            let writers = self.writers.read().unwrap();
            if let Some(writer) = writers.get(&key) {
                return Ok(writer.clone());
            }
        }
        // Slow path: write lock to create new writer.
        let mut writers = self.writers.write().unwrap();
        // Double-check — another thread may have created it.
        if let Some(writer) = writers.get(&key) {
            return Ok(writer.clone());
        }
        let writer = WalWriter::new(
            self.fs.clone(),
            self.clock.clone(),
            self.config.clone(),
            TopicName::from(topic),
            partition,
        )?;
        let writer = Arc::new(Mutex::new(writer));
        writers.insert(key, writer.clone());
        Ok(writer)
    }
}
// Unit tests exercising append/read, isolation, recovery (including the
// metadata-file regression), and batch append semantics against the
// simulated in-memory filesystem and clock.
#[cfg(test)]
mod tests {
    use super::*;
    use sq_sim::fs::InMemoryFileSystem;
    use sq_sim::SimClock;

    // Standard engine fixture: 1MB segments rooted at /data.
    fn test_engine() -> StorageEngine<InMemoryFileSystem, SimClock> {
        let fs = Arc::new(InMemoryFileSystem::new());
        let clock = Arc::new(SimClock::new());
        let config = WalConfig {
            max_segment_bytes: 1024 * 1024,
            max_segment_age_secs: 3600,
            data_dir: PathBuf::from("/data"),
            ..Default::default()
        };
        StorageEngine::new(fs, clock, config).unwrap()
    }

    #[test]
    fn test_append_and_read() {
        let engine = test_engine();
        // Offsets must be assigned densely starting at 0.
        for i in 0..10 {
            let offset = engine
                .append("orders", 0, None, format!("msg-{i}").as_bytes(), &[], i * 100)
                .unwrap();
            assert_eq!(offset, i);
        }
        let messages = engine.read("orders", 0, 0, 100).unwrap();
        assert_eq!(messages.len(), 10);
        for (i, msg) in messages.iter().enumerate() {
            assert_eq!(msg.offset, i as u64);
            assert_eq!(msg.value, format!("msg-{i}").as_bytes());
        }
    }

    #[test]
    fn test_read_from_middle() {
        let engine = test_engine();
        for i in 0..20 {
            engine.append("t", 0, None, b"data", &[], i).unwrap();
        }
        // from_offset=10 skips the first half.
        let messages = engine.read("t", 0, 10, 100).unwrap();
        assert_eq!(messages.len(), 10);
        assert_eq!(messages[0].offset, 10);
        assert_eq!(messages[9].offset, 19);
    }

    #[test]
    fn test_read_with_limit() {
        let engine = test_engine();
        for i in 0..100 {
            engine.append("t", 0, None, b"data", &[], i).unwrap();
        }
        let messages = engine.read("t", 0, 0, 5).unwrap();
        assert_eq!(messages.len(), 5);
        assert_eq!(messages[4].offset, 4);
    }

    #[test]
    fn test_multi_topic_isolation() {
        let engine = test_engine();
        engine.append("alpha", 0, None, b"a-data", &[], 0).unwrap();
        engine.append("beta", 0, None, b"b-data", &[], 0).unwrap();
        let a_msgs = engine.read("alpha", 0, 0, 100).unwrap();
        let b_msgs = engine.read("beta", 0, 0, 100).unwrap();
        assert_eq!(a_msgs.len(), 1);
        assert_eq!(b_msgs.len(), 1);
        assert_eq!(a_msgs[0].value, b"a-data");
        assert_eq!(b_msgs[0].value, b"b-data");
    }

    #[test]
    fn test_multi_partition_isolation() {
        let engine = test_engine();
        engine.append("t", 0, None, b"p0", &[], 0).unwrap();
        engine.append("t", 1, None, b"p1", &[], 0).unwrap();
        let p0 = engine.read("t", 0, 0, 100).unwrap();
        let p1 = engine.read("t", 1, 0, 100).unwrap();
        assert_eq!(p0.len(), 1);
        assert_eq!(p1.len(), 1);
        assert_eq!(p0[0].value, b"p0");
        assert_eq!(p1[0].value, b"p1");
    }

    #[test]
    fn test_read_nonexistent_topic() {
        // Reading a topic with no segment directory is empty, not an error.
        let engine = test_engine();
        let messages = engine.read("no-topic", 0, 0, 100).unwrap();
        assert!(messages.is_empty());
    }

    #[test]
    fn test_latest_offset() {
        let engine = test_engine();
        assert_eq!(engine.latest_offset("t", 0), 0);
        engine.append("t", 0, None, b"a", &[], 0).unwrap();
        engine.append("t", 0, None, b"b", &[], 0).unwrap();
        assert_eq!(engine.latest_offset("t", 0), 2);
    }

    #[test]
    fn test_recovery() {
        let fs = Arc::new(InMemoryFileSystem::new());
        let clock = Arc::new(SimClock::new());
        let config = WalConfig {
            max_segment_bytes: 1024 * 1024,
            max_segment_age_secs: 3600,
            data_dir: PathBuf::from("/data"),
            ..Default::default()
        };
        // Write some messages.
        {
            let engine =
                StorageEngine::new(fs.clone(), clock.clone(), config.clone()).unwrap();
            for i in 0..5 {
                engine
                    .append("orders", 0, None, format!("msg-{i}").as_bytes(), &[], 0)
                    .unwrap();
            }
        }
        // Create a new engine and recover.
        {
            let engine = StorageEngine::new(fs, clock, config).unwrap();
            engine.recover().unwrap();
            // Should be able to read all messages.
            let messages = engine.read("orders", 0, 0, 100).unwrap();
            assert_eq!(messages.len(), 5);
            // Next offset should continue from 5.
            assert_eq!(engine.latest_offset("orders", 0), 5);
            // Should be able to write more.
            let offset = engine.append("orders", 0, None, b"msg-5", &[], 0).unwrap();
            assert_eq!(offset, 5);
        }
    }

    /// Regression: recovery must skip metadata JSON files in the data directory.
    #[test]
    fn test_recovery_with_metadata_files() {
        let fs = Arc::new(InMemoryFileSystem::new());
        let clock = Arc::new(SimClock::new());
        let config = WalConfig {
            max_segment_bytes: 1024 * 1024,
            max_segment_age_secs: 3600,
            data_dir: PathBuf::from("/data"),
            ..Default::default()
        };
        // Write messages and commit a consumer offset (creates JSON files in data_dir).
        {
            let engine =
                StorageEngine::new(fs.clone(), clock.clone(), config.clone()).unwrap();
            for i in 0..10 {
                engine
                    .append("orders", 0, None, format!("msg-{i}").as_bytes(), &[], 0)
                    .unwrap();
            }
            engine.commit_offset("group-1", "orders", 0, 5).unwrap();
        }
        // Recover — this used to fail with "Not a directory" because
        // consumer_offsets.json was treated as a topic directory.
        {
            let engine =
                StorageEngine::new(fs.clone(), clock.clone(), config.clone()).unwrap();
            engine.recover().unwrap();
            let messages = engine.read("orders", 0, 0, 100).unwrap();
            assert_eq!(messages.len(), 10);
            assert_eq!(engine.get_committed_offset("group-1", "orders", 0), Some(5));
        }
    }

    #[test]
    fn test_write_1000_read_all() {
        let engine = test_engine();
        for i in 0..1000 {
            engine.append("t", 0, None, b"x", &[], i).unwrap();
        }
        let messages = engine.read("t", 0, 0, 2000).unwrap();
        assert_eq!(messages.len(), 1000);
        assert_eq!(messages[0].offset, 0);
        assert_eq!(messages[999].offset, 999);
    }

    #[test]
    fn test_append_batch_and_read() {
        let engine = test_engine();
        let messages: Vec<(Option<&[u8]>, &[u8], &[Header], u64)> = (0..10)
            .map(|i| (None, b"data" as &[u8], &[] as &[Header], i as u64 * 100))
            .collect();
        let offsets = engine.append_batch("orders", 0, &messages).unwrap();
        assert_eq!(offsets.len(), 10);
        assert_eq!(offsets[0], 0);
        assert_eq!(offsets[9], 9);
        let read = engine.read("orders", 0, 0, 100).unwrap();
        assert_eq!(read.len(), 10);
        for (i, msg) in read.iter().enumerate() {
            assert_eq!(msg.offset, i as u64);
        }
    }

    #[test]
    fn test_append_batch_then_single() {
        // Batch and single appends must share one contiguous offset sequence.
        let engine = test_engine();
        let messages: Vec<(Option<&[u8]>, &[u8], &[Header], u64)> = vec![
            (None, b"a" as &[u8], &[] as &[Header], 0),
            (None, b"b", &[], 0),
        ];
        let offsets = engine.append_batch("t", 0, &messages).unwrap();
        assert_eq!(offsets, vec![0, 1]);
        let offset = engine.append("t", 0, None, b"c", &[], 0).unwrap();
        assert_eq!(offset, 2);
        let read = engine.read("t", 0, 0, 100).unwrap();
        assert_eq!(read.len(), 3);
    }

    #[test]
    fn test_append_batch_empty() {
        // An empty batch is a no-op returning no offsets.
        let engine = test_engine();
        let offsets = engine
            .append_batch("t", 0, &[] as &[(Option<&[u8]>, &[u8], &[Header], u64)])
            .unwrap();
        assert!(offsets.is_empty());
    }
}

View File

@@ -0,0 +1,256 @@
use std::collections::BTreeMap;
use std::path::PathBuf;
/// One sampled point in the sparse offset index.
#[derive(Clone, Debug, PartialEq)]
pub struct IndexEntry {
    pub offset: u64,
    pub segment_path: PathBuf,
    /// Byte position within the segment file (past the segment header).
    pub byte_position: u64,
}

/// Location where a segment's data lives.
#[derive(Clone, Debug, PartialEq)]
pub enum SegmentLocation {
    Local(PathBuf),
    ObjectStore(String), // S3 key
}

/// Sparse in-memory offset index for fast consumer seeks.
///
/// Maps (topic, partition) → sorted list of index entries; only every Nth
/// offset is sampled. Lookups binary-search for the nearest entry at or
/// before the requested offset.
pub struct OffsetIndex {
    /// Per (topic, partition): sorted vec of index entries.
    entries: BTreeMap<(String, u32), Vec<IndexEntry>>,
    /// Sample interval: index every Nth offset.
    sample_interval: u64,
}

impl OffsetIndex {
    /// Creates an index that samples every `sample_interval`-th offset
    /// (clamped up to at least 1).
    pub fn new(sample_interval: u64) -> Self {
        Self {
            entries: BTreeMap::new(),
            sample_interval: sample_interval.max(1),
        }
    }

    /// Appends `entry` for the given topic-partition. Callers are expected
    /// to add entries in ascending offset order.
    pub fn add_entry(&mut self, topic: &str, partition: u32, entry: IndexEntry) {
        self.entries
            .entry((topic.to_string(), partition))
            .or_default()
            .push(entry);
    }

    /// Registers a segment covering offsets `base_offset..=end_offset`:
    /// the base offset is always indexed, then every `sample_interval`-th
    /// offset after it.
    pub fn register_segment(
        &mut self,
        topic: &str,
        partition: u32,
        segment_path: PathBuf,
        base_offset: u64,
        end_offset: u64,
    ) {
        // Base entry first, so every segment is always findable.
        self.add_entry(
            topic,
            partition,
            IndexEntry {
                offset: base_offset,
                segment_path: segment_path.clone(),
                byte_position: 0, // approximate; reader scans from the header
            },
        );
        // Sampled entries after the base.
        let step = self.sample_interval;
        for sampled in (base_offset + step..=end_offset).step_by(step as usize) {
            self.add_entry(
                topic,
                partition,
                IndexEntry {
                    offset: sampled,
                    segment_path: segment_path.clone(),
                    byte_position: 0, // approximate; reader scans forward
                },
            );
        }
    }

    /// Returns the entry with the largest offset <= `offset`, or `None`
    /// when no entry qualifies (unknown key, or offset before all entries).
    pub fn lookup(&self, topic: &str, partition: u32, offset: u64) -> Option<&IndexEntry> {
        let entries = self.entries.get(&(topic.to_string(), partition))?;
        // partition_point counts entries with offset <= target; the
        // predecessor of that boundary is our match.
        let boundary = entries.partition_point(|e| e.offset <= offset);
        if boundary == 0 {
            None
        } else {
            entries.get(boundary - 1)
        }
    }

    /// Returns the segment path holding `offset` (nearest at-or-before).
    pub fn segment_for_offset(
        &self,
        topic: &str,
        partition: u32,
        offset: u64,
    ) -> Option<&PathBuf> {
        Some(&self.lookup(topic, partition, offset)?.segment_path)
    }

    /// Lists each distinct segment path for a topic-partition, ordered by
    /// first appearance (i.e. offset order).
    pub fn segments(&self, topic: &str, partition: u32) -> Vec<PathBuf> {
        let mut seen = std::collections::BTreeSet::new();
        let mut distinct = Vec::new();
        if let Some(entries) = self.entries.get(&(topic.to_string(), partition)) {
            for entry in entries {
                if seen.insert(entry.segment_path.clone()) {
                    distinct.push(entry.segment_path.clone());
                }
            }
        }
        distinct
    }

    /// Smallest indexed offset for a topic-partition, if any entries exist.
    pub fn earliest_offset(&self, topic: &str, partition: u32) -> Option<u64> {
        self.entries
            .get(&(topic.to_string(), partition))?
            .first()
            .map(|e| e.offset)
    }
}
// Unit tests covering exact/between/beyond/before lookups, multi-segment and
// multi-key isolation, distinct segment listing, and sampling density.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_lookup_exact_offset() {
        let mut index = OffsetIndex::new(100);
        index.register_segment("orders", 0, PathBuf::from("/seg0.wal"), 0, 999);
        let entry = index.lookup("orders", 0, 0).unwrap();
        assert_eq!(entry.offset, 0);
        assert_eq!(entry.segment_path, PathBuf::from("/seg0.wal"));
    }

    #[test]
    fn test_lookup_between_samples() {
        let mut index = OffsetIndex::new(100);
        index.register_segment("orders", 0, PathBuf::from("/seg0.wal"), 0, 999);
        // Offset 50 is between samples 0 and 100, should return entry for 0.
        let entry = index.lookup("orders", 0, 50).unwrap();
        assert_eq!(entry.offset, 0);
        // Offset 150 is between 100 and 200, should return entry for 100.
        let entry = index.lookup("orders", 0, 150).unwrap();
        assert_eq!(entry.offset, 100);
    }

    #[test]
    fn test_lookup_beyond_last_entry() {
        let mut index = OffsetIndex::new(100);
        index.register_segment("t", 0, PathBuf::from("/seg.wal"), 0, 250);
        // Offset 5000 is past all entries, should return the last entry.
        let entry = index.lookup("t", 0, 5000).unwrap();
        assert_eq!(entry.offset, 200);
    }

    #[test]
    fn test_lookup_before_first_entry() {
        let mut index = OffsetIndex::new(100);
        index.register_segment("t", 0, PathBuf::from("/seg.wal"), 500, 999);
        // Offset 100 is before the first entry (500).
        assert!(index.lookup("t", 0, 100).is_none());
    }

    #[test]
    fn test_lookup_nonexistent_topic() {
        let index = OffsetIndex::new(100);
        assert!(index.lookup("no-topic", 0, 0).is_none());
    }

    #[test]
    fn test_multiple_segments() {
        let mut index = OffsetIndex::new(1000);
        index.register_segment("t", 0, PathBuf::from("/seg0.wal"), 0, 4999);
        index.register_segment("t", 0, PathBuf::from("/seg1.wal"), 5000, 9999);
        // Lookups resolve to the segment covering the requested offset.
        let entry = index.lookup("t", 0, 3000).unwrap();
        assert_eq!(entry.segment_path, PathBuf::from("/seg0.wal"));
        let entry = index.lookup("t", 0, 7000).unwrap();
        assert_eq!(entry.segment_path, PathBuf::from("/seg1.wal"));
    }

    #[test]
    fn test_topic_partition_isolation() {
        let mut index = OffsetIndex::new(100);
        index.register_segment("a", 0, PathBuf::from("/a0.wal"), 0, 999);
        index.register_segment("b", 0, PathBuf::from("/b0.wal"), 0, 999);
        index.register_segment("a", 1, PathBuf::from("/a1.wal"), 0, 999);
        assert_eq!(
            index.segment_for_offset("a", 0, 50).unwrap(),
            &PathBuf::from("/a0.wal")
        );
        assert_eq!(
            index.segment_for_offset("b", 0, 50).unwrap(),
            &PathBuf::from("/b0.wal")
        );
        assert_eq!(
            index.segment_for_offset("a", 1, 50).unwrap(),
            &PathBuf::from("/a1.wal")
        );
    }

    #[test]
    fn test_segments_list() {
        let mut index = OffsetIndex::new(1000);
        index.register_segment("t", 0, PathBuf::from("/seg0.wal"), 0, 4999);
        index.register_segment("t", 0, PathBuf::from("/seg1.wal"), 5000, 9999);
        // Each segment appears once, in offset order.
        let segs = index.segments("t", 0);
        assert_eq!(segs.len(), 2);
        assert_eq!(segs[0], PathBuf::from("/seg0.wal"));
        assert_eq!(segs[1], PathBuf::from("/seg1.wal"));
    }

    #[test]
    fn test_earliest_offset() {
        let mut index = OffsetIndex::new(100);
        index.register_segment("t", 0, PathBuf::from("/seg.wal"), 42, 999);
        assert_eq!(index.earliest_offset("t", 0), Some(42));
    }

    #[test]
    fn test_sample_interval() {
        let mut index = OffsetIndex::new(500);
        index.register_segment("t", 0, PathBuf::from("/seg.wal"), 0, 2000);
        // Should have entries at: 0, 500, 1000, 1500, 2000
        // White-box check: inspects the private `entries` map directly.
        let key = ("t".to_string(), 0);
        let entries = index.entries.get(&key).unwrap();
        let offsets: Vec<u64> = entries.iter().map(|e| e.offset).collect();
        assert_eq!(offsets, vec![0, 500, 1000, 1500, 2000]);
    }
}

View File

@@ -0,0 +1,6 @@
pub mod consumer_offsets;
pub mod engine;
/// Sparse offset index mapping logical offsets to WAL segment positions.
pub mod index;
/// Object-storage backends plus segment shipping and remote reads.
pub mod object_store;
/// JSON-backed persistence of topic configuration.
pub mod topic_metadata;
/// Write-ahead log: segments, records, reader/writer.
pub mod wal;

View File

@@ -0,0 +1,93 @@
/// Builds the object-storage key for a shipped WAL segment.
///
/// Layout: `{cluster_id}/{topic}/{partition}/{base_offset:020}-{end_offset:020}.sqseg`.
///
/// Offsets are zero-padded to 20 digits (enough for any `u64`) so that
/// lexicographic key ordering equals numeric offset ordering — prefix
/// listings therefore come back in offset order.
pub fn segment_key(
    cluster_id: &str,
    topic: &str,
    partition: u32,
    base_offset: u64,
    end_offset: u64,
) -> String {
    let filename = format!("{base_offset:020}-{end_offset:020}.sqseg");
    format!("{cluster_id}/{topic}/{partition}/{filename}")
}
/// Inverse of `segment_key`: splits a key into
/// `(cluster_id, topic, partition, base_offset, end_offset)`.
///
/// Returns `None` for any key that does not have exactly four `/`-separated
/// components, a `.sqseg` suffix, and two parseable offsets.
pub fn parse_segment_key(key: &str) -> Option<(String, String, u32, u64, u64)> {
    let mut components = key.split('/');
    let cluster_id = components.next()?;
    let topic = components.next()?;
    let partition: u32 = components.next()?.parse().ok()?;
    let filename = components.next()?;
    // Reject keys with more than four path components.
    if components.next().is_some() {
        return None;
    }
    let stem = filename.strip_suffix(".sqseg")?;
    let (base, end) = stem.split_once('-')?;
    let base_offset: u64 = base.parse().ok()?;
    let end_offset: u64 = end.parse().ok()?;
    Some((
        cluster_id.to_string(),
        topic.to_string(),
        partition,
        base_offset,
        end_offset,
    ))
}
/// Listing prefix that covers every shipped segment of one topic-partition.
/// Ends with a trailing `/` so sibling partitions (e.g. `1` vs `10`) never match.
pub fn topic_partition_prefix(cluster_id: &str, topic: &str, partition: u32) -> String {
    let mut prefix = String::new();
    for part in [cluster_id, topic, &partition.to_string()] {
        prefix.push_str(part);
        prefix.push('/');
    }
    prefix
}
// Formatting, ordering, and round-trip tests for the segment key layout.
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_segment_key_format() {
        let key = segment_key("cluster-1", "orders", 0, 0, 999);
        assert_eq!(
            key,
            "cluster-1/orders/0/00000000000000000000-00000000000000000999.sqseg"
        );
    }
    // Zero-padding must make string order equal numeric offset order.
    #[test]
    fn test_segment_key_lexicographic_order() {
        let k1 = segment_key("c", "t", 0, 0, 999);
        let k2 = segment_key("c", "t", 0, 1000, 1999);
        let k3 = segment_key("c", "t", 0, 2000, 2999);
        assert!(k1 < k2);
        assert!(k2 < k3);
    }
    // parse(segment_key(...)) must round-trip every component.
    #[test]
    fn test_parse_segment_key() {
        let key = segment_key("cluster-1", "orders", 2, 1000, 1999);
        let parsed = parse_segment_key(&key).unwrap();
        assert_eq!(parsed.0, "cluster-1");
        assert_eq!(parsed.1, "orders");
        assert_eq!(parsed.2, 2);
        assert_eq!(parsed.3, 1000);
        assert_eq!(parsed.4, 1999);
    }
    #[test]
    fn test_parse_invalid_key() {
        assert!(parse_segment_key("invalid").is_none());
        assert!(parse_segment_key("a/b/c").is_none());
        assert!(parse_segment_key("a/b/c/d.txt").is_none());
    }
    #[test]
    fn test_topic_partition_prefix() {
        let prefix = topic_partition_prefix("cluster-1", "orders", 0);
        assert_eq!(prefix, "cluster-1/orders/0/");
    }
}

View File

@@ -0,0 +1,159 @@
/// S3 key layout helpers for shipped segments.
pub mod layout;
/// Fetches, decompresses, and locally caches remote segments.
pub mod reader;
/// S3/MinIO backend built on the `object_store` crate.
pub mod s3;
/// Ships closed WAL segments to object storage.
pub mod shipper;
use std::collections::HashMap;
use std::sync::Mutex;
/// Trait for object storage backends (S3, MinIO, in-memory for tests).
///
/// Keys are flat strings; the hierarchical layout (see [`layout`]) is a
/// convention of the callers, not enforced here.
#[allow(async_fn_in_trait)]
pub trait ObjectStore: Send + Sync + 'static {
    /// Store `data` under `key`, replacing any existing object.
    async fn put(&self, key: &str, data: Vec<u8>) -> anyhow::Result<()>;
    /// Fetch the full object at `key`; errors when the key is absent.
    async fn get(&self, key: &str) -> anyhow::Result<Vec<u8>>;
    /// Return all keys starting with `prefix` (implementations here return them sorted).
    async fn list(&self, prefix: &str) -> anyhow::Result<Vec<String>>;
    /// Remove `key`; implementations may treat a missing key as success.
    async fn delete(&self, key: &str) -> anyhow::Result<()>;
    /// Whether an object currently exists under `key`.
    async fn exists(&self, key: &str) -> anyhow::Result<bool>;
}
/// In-memory object store for testing.
///
/// Backed by a `HashMap` behind a `Mutex`; contents are lost when dropped.
pub struct InMemoryObjectStore {
    // Key -> raw object bytes.
    data: Mutex<HashMap<String, Vec<u8>>>,
}
impl InMemoryObjectStore {
    /// Creates an empty store.
    pub fn new() -> Self {
        let data = Mutex::new(HashMap::new());
        Self { data }
    }
}

impl Default for InMemoryObjectStore {
    /// Same as [`InMemoryObjectStore::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl ObjectStore for InMemoryObjectStore {
    /// Inserts or overwrites the object stored under `key`.
    async fn put(&self, key: &str, data: Vec<u8>) -> anyhow::Result<()> {
        let mut map = self.data.lock().unwrap();
        map.insert(key.to_string(), data);
        Ok(())
    }

    /// Returns a copy of the stored bytes, or an error when absent.
    async fn get(&self, key: &str) -> anyhow::Result<Vec<u8>> {
        let map = self.data.lock().unwrap();
        match map.get(key) {
            Some(bytes) => Ok(bytes.clone()),
            None => Err(anyhow::anyhow!("key '{}' not found", key)),
        }
    }

    /// Returns every stored key with the given prefix, sorted ascending.
    async fn list(&self, prefix: &str) -> anyhow::Result<Vec<String>> {
        let map = self.data.lock().unwrap();
        let mut keys = Vec::new();
        for key in map.keys() {
            if key.starts_with(prefix) {
                keys.push(key.clone());
            }
        }
        keys.sort();
        Ok(keys)
    }

    /// Removes `key`; removing a missing key is a no-op.
    async fn delete(&self, key: &str) -> anyhow::Result<()> {
        let mut map = self.data.lock().unwrap();
        map.remove(key);
        Ok(())
    }

    /// True when an object is stored under `key`.
    async fn exists(&self, key: &str) -> anyhow::Result<bool> {
        let map = self.data.lock().unwrap();
        Ok(map.contains_key(key))
    }
}
/// No-op object store that silently discards all data.
///
/// `put` and `delete` always succeed without storing anything; `get` always
/// errors and `exists` is always `false`.
pub struct NoopObjectStore;

impl ObjectStore for NoopObjectStore {
    async fn put(&self, _key: &str, _data: Vec<u8>) -> anyhow::Result<()> {
        Ok(())
    }

    async fn get(&self, key: &str) -> anyhow::Result<Vec<u8>> {
        anyhow::bail!("NoopObjectStore: key '{}' not found", key)
    }

    async fn list(&self, _prefix: &str) -> anyhow::Result<Vec<String>> {
        Ok(Vec::new())
    }

    async fn delete(&self, _key: &str) -> anyhow::Result<()> {
        Ok(())
    }

    async fn exists(&self, _key: &str) -> anyhow::Result<bool> {
        Ok(false)
    }
}
// Behavioral tests for the in-memory and no-op stores.
#[cfg(test)]
mod tests {
    use super::*;
    #[tokio::test]
    async fn test_inmemory_put_get() {
        let store = InMemoryObjectStore::new();
        store.put("test/key", b"hello".to_vec()).await.unwrap();
        let data = store.get("test/key").await.unwrap();
        assert_eq!(data, b"hello");
    }
    #[tokio::test]
    async fn test_inmemory_get_nonexistent() {
        let store = InMemoryObjectStore::new();
        let err = store.get("no/such/key").await.unwrap_err();
        assert!(err.to_string().contains("not found"));
    }
    // list() must filter by prefix and return keys sorted.
    #[tokio::test]
    async fn test_inmemory_list() {
        let store = InMemoryObjectStore::new();
        store.put("a/1", b"x".to_vec()).await.unwrap();
        store.put("a/2", b"y".to_vec()).await.unwrap();
        store.put("b/1", b"z".to_vec()).await.unwrap();
        let keys = store.list("a/").await.unwrap();
        assert_eq!(keys, vec!["a/1", "a/2"]);
    }
    #[tokio::test]
    async fn test_inmemory_delete() {
        let store = InMemoryObjectStore::new();
        store.put("key", b"data".to_vec()).await.unwrap();
        store.delete("key").await.unwrap();
        assert!(!store.exists("key").await.unwrap());
    }
    #[tokio::test]
    async fn test_inmemory_exists() {
        let store = InMemoryObjectStore::new();
        assert!(!store.exists("key").await.unwrap());
        store.put("key", b"data".to_vec()).await.unwrap();
        assert!(store.exists("key").await.unwrap());
    }
    #[tokio::test]
    async fn test_noop_put_get() {
        let store = NoopObjectStore;
        store.put("key", b"data".to_vec()).await.unwrap();
        // Get always fails on noop store.
        assert!(store.get("key").await.is_err());
    }
}

View File

@@ -0,0 +1,209 @@
use sq_models::Message;
use super::ObjectStore;
use crate::wal::reader::WalReader;
use sq_sim::fs::FileSystem;
use std::path::PathBuf;
use std::sync::Arc;
/// Reads segments from object storage, decompressing and parsing them.
///
/// Downloads are cached under `cache_dir` on the local filesystem so each
/// remote segment is fetched and decompressed at most once.
pub struct ObjectStoreReader<F: FileSystem, O: ObjectStore> {
    fs: Arc<F>,
    object_store: Arc<O>,
    // Directory for decompressed local copies of remote segments.
    cache_dir: PathBuf,
    // Parses cached segment files back into messages.
    wal_reader: WalReader<F>,
}
impl<F: FileSystem, O: ObjectStore> ObjectStoreReader<F, O> {
    /// Creates a reader that caches downloaded segments under `cache_dir`.
    pub fn new(fs: Arc<F>, object_store: Arc<O>, cache_dir: PathBuf) -> Self {
        let wal_reader = WalReader::new(fs.clone());
        Self {
            fs,
            object_store,
            cache_dir,
            wal_reader,
        }
    }
    /// Fetch a segment from object storage, decompress it, cache locally, and read messages.
    ///
    /// Returns all messages in the segment with offset >= `from_offset`.
    /// On a cache hit the object store is not contacted at all.
    //
    // NOTE(review): the cache file is written without an fsync and without a
    // temp-file + rename; a crash mid-write leaves a truncated cache file that
    // later reads will trust — confirm WalReader's truncated-record tolerance
    // covers this, or write atomically.
    pub async fn read_segment(
        &self,
        key: &str,
        from_offset: u64,
    ) -> anyhow::Result<Vec<Message>> {
        // Check local cache first.
        let cache_path = self.cache_path(key);
        if !self.fs.exists(&cache_path) {
            // Download from object store.
            let compressed = self.object_store.get(key).await?;
            // Decompress zstd.
            let decompressed = zstd::decode_all(compressed.as_slice())?;
            // Cache locally.
            if let Some(parent) = cache_path.parent() {
                self.fs.create_dir_all(parent)?;
            }
            let mut handle = self.fs.open_write(&cache_path)?;
            handle.write_all(&decompressed)?;
        }
        // Read from cached file.
        Ok(self.wal_reader.read_from_offset(&cache_path, from_offset)?)
    }
    /// List segment keys in object storage matching a prefix.
    pub async fn list_segment_keys(&self, prefix: &str) -> anyhow::Result<Vec<String>> {
        self.object_store.list(prefix).await
    }
    /// Maps an object-store key to its local cache file path.
    //
    // NOTE(review): '/' -> '_' is not injective — keys whose components
    // contain '_' could collide in the cache. Presumably topic names exclude
    // '_'; verify against topic-name validation.
    fn cache_path(&self, key: &str) -> PathBuf {
        // Replace '/' with '_' for flat cache directory.
        let safe_name = key.replace('/', "_");
        self.cache_dir.join(safe_name)
    }
}
// Tests for fetching, decompressing, caching, and offset-filtered reads of
// remote segments. Segments are built by hand (header + encoded records) and
// zstd-compressed before being placed in the in-memory store.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object_store::InMemoryObjectStore;
    use crate::wal::record::encode_record;
    use crate::wal::segment::SegmentHeader;
    use sq_sim::fs::InMemoryFileSystem;
    // Builds raw (uncompressed) segment bytes: header followed by records.
    fn build_test_segment(topic: &str, partition: u32, messages: &[Message]) -> Vec<u8> {
        let mut data = Vec::new();
        // Write segment header.
        let header = SegmentHeader {
            topic: topic.to_string(),
            partition,
        };
        data.extend_from_slice(&header.encode());
        // Write records.
        for msg in messages {
            data.extend_from_slice(&encode_record(msg));
        }
        data
    }
    #[tokio::test]
    async fn test_read_from_object_store() {
        let fs = Arc::new(InMemoryFileSystem::new());
        let store = Arc::new(InMemoryObjectStore::new());
        let messages = vec![
            Message {
                offset: 0,
                topic: "orders".into(),
                partition: 0,
                key: None,
                value: b"msg-0".to_vec(),
                headers: vec![],
                timestamp_ms: 100,
            },
            Message {
                offset: 1,
                topic: "orders".into(),
                partition: 0,
                key: None,
                value: b"msg-1".to_vec(),
                headers: vec![],
                timestamp_ms: 200,
            },
        ];
        let segment_data = build_test_segment("orders", 0, &messages);
        let compressed = zstd::encode_all(segment_data.as_slice(), 3).unwrap();
        store
            .put("cluster/orders/0/00000000000000000000-00000000000000000001.sqseg", compressed)
            .await
            .unwrap();
        let reader = ObjectStoreReader::new(fs, store, PathBuf::from("/cache"));
        let result = reader
            .read_segment(
                "cluster/orders/0/00000000000000000000-00000000000000000001.sqseg",
                0,
            )
            .await
            .unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].value, b"msg-0");
        assert_eq!(result[1].value, b"msg-1");
    }
    // Second read must be served from the local cache, not the object store.
    #[tokio::test]
    async fn test_cached_read() {
        let fs = Arc::new(InMemoryFileSystem::new());
        let store = Arc::new(InMemoryObjectStore::new());
        let messages = vec![Message {
            offset: 0,
            topic: "t".into(),
            partition: 0,
            key: None,
            value: b"data".to_vec(),
            headers: vec![],
            timestamp_ms: 0,
        }];
        let segment_data = build_test_segment("t", 0, &messages);
        let compressed = zstd::encode_all(segment_data.as_slice(), 3).unwrap();
        let key = "cluster/t/0/00000000000000000000-00000000000000000000.sqseg";
        store.put(key, compressed).await.unwrap();
        let reader = ObjectStoreReader::new(fs.clone(), store.clone(), PathBuf::from("/cache"));
        // First read - fetches from store.
        let result1 = reader.read_segment(key, 0).await.unwrap();
        assert_eq!(result1.len(), 1);
        // Delete from store to prove cached read works.
        store.delete(key).await.unwrap();
        // Second read - uses cache.
        let result2 = reader.read_segment(key, 0).await.unwrap();
        assert_eq!(result2.len(), 1);
        assert_eq!(result2[0].value, b"data");
    }
    // from_offset must drop every message below the requested offset.
    #[tokio::test]
    async fn test_read_from_offset() {
        let fs = Arc::new(InMemoryFileSystem::new());
        let store = Arc::new(InMemoryObjectStore::new());
        let messages: Vec<Message> = (0..5)
            .map(|i| Message {
                offset: i,
                topic: "t".into(),
                partition: 0,
                key: None,
                value: format!("msg-{i}").into_bytes(),
                headers: vec![],
                timestamp_ms: i * 100,
            })
            .collect();
        let segment_data = build_test_segment("t", 0, &messages);
        let compressed = zstd::encode_all(segment_data.as_slice(), 3).unwrap();
        let key = "cluster/t/0/00000000000000000000-00000000000000000004.sqseg";
        store.put(key, compressed).await.unwrap();
        let reader = ObjectStoreReader::new(fs, store, PathBuf::from("/cache"));
        let result = reader.read_segment(key, 3).await.unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].offset, 3);
        assert_eq!(result[1].offset, 4);
    }
}

View File

@@ -0,0 +1,106 @@
use object_store::aws::{AmazonS3, AmazonS3Builder};
use object_store::path::Path as ObjectPath;
use object_store::ObjectStore as _;
use super::ObjectStore;
/// S3-backed object store using the `object_store` crate.
/// Works with AWS S3, MinIO, and any S3-compatible endpoint.
pub struct S3ObjectStore {
    // Configured client from the `object_store` crate.
    store: AmazonS3,
    // Bucket name, kept only for reporting via `bucket()`.
    bucket: String,
}
/// Configuration for the S3 object store.
pub struct S3Config {
    /// Target bucket name.
    pub bucket: String,
    /// AWS region (or the region string the S3-compatible endpoint expects).
    pub region: String,
    /// Custom endpoint URL, e.g. for MinIO; `None` uses the builder's default.
    pub endpoint: Option<String>,
    /// Static credentials; when `None` the field is simply not set on the builder.
    pub access_key_id: Option<String>,
    pub secret_access_key: Option<String>,
    /// Allow HTTP (non-TLS) connections. Required for local MinIO.
    pub allow_http: bool,
}
impl S3ObjectStore {
pub fn new(config: S3Config) -> anyhow::Result<Self> {
let mut builder = AmazonS3Builder::new()
.with_bucket_name(&config.bucket)
.with_region(&config.region);
if let Some(endpoint) = &config.endpoint {
builder = builder.with_endpoint(endpoint);
}
if let Some(key) = &config.access_key_id {
builder = builder.with_access_key_id(key);
}
if let Some(secret) = &config.secret_access_key {
builder = builder.with_secret_access_key(secret);
}
if config.allow_http {
builder = builder.with_allow_http(true);
}
let store = builder.build()?;
Ok(Self {
store,
bucket: config.bucket,
})
}
/// Get the bucket name.
pub fn bucket(&self) -> &str {
&self.bucket
}
}
impl ObjectStore for S3ObjectStore {
    /// Uploads `data` to `key` (single-shot put, no multipart).
    async fn put(&self, key: &str, data: Vec<u8>) -> anyhow::Result<()> {
        let path = ObjectPath::from(key);
        self.store
            .put(&path, bytes::Bytes::from(data).into())
            .await?;
        Ok(())
    }
    /// Downloads the whole object at `key` into memory.
    async fn get(&self, key: &str) -> anyhow::Result<Vec<u8>> {
        let path = ObjectPath::from(key);
        let result = self.store.get(&path).await?;
        let bytes = result.bytes().await?;
        Ok(bytes.to_vec())
    }
    /// Streams the listing for `prefix` and returns the keys sorted.
    async fn list(&self, prefix: &str) -> anyhow::Result<Vec<String>> {
        use futures::TryStreamExt;
        let prefix_path = ObjectPath::from(prefix);
        let mut keys = Vec::new();
        let mut stream = self.store.list(Some(&prefix_path));
        while let Some(meta) = stream.try_next().await? {
            keys.push(meta.location.to_string());
        }
        // Sort explicitly; listing order is backend-defined.
        keys.sort();
        Ok(keys)
    }
    /// Deletes the object at `key`.
    async fn delete(&self, key: &str) -> anyhow::Result<()> {
        let path = ObjectPath::from(key);
        self.store.delete(&path).await?;
        Ok(())
    }
    /// Existence check via a HEAD request; `NotFound` maps to `Ok(false)`,
    /// any other error is propagated.
    async fn exists(&self, key: &str) -> anyhow::Result<bool> {
        let path = ObjectPath::from(key);
        match self.store.head(&path).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { .. }) => Ok(false),
            Err(e) => Err(e.into()),
        }
    }
}

View File

@@ -0,0 +1,273 @@
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use sq_models::ClosedSegment;
use sq_sim::fs::FileSystem;
use tokio::sync::Mutex;

use super::layout;
use super::ObjectStore;
/// Tracks which segments have been shipped to object storage.
///
/// Purely in-process bookkeeping: the set is not persisted, so after a
/// restart already-uploaded segments will be shipped again (to the same key).
pub struct ShippedSegments {
    // Local paths of segments uploaded during this process's lifetime.
    shipped: HashSet<PathBuf>,
}

impl ShippedSegments {
    /// Creates an empty tracker.
    pub fn new() -> Self {
        Self {
            shipped: HashSet::new(),
        }
    }

    /// Records that the segment at `path` has been uploaded.
    pub fn mark_shipped(&mut self, path: PathBuf) {
        self.shipped.insert(path);
    }

    /// Whether the segment at `path` has already been uploaded.
    ///
    /// Takes `&Path` rather than `&PathBuf` (clippy `ptr_arg`); existing
    /// callers passing `&PathBuf` still work via deref coercion.
    pub fn is_shipped(&self, path: &Path) -> bool {
        self.shipped.contains(path)
    }

    /// All paths recorded as shipped so far.
    pub fn shipped_paths(&self) -> &HashSet<PathBuf> {
        &self.shipped
    }
}
impl Default for ShippedSegments {
    /// Same as [`ShippedSegments::new`]: an empty tracker.
    fn default() -> Self {
        Self::new()
    }
}
/// Ships closed WAL segments to object storage with zstd compression.
pub struct SegmentShipper<F: FileSystem, O: ObjectStore> {
    fs: Arc<F>,
    object_store: Arc<O>,
    // Prefixes every uploaded key (see `layout::segment_key`).
    cluster_id: String,
    // Shared dedup set so repeated ship passes skip already-uploaded segments.
    shipped: Arc<Mutex<ShippedSegments>>,
}
impl<F: FileSystem, O: ObjectStore> SegmentShipper<F, O> {
    /// Creates a shipper; `shipped` may be shared with other components that
    /// need to observe upload progress.
    pub fn new(
        fs: Arc<F>,
        object_store: Arc<O>,
        cluster_id: String,
        shipped: Arc<Mutex<ShippedSegments>>,
    ) -> Self {
        Self {
            fs,
            object_store,
            cluster_id,
            shipped,
        }
    }
    /// Ship a single closed segment to object storage.
    /// Reads the local WAL file, compresses with zstd, uploads.
    ///
    /// Idempotent per path: a segment already recorded in `shipped` is skipped.
    //
    // NOTE(review): the lock is released between the is_shipped check and
    // mark_shipped, so two concurrent shippers could upload the same segment
    // twice. Both would write the same key, so this looks harmless — confirm
    // that is the intended trade-off.
    pub async fn ship_segment(&self, segment: &ClosedSegment) -> anyhow::Result<()> {
        // Check if already shipped.
        {
            let shipped = self.shipped.lock().await;
            if shipped.is_shipped(&segment.path) {
                return Ok(());
            }
        }
        // Read the local WAL file.
        let mut handle = self.fs.open_read(&segment.path)?;
        let mut raw_data = Vec::new();
        handle.read_to_end(&mut raw_data)?;
        // Compress with zstd.
        let compressed = zstd::encode_all(raw_data.as_slice(), 3)?;
        // Build the S3 key.
        let key = layout::segment_key(
            &self.cluster_id,
            segment.topic.as_str(),
            segment.partition,
            segment.base_offset,
            segment.end_offset,
        );
        // Upload.
        self.object_store.put(&key, compressed).await?;
        tracing::info!(
            topic = %segment.topic,
            partition = segment.partition,
            base_offset = segment.base_offset,
            end_offset = segment.end_offset,
            key = %key,
            "shipped segment to object store"
        );
        // Mark as shipped.
        {
            let mut shipped = self.shipped.lock().await;
            shipped.mark_shipped(segment.path.clone());
        }
        Ok(())
    }
    /// Ship all provided closed segments. Returns the number of successfully shipped segments.
    ///
    /// Failures are logged and skipped (best-effort); already-shipped segments
    /// still count as successes because `ship_segment` returns `Ok` for them.
    pub async fn ship_all(&self, segments: &[ClosedSegment]) -> usize {
        let mut shipped_count = 0;
        for segment in segments {
            match self.ship_segment(segment).await {
                Ok(()) => shipped_count += 1,
                Err(e) => {
                    tracing::warn!(
                        topic = %segment.topic,
                        partition = segment.partition,
                        path = %segment.path.display(),
                        error = %e,
                        "failed to ship segment, will retry"
                    );
                }
            }
        }
        shipped_count
    }
}
// Tests for shipping segments: upload + compression, idempotency, batch
// counting, key layout, and the in-memory shipped-set tracking.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object_store::InMemoryObjectStore;
    use sq_sim::fs::InMemoryFileSystem;
    use std::path::Path;
    // Builds a shipper over fresh in-memory fs/store with cluster id "test-cluster".
    fn setup() -> (
        Arc<InMemoryFileSystem>,
        Arc<InMemoryObjectStore>,
        SegmentShipper<InMemoryFileSystem, InMemoryObjectStore>,
    ) {
        let fs = Arc::new(InMemoryFileSystem::new());
        let store = Arc::new(InMemoryObjectStore::new());
        let shipped = Arc::new(Mutex::new(ShippedSegments::new()));
        let shipper = SegmentShipper::new(
            fs.clone(),
            store.clone(),
            "test-cluster".to_string(),
            shipped,
        );
        (fs, store, shipper)
    }
    // Writes `data` to `path` on the in-memory filesystem, creating parents.
    fn create_test_segment(fs: &InMemoryFileSystem, path: &Path, data: &[u8]) {
        fs.create_dir_all(path.parent().unwrap()).unwrap();
        let mut handle = fs.open_write(path).unwrap();
        handle.write_all(data).unwrap();
    }
    #[tokio::test]
    async fn test_ship_segment() {
        let (fs, store, shipper) = setup();
        let seg_path = PathBuf::from("/data/orders/0/00000000000000000000.wal");
        create_test_segment(&fs, &seg_path, b"wal data here");
        let segment = ClosedSegment {
            topic: "orders".into(),
            partition: 0,
            base_offset: 0,
            end_offset: 99,
            path: seg_path,
            size_bytes: 13,
        };
        shipper.ship_segment(&segment).await.unwrap();
        // Verify it's in the object store.
        let key = layout::segment_key("test-cluster", "orders", 0, 0, 99);
        let data = store.get(&key).await.unwrap();
        // Data should be zstd-compressed, so decompress and verify.
        let decompressed = zstd::decode_all(data.as_slice()).unwrap();
        assert_eq!(decompressed, b"wal data here");
    }
    #[tokio::test]
    async fn test_ship_already_shipped_is_noop() {
        let (fs, store, shipper) = setup();
        let seg_path = PathBuf::from("/data/orders/0/00000000000000000000.wal");
        create_test_segment(&fs, &seg_path, b"data");
        let segment = ClosedSegment {
            topic: "orders".into(),
            partition: 0,
            base_offset: 0,
            end_offset: 99,
            path: seg_path,
            size_bytes: 13,
        };
        shipper.ship_segment(&segment).await.unwrap();
        // Ship again - should be a noop.
        shipper.ship_segment(&segment).await.unwrap();
        let keys = store.list("test-cluster/").await.unwrap();
        assert_eq!(keys.len(), 1);
    }
    #[tokio::test]
    async fn test_ship_all_counts() {
        let (fs, store, shipper) = setup();
        let mut segments = Vec::new();
        for i in 0..3 {
            let path = PathBuf::from(format!("/data/t/0/{:020}.wal", i * 100));
            create_test_segment(&fs, &path, format!("data-{i}").as_bytes());
            segments.push(ClosedSegment {
                topic: "t".into(),
                partition: 0,
                base_offset: i * 100,
                end_offset: i * 100 + 99,
                path,
                size_bytes: 6,
            });
        }
        let count = shipper.ship_all(&segments).await;
        assert_eq!(count, 3);
        let keys = store.list("test-cluster/").await.unwrap();
        assert_eq!(keys.len(), 3);
    }
    // Uploaded key must follow `layout::segment_key`'s zero-padded format.
    #[tokio::test]
    async fn test_s3_key_layout() {
        let (fs, store, shipper) = setup();
        let seg_path = PathBuf::from("/data/events/2/00000000000000001000.wal");
        create_test_segment(&fs, &seg_path, b"data");
        let segment = ClosedSegment {
            topic: "events".into(),
            partition: 2,
            base_offset: 1000,
            end_offset: 1999,
            path: seg_path,
            size_bytes: 4,
        };
        shipper.ship_segment(&segment).await.unwrap();
        let expected_key = "test-cluster/events/2/00000000000000001000-00000000000000001999.sqseg";
        assert!(store.exists(expected_key).await.unwrap());
    }
    #[test]
    fn test_shipped_segments_tracking() {
        let mut shipped = ShippedSegments::new();
        let path = PathBuf::from("/data/t/0/000.wal");
        assert!(!shipped.is_shipped(&path));
        shipped.mark_shipped(path.clone());
        assert!(shipped.is_shipped(&path));
    }
}

View File

@@ -0,0 +1,225 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use sq_models::TopicConfig;
use sq_sim::fs::FileSystem;
/// Manages topic metadata (name, partitions, replication factor).
/// Backed by a JSON file for persistence.
pub struct TopicMetadata<F: FileSystem> {
    // In-memory source of truth; re-persisted on every mutation.
    topics: HashMap<String, TopicConfig>,
    // `{data_dir}/topic_metadata.json`.
    persist_path: PathBuf,
    fs: Arc<F>,
}
impl<F: FileSystem> TopicMetadata<F> {
    /// Creates an empty metadata set persisted under `data_dir`.
    /// Use [`TopicMetadata::load`] to restore previously persisted topics.
    pub fn new(fs: Arc<F>, data_dir: &Path) -> Self {
        let persist_path = data_dir.join("topic_metadata.json");
        Self {
            topics: HashMap::new(),
            persist_path,
            fs,
        }
    }
    /// Create a new topic. Returns error if topic already exists.
    pub fn create_topic(&mut self, config: TopicConfig) -> anyhow::Result<()> {
        if self.topics.contains_key(config.name.as_str()) {
            anyhow::bail!("topic '{}' already exists", config.name);
        }
        self.topics.insert(config.name.to_string(), config);
        self.persist()
    }
    /// Delete a topic by name. Returns error if topic doesn't exist.
    pub fn delete_topic(&mut self, name: &str) -> anyhow::Result<()> {
        if self.topics.remove(name).is_none() {
            anyhow::bail!("topic '{}' not found", name);
        }
        self.persist()
    }
    /// List all topics.
    /// Sorted by name for deterministic output.
    pub fn list_topics(&self) -> Vec<&TopicConfig> {
        let mut topics: Vec<_> = self.topics.values().collect();
        topics.sort_by_key(|t| t.name.as_str());
        topics
    }
    /// Get a specific topic's config.
    pub fn get_topic(&self, name: &str) -> Option<&TopicConfig> {
        self.topics.get(name)
    }
    /// Check if a topic exists.
    pub fn topic_exists(&self, name: &str) -> bool {
        self.topics.contains_key(name)
    }
    /// Serializes all topics as JSON and fsyncs the metadata file.
    //
    // NOTE(review): the file is rewritten in place rather than via
    // temp-file + rename, so a crash mid-write can leave a corrupt metadata
    // file that `load` will then fail on — confirm whether the FileSystem
    // trait offers an atomic rename to fix this.
    fn persist(&self) -> anyhow::Result<()> {
        let entries: Vec<TopicEntry> = self
            .topics
            .values()
            .map(|c| TopicEntry {
                name: c.name.to_string(),
                partitions: c.partitions,
                replication_factor: c.replication_factor,
            })
            .collect();
        let json = serde_json::to_vec(&entries)?;
        if let Some(parent) = self.persist_path.parent() {
            self.fs.create_dir_all(parent)?;
        }
        let mut handle = self.fs.open_write(&self.persist_path)?;
        handle.write_all(&json)?;
        handle.fsync()?;
        Ok(())
    }
    /// Load topic metadata from disk.
    /// A missing metadata file yields an empty set (first boot).
    pub fn load(fs: Arc<F>, data_dir: &Path) -> anyhow::Result<Self> {
        let persist_path = data_dir.join("topic_metadata.json");
        if !fs.exists(&persist_path) {
            return Ok(Self {
                topics: HashMap::new(),
                persist_path,
                fs,
            });
        }
        let mut handle = fs.open_read(&persist_path)?;
        let mut buf = Vec::new();
        handle.read_to_end(&mut buf)?;
        let entries: Vec<TopicEntry> = serde_json::from_slice(&buf)?;
        let mut topics = HashMap::new();
        for entry in entries {
            let config = TopicConfig::new(entry.name.as_str())
                .with_partitions(entry.partitions)
                .with_replication_factor(entry.replication_factor);
            topics.insert(entry.name, config);
        }
        Ok(Self {
            topics,
            persist_path,
            fs,
        })
    }
}
/// On-disk JSON representation of one topic (decoupled from `TopicConfig`
/// so the persisted schema stays stable).
#[derive(serde::Serialize, serde::Deserialize)]
struct TopicEntry {
    name: String,
    partitions: u32,
    replication_factor: u32,
}
// Tests for topic CRUD, duplicate/missing-topic errors, sorted listing,
// and JSON persistence round-trips through the in-memory filesystem.
#[cfg(test)]
mod tests {
    use super::*;
    use sq_sim::fs::InMemoryFileSystem;
    // Fresh metadata backed by a throwaway in-memory filesystem.
    fn test_metadata() -> TopicMetadata<InMemoryFileSystem> {
        let fs = Arc::new(InMemoryFileSystem::new());
        TopicMetadata::new(fs, Path::new("/data"))
    }
    #[test]
    fn test_create_and_get_topic() {
        let mut meta = test_metadata();
        meta.create_topic(TopicConfig::new("orders")).unwrap();
        let topic = meta.get_topic("orders").unwrap();
        assert_eq!(topic.name.as_str(), "orders");
        assert_eq!(topic.partitions, 1);
        assert_eq!(topic.replication_factor, 3);
    }
    #[test]
    fn test_create_duplicate_fails() {
        let mut meta = test_metadata();
        meta.create_topic(TopicConfig::new("orders")).unwrap();
        let err = meta.create_topic(TopicConfig::new("orders")).unwrap_err();
        assert!(err.to_string().contains("already exists"));
    }
    #[test]
    fn test_delete_topic() {
        let mut meta = test_metadata();
        meta.create_topic(TopicConfig::new("orders")).unwrap();
        meta.delete_topic("orders").unwrap();
        assert!(meta.get_topic("orders").is_none());
    }
    #[test]
    fn test_delete_nonexistent_fails() {
        let mut meta = test_metadata();
        let err = meta.delete_topic("orders").unwrap_err();
        assert!(err.to_string().contains("not found"));
    }
    // list_topics must be sorted by name regardless of insertion order.
    #[test]
    fn test_list_topics_sorted() {
        let mut meta = test_metadata();
        meta.create_topic(TopicConfig::new("zebra")).unwrap();
        meta.create_topic(TopicConfig::new("alpha")).unwrap();
        meta.create_topic(TopicConfig::new("middle")).unwrap();
        let topics = meta.list_topics();
        assert_eq!(topics.len(), 3);
        assert_eq!(topics[0].name.as_str(), "alpha");
        assert_eq!(topics[1].name.as_str(), "middle");
        assert_eq!(topics[2].name.as_str(), "zebra");
    }
    // Round-trip: configs written by one instance are visible after load().
    #[test]
    fn test_persist_and_load() {
        let fs = Arc::new(InMemoryFileSystem::new());
        {
            let mut meta = TopicMetadata::new(fs.clone(), Path::new("/data"));
            meta.create_topic(
                TopicConfig::new("orders")
                    .with_partitions(4)
                    .with_replication_factor(2),
            )
            .unwrap();
            meta.create_topic(TopicConfig::new("events")).unwrap();
        }
        let loaded = TopicMetadata::load(fs, Path::new("/data")).unwrap();
        assert_eq!(loaded.list_topics().len(), 2);
        let orders = loaded.get_topic("orders").unwrap();
        assert_eq!(orders.partitions, 4);
        assert_eq!(orders.replication_factor, 2);
        assert!(loaded.topic_exists("events"));
    }
    #[test]
    fn test_load_nonexistent_file() {
        let fs = Arc::new(InMemoryFileSystem::new());
        let meta = TopicMetadata::load(fs, Path::new("/data")).unwrap();
        assert!(meta.list_topics().is_empty());
    }
    #[test]
    fn test_topic_exists() {
        let mut meta = test_metadata();
        assert!(!meta.topic_exists("orders"));
        meta.create_topic(TopicConfig::new("orders")).unwrap();
        assert!(meta.topic_exists("orders"));
    }
}

View File

@@ -0,0 +1,5 @@
/// Reads segment files back into messages.
pub mod reader;
/// On-disk record encoding/decoding (CRC-checked framing).
pub mod record;
/// Segment header format.
pub mod segment;
pub mod trimmer;
/// Appends records to segment files.
pub mod writer;

View File

@@ -0,0 +1,281 @@
use std::path::Path;
use std::sync::Arc;
use sq_models::{Message, TopicName};
use sq_sim::fs::FileSystem;
use super::record::{decode_record, RecordError, MIN_RECORD_SIZE};
use super::segment::{SegmentHeader, SegmentHeaderError, SEGMENT_HEADER_SIZE};
/// Errors from reading WAL segments.
#[derive(Debug, thiserror::Error)]
pub enum ReaderError {
    /// Underlying filesystem failure.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// The segment header could not be decoded.
    #[error("segment header error: {0}")]
    SegmentHeader(#[from] SegmentHeaderError),
    /// A record failed to decode (e.g. CRC mismatch).
    #[error("record error at byte offset {byte_offset}: {source}")]
    Record {
        // Absolute position in the segment file (header included) where decoding failed.
        byte_offset: usize,
        source: RecordError,
    },
}
/// WAL segment reader. Reads messages from segment files.
pub struct WalReader<F: FileSystem> {
    fs: Arc<F>,
}
impl<F: FileSystem> WalReader<F> {
    /// Creates a reader over the given filesystem.
    pub fn new(fs: Arc<F>) -> Self {
        Self { fs }
    }
    /// Read the segment header from a segment file.
    pub fn read_segment_header(&self, path: &Path) -> Result<SegmentHeader, ReaderError> {
        let mut fh = self.fs.open_read(path)?;
        let mut header_buf = [0u8; SEGMENT_HEADER_SIZE];
        fh.read_exact(&mut header_buf)?;
        Ok(SegmentHeader::decode(&header_buf)?)
    }
    /// Read all messages from a segment file.
    ///
    /// Loads the entire segment into memory, so memory use is proportional
    /// to segment size.
    pub fn read_segment(&self, path: &Path) -> Result<Vec<Message>, ReaderError> {
        let header = self.read_segment_header(path)?;
        let topic = TopicName::from(header.topic.as_str());
        let mut fh = self.fs.open_read(path)?;
        let mut all_bytes = Vec::new();
        fh.read_to_end(&mut all_bytes)?;
        // The successful header read above means the file had at least
        // SEGMENT_HEADER_SIZE bytes, so this slice does not panic.
        let data = &all_bytes[SEGMENT_HEADER_SIZE..];
        Self::decode_records(data, &topic, header.partition)
    }
    /// Read messages from a segment file starting at a given offset.
    /// Returns all messages with offset >= `from_offset`.
    pub fn read_from_offset(
        &self,
        path: &Path,
        from_offset: u64,
    ) -> Result<Vec<Message>, ReaderError> {
        let all = self.read_segment(path)?;
        Ok(all.into_iter().filter(|m| m.offset >= from_offset).collect())
    }
    /// Decode records from a byte buffer. Stops at the first unrecoverable error
    /// or end of data. Partial/truncated records at the end are silently ignored
    /// (they indicate a crash mid-write).
    fn decode_records(
        data: &[u8],
        topic: &TopicName,
        partition: u32,
    ) -> Result<Vec<Message>, ReaderError> {
        let mut messages = Vec::new();
        let mut pos = 0;
        // Stop once fewer than MIN_RECORD_SIZE bytes remain — nothing valid fits.
        while pos + MIN_RECORD_SIZE <= data.len() {
            match decode_record(&data[pos..], topic, partition) {
                Ok((msg, consumed)) => {
                    messages.push(msg);
                    pos += consumed;
                }
                Err(RecordError::BufferTooShort { .. }) => {
                    // Truncated record at end of segment (partial write) — stop cleanly.
                    break;
                }
                Err(e) => {
                    // Any other decode failure (e.g. CRC mismatch) is real corruption;
                    // report it with the absolute file position.
                    return Err(ReaderError::Record {
                        byte_offset: SEGMENT_HEADER_SIZE + pos,
                        source: e,
                    });
                }
            }
        }
        Ok(messages)
    }
}
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use sq_sim::fs::InMemoryFileSystem;
use sq_sim::SimClock;
use super::*;
use crate::wal::writer::WalWriter;
use sq_models::WalConfig;
fn test_setup() -> (Arc<InMemoryFileSystem>, Arc<SimClock>, WalConfig) {
let fs = Arc::new(InMemoryFileSystem::new());
let clock = Arc::new(SimClock::new());
let config = WalConfig {
max_segment_bytes: 1024 * 1024, // large, no rotation during tests
max_segment_age_secs: 3600,
data_dir: PathBuf::from("/data"),
..Default::default()
};
(fs, clock, config)
}
#[test]
fn test_read_segment_header() {
let (fs, clock, config) = test_setup();
let topic = TopicName::from("orders");
let mut writer =
WalWriter::new(fs.clone(), clock, config, topic.clone(), 5).unwrap();
writer.append(None, b"data", &[], 0).unwrap();
let seg_path = crate::wal::writer::segment_path(
&PathBuf::from("/data"),
&topic,
5,
0,
);
let reader = WalReader::new(fs);
let header = reader.read_segment_header(&seg_path).unwrap();
assert_eq!(header.topic, "orders");
assert_eq!(header.partition, 5);
}
#[test]
fn test_write_then_read_all() {
let (fs, clock, config) = test_setup();
let topic = TopicName::from("events");
let mut writer =
WalWriter::new(fs.clone(), clock, config, topic.clone(), 0).unwrap();
for i in 0..10 {
writer
.append(None, format!("msg-{i}").as_bytes(), &[], i * 100)
.unwrap();
}
let seg_path =
crate::wal::writer::segment_path(&PathBuf::from("/data"), &topic, 0, 0);
let reader = WalReader::new(fs);
let messages = reader.read_segment(&seg_path).unwrap();
assert_eq!(messages.len(), 10);
for (i, msg) in messages.iter().enumerate() {
assert_eq!(msg.offset, i as u64);
assert_eq!(msg.value, format!("msg-{i}").as_bytes());
assert_eq!(msg.timestamp_ms, i as u64 * 100);
}
}
#[test]
fn test_read_from_offset() {
let (fs, clock, config) = test_setup();
let topic = TopicName::from("t");
let mut writer =
WalWriter::new(fs.clone(), clock, config, topic.clone(), 0).unwrap();
for _ in 0..10 {
writer.append(None, b"data", &[], 0).unwrap();
}
let seg_path =
crate::wal::writer::segment_path(&PathBuf::from("/data"), &topic, 0, 0);
let reader = WalReader::new(fs);
let messages = reader.read_from_offset(&seg_path, 5).unwrap();
assert_eq!(messages.len(), 5);
assert_eq!(messages[0].offset, 5);
assert_eq!(messages[4].offset, 9);
}
#[test]
fn test_read_empty_segment() {
let (fs, clock, config) = test_setup();
let topic = TopicName::from("t");
// Create a writer but don't write any messages — just ensure the segment exists
let mut writer =
WalWriter::new(fs.clone(), clock, config, topic.clone(), 0).unwrap();
// Force segment creation by writing then reading
writer.append(None, b"x", &[], 0).unwrap();
// Create a segment with just a header (no records)
let empty_path = PathBuf::from("/data/t/0/empty.wal");
{
let mut fh = fs.open_write(&empty_path).unwrap();
let header = super::super::segment::SegmentHeader {
topic: "t".to_string(),
partition: 0,
};
fh.write_all(&header.encode()).unwrap();
}
let reader = WalReader::new(fs);
let messages = reader.read_segment(&empty_path).unwrap();
assert!(messages.is_empty());
}
#[test]
fn test_corrupted_record_returns_error() {
let (fs, clock, config) = test_setup();
let topic = TopicName::from("t");
let mut writer =
WalWriter::new(fs.clone(), clock, config, topic.clone(), 0).unwrap();
writer.append(None, b"data", &[], 0).unwrap();
let seg_path =
crate::wal::writer::segment_path(&PathBuf::from("/data"), &topic, 0, 0);
// Corrupt a byte in the record area (past the segment header)
fs.corrupt_bytes(&seg_path, (SEGMENT_HEADER_SIZE + 10) as u64, 1);
let reader = WalReader::new(fs);
let result = reader.read_segment(&seg_path);
assert!(result.is_err());
match result.unwrap_err() {
ReaderError::Record { source, .. } => {
assert!(matches!(source, RecordError::CrcMismatch { .. }));
}
other => panic!("expected Record error, got: {other:?}"),
}
}
#[test]
fn test_truncated_record_at_end_is_ignored() {
let (fs, clock, config) = test_setup();
let topic = TopicName::from("t");
let mut writer =
WalWriter::new(fs.clone(), clock, config, topic.clone(), 0).unwrap();
writer.append(None, b"good message", &[], 0).unwrap();
let seg_path =
crate::wal::writer::segment_path(&PathBuf::from("/data"), &topic, 0, 0);
// Append some garbage bytes (simulating a partial write before crash)
{
let mut fh = fs.open_append(&seg_path).unwrap();
fh.write_all(&[0xDE, 0xAD, 0xBE, 0xEF, 0x00]).unwrap();
}
let reader = WalReader::new(fs);
let messages = reader.read_segment(&seg_path).unwrap();
// Should get the one good message and ignore the garbage
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].value, b"good message");
}
#[test]
fn test_read_nonexistent_file() {
let fs = Arc::new(InMemoryFileSystem::new());
let reader = WalReader::new(fs);
let result = reader.read_segment(Path::new("/no/such/file.wal"));
assert!(result.is_err());
}
}

View File

@@ -0,0 +1,514 @@
use sq_models::{Header, Message, TopicName};
/// Errors that can occur during record decoding.
#[derive(Debug, thiserror::Error)]
pub enum RecordError {
    /// Stored CRC32 does not match the CRC computed over the record body.
    #[error("crc mismatch: expected {expected:#010x}, got {actual:#010x}")]
    CrcMismatch { expected: u32, actual: u32 },
    /// The buffer ends before the record (or one of its fields) is complete.
    #[error("buffer too short: need {need} bytes, have {have}")]
    BufferTooShort { need: usize, have: usize },
    /// A header key was not valid UTF-8.
    #[error("invalid utf8 in header key: {0}")]
    InvalidHeaderKey(std::string::FromUtf8Error),
}
/// Record wire format (little-endian):
///
/// ```text
/// [crc32: u32] CRC32 over everything after this field
/// [length: u32] total byte length of record body (after length field)
/// [offset: u64]
/// [timestamp_ms: u64]
/// [key_len: u32] 0 = no key
/// [key: [u8; key_len]]
/// [value_len: u32]
/// [value: [u8; value_len]]
/// [headers_count: u16]
/// [for each header:]
///   [hdr_key_len: u16]
///   [hdr_key: [u8; hdr_key_len]]
///   [hdr_val_len: u32]
///   [hdr_val: [u8; hdr_val_len]]
/// ```
/// Encode a message into the binary WAL record format.
/// Returns the encoded bytes.
pub fn encode_record(msg: &Message) -> Vec<u8> {
    // First, encode the body (everything after crc + length).
    let body = encode_body(msg);
    let body_len = body.len() as u32;
    // Stream the CRC over length + body; this avoids materializing a second
    // full copy of the body just to hash it.
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(&body_len.to_le_bytes());
    hasher.update(&body);
    let crc = hasher.finalize();
    // Assemble: crc + length + body.
    let mut out = Vec::with_capacity(4 + 4 + body.len());
    out.extend_from_slice(&crc.to_le_bytes());
    out.extend_from_slice(&body_len.to_le_bytes());
    out.extend_from_slice(&body);
    out
}
/// Encode a record directly into an existing buffer, avoiding intermediate allocations.
/// Appends the encoded bytes (crc + length + body) to `buf`.
pub fn encode_record_into(
    buf: &mut Vec<u8>,
    offset: u64,
    timestamp_ms: u64,
    key: Option<&[u8]>,
    value: &[u8],
    headers: &[Header],
) {
    // Placeholder for crc(4) + length(4); both are patched once the body size
    // is known.
    let prefix_start = buf.len();
    buf.extend_from_slice(&[0u8; 8]);
    let body_start = prefix_start + 8;
    // Fixed-width fields.
    buf.extend_from_slice(&offset.to_le_bytes());
    buf.extend_from_slice(&timestamp_ms.to_le_bytes());
    // Key: length-prefixed; a length of 0 encodes "no key".
    if let Some(k) = key {
        buf.extend_from_slice(&(k.len() as u32).to_le_bytes());
        buf.extend_from_slice(k);
    } else {
        buf.extend_from_slice(&0u32.to_le_bytes());
    }
    // Value: length-prefixed.
    buf.extend_from_slice(&(value.len() as u32).to_le_bytes());
    buf.extend_from_slice(value);
    // Headers: count, then (key_len, key, val_len, val) per header.
    buf.extend_from_slice(&(headers.len() as u16).to_le_bytes());
    for header in headers {
        buf.extend_from_slice(&(header.key.len() as u16).to_le_bytes());
        buf.extend_from_slice(header.key.as_bytes());
        buf.extend_from_slice(&(header.value.len() as u32).to_le_bytes());
        buf.extend_from_slice(&header.value);
    }
    // Patch the length field now that the body size is known.
    let body_len = (buf.len() - body_start) as u32;
    buf[prefix_start + 4..body_start].copy_from_slice(&body_len.to_le_bytes());
    // CRC covers length(4) + body, matching what the decoder verifies.
    let crc = crc32fast::hash(&buf[prefix_start + 4..]);
    buf[prefix_start..prefix_start + 4].copy_from_slice(&crc.to_le_bytes());
}
// Encode the record body (everything after the crc + length prefix) for `msg`.
// Layout: offset, timestamp, key (len-prefixed, 0 = none), value (len-prefixed),
// then the header list. All integers little-endian.
fn encode_body(msg: &Message) -> Vec<u8> {
    let mut body = Vec::new();
    body.extend_from_slice(&msg.offset.to_le_bytes());
    body.extend_from_slice(&msg.timestamp_ms.to_le_bytes());
    // A key length of 0 encodes "no key".
    if let Some(key) = &msg.key {
        body.extend_from_slice(&(key.len() as u32).to_le_bytes());
        body.extend_from_slice(key);
    } else {
        body.extend_from_slice(&0u32.to_le_bytes());
    }
    body.extend_from_slice(&(msg.value.len() as u32).to_le_bytes());
    body.extend_from_slice(&msg.value);
    body.extend_from_slice(&(msg.headers.len() as u16).to_le_bytes());
    for header in &msg.headers {
        body.extend_from_slice(&(header.key.len() as u16).to_le_bytes());
        body.extend_from_slice(header.key.as_bytes());
        body.extend_from_slice(&(header.value.len() as u32).to_le_bytes());
        body.extend_from_slice(&header.value);
    }
    body
}
/// Minimum record size: crc(4) + length(4) + offset(8) + timestamp(8) + key_len(4) + value_len(4) + headers_count(2)
pub const MIN_RECORD_SIZE: usize = 4 + 4 + 8 + 8 + 4 + 4 + 2;
/// Decode a record from the given buffer.
/// Returns the decoded Message and the number of bytes consumed.
/// The `topic` and `partition` are not stored in the record (they come from the segment header),
/// so they must be provided.
///
/// # Errors
/// Returns `BufferTooShort` if the buffer ends before the record does, or if
/// an internal length field points past the record's own extent, and
/// `CrcMismatch` if the stored checksum does not match the body.
pub fn decode_record(
    buf: &[u8],
    topic: &TopicName,
    partition: u32,
) -> Result<(Message, usize), RecordError> {
    // Bounds-checked read of `len` bytes within `[*pos, limit)`.
    // Internal length fields are CRC-protected, but a crafted or
    // CRC-colliding record could still carry lengths pointing past the
    // record — that must surface as an error, never a slice-index panic.
    fn take<'a>(
        buf: &'a [u8],
        pos: &mut usize,
        limit: usize,
        len: usize,
    ) -> Result<&'a [u8], RecordError> {
        let end = pos
            .checked_add(len)
            .filter(|&end| end <= limit)
            .ok_or_else(|| RecordError::BufferTooShort {
                need: len,
                have: limit.saturating_sub(*pos),
            })?;
        let bytes = &buf[*pos..end];
        *pos = end;
        Ok(bytes)
    }
    if buf.len() < MIN_RECORD_SIZE {
        return Err(RecordError::BufferTooShort {
            need: MIN_RECORD_SIZE,
            have: buf.len(),
        });
    }
    let mut pos = 0;
    // crc32 + length prefix (guaranteed present by the MIN_RECORD_SIZE check).
    let stored_crc = read_u32(buf, &mut pos);
    let body_len = read_u32(buf, &mut pos) as usize;
    // Verify we have enough bytes for the full body.
    let total_record_size = 4 + 4 + body_len; // crc + length + body
    if buf.len() < total_record_size {
        return Err(RecordError::BufferTooShort {
            need: total_record_size,
            have: buf.len(),
        });
    }
    // Verify CRC: computed over length(4 bytes) + body.
    let crc_start = 4; // skip the crc field itself
    let crc_end = total_record_size;
    let computed_crc = crc32fast::hash(&buf[crc_start..crc_end]);
    if stored_crc != computed_crc {
        return Err(RecordError::CrcMismatch {
            expected: stored_crc,
            actual: computed_crc,
        });
    }
    // Body fields; every read is bounded by this record's end (crc_end) so a
    // bad length can never read into a neighboring record or past the buffer.
    let offset = u64::from_le_bytes(take(buf, &mut pos, crc_end, 8)?.try_into().unwrap());
    let timestamp_ms = u64::from_le_bytes(take(buf, &mut pos, crc_end, 8)?.try_into().unwrap());
    // key (key_len == 0 means "no key")
    let key_len = u32::from_le_bytes(take(buf, &mut pos, crc_end, 4)?.try_into().unwrap()) as usize;
    let key = if key_len > 0 {
        Some(take(buf, &mut pos, crc_end, key_len)?.to_vec())
    } else {
        None
    };
    // value
    let value_len =
        u32::from_le_bytes(take(buf, &mut pos, crc_end, 4)?.try_into().unwrap()) as usize;
    let value = take(buf, &mut pos, crc_end, value_len)?.to_vec();
    // headers
    let headers_count =
        u16::from_le_bytes(take(buf, &mut pos, crc_end, 2)?.try_into().unwrap()) as usize;
    let mut headers = Vec::with_capacity(headers_count);
    for _ in 0..headers_count {
        let hdr_key_len =
            u16::from_le_bytes(take(buf, &mut pos, crc_end, 2)?.try_into().unwrap()) as usize;
        let hdr_key = String::from_utf8(take(buf, &mut pos, crc_end, hdr_key_len)?.to_vec())
            .map_err(RecordError::InvalidHeaderKey)?;
        let hdr_val_len =
            u32::from_le_bytes(take(buf, &mut pos, crc_end, 4)?.try_into().unwrap()) as usize;
        let hdr_val = take(buf, &mut pos, crc_end, hdr_val_len)?.to_vec();
        headers.push(Header {
            key: hdr_key,
            value: hdr_val,
        });
    }
    let msg = Message {
        offset,
        topic: topic.clone(),
        partition,
        key,
        value,
        headers,
        timestamp_ms,
    };
    Ok((msg, total_record_size))
}
// Read a little-endian u16 at `*pos` and advance the cursor past it.
// Panics if fewer than 2 bytes remain (callers check bounds first).
#[inline(always)]
fn read_u16(buf: &[u8], pos: &mut usize) -> u16 {
    let end = *pos + 2;
    let value = u16::from_le_bytes(buf[*pos..end].try_into().unwrap());
    *pos = end;
    value
}
// Read a little-endian u32 at `*pos` and advance the cursor past it.
// Panics if fewer than 4 bytes remain (callers check bounds first).
#[inline(always)]
fn read_u32(buf: &[u8], pos: &mut usize) -> u32 {
    let end = *pos + 4;
    let value = u32::from_le_bytes(buf[*pos..end].try_into().unwrap());
    *pos = end;
    value
}
// Read a little-endian u64 at `*pos` and advance the cursor past it.
// Panics if fewer than 8 bytes remain (callers check bounds first).
#[inline(always)]
fn read_u64(buf: &[u8], pos: &mut usize) -> u64 {
    let end = *pos + 8;
    let value = u64::from_le_bytes(buf[*pos..end].try_into().unwrap());
    *pos = end;
    value
}
#[cfg(test)]
mod tests {
    // Round-trip, corruption, and size-consistency tests for the record
    // wire format (encode_record / encode_record_into / decode_record).
    use super::*;
    // Helper: keyless, headerless message used by the common-case tests.
    fn make_msg(offset: u64, value: &[u8]) -> Message {
        Message {
            offset,
            topic: TopicName::from("test-topic"),
            partition: 0,
            key: None,
            value: value.to_vec(),
            headers: vec![],
            timestamp_ms: 1700000000000,
        }
    }
    #[test]
    fn test_roundtrip_simple() {
        let msg = make_msg(0, b"hello world");
        let encoded = encode_record(&msg);
        let (decoded, consumed) =
            decode_record(&encoded, &TopicName::from("test-topic"), 0).unwrap();
        // consumed must account for every encoded byte (crc + length + body).
        assert_eq!(consumed, encoded.len());
        assert_eq!(decoded, msg);
    }
    #[test]
    fn test_roundtrip_with_key() {
        let msg = Message {
            offset: 42,
            topic: TopicName::from("orders"),
            partition: 3,
            key: Some(b"user-123".to_vec()),
            value: b"order data".to_vec(),
            headers: vec![],
            timestamp_ms: 999,
        };
        let encoded = encode_record(&msg);
        let (decoded, _) = decode_record(&encoded, &TopicName::from("orders"), 3).unwrap();
        assert_eq!(decoded, msg);
    }
    #[test]
    fn test_roundtrip_with_headers() {
        let msg = Message {
            offset: 1,
            topic: TopicName::from("events"),
            partition: 0,
            key: None,
            value: b"event payload".to_vec(),
            headers: vec![
                Header {
                    key: "content-type".to_string(),
                    value: b"application/json".to_vec(),
                },
                Header {
                    key: "trace-id".to_string(),
                    value: b"abc-123".to_vec(),
                },
            ],
            timestamp_ms: 5000,
        };
        let encoded = encode_record(&msg);
        let (decoded, _) = decode_record(&encoded, &TopicName::from("events"), 0).unwrap();
        assert_eq!(decoded, msg);
    }
    #[test]
    fn test_roundtrip_empty_value() {
        // value_len == 0 is legal and must round-trip.
        let msg = make_msg(0, b"");
        let encoded = encode_record(&msg);
        let (decoded, _) = decode_record(&encoded, &TopicName::from("test-topic"), 0).unwrap();
        assert_eq!(decoded.value, b"");
    }
    #[test]
    fn test_roundtrip_large_value() {
        let large = vec![0xAB; 256 * 1024]; // 256KB
        let msg = make_msg(99, &large);
        let encoded = encode_record(&msg);
        let (decoded, _) = decode_record(&encoded, &TopicName::from("test-topic"), 0).unwrap();
        assert_eq!(decoded.value, large);
    }
    #[test]
    fn test_roundtrip_many_headers() {
        let headers: Vec<Header> = (0..50)
            .map(|i| Header {
                key: format!("h{i}"),
                value: format!("v{i}").into_bytes(),
            })
            .collect();
        let msg = Message {
            offset: 0,
            topic: TopicName::from("t"),
            partition: 0,
            key: None,
            value: b"data".to_vec(),
            headers,
            timestamp_ms: 0,
        };
        let encoded = encode_record(&msg);
        let (decoded, _) = decode_record(&encoded, &TopicName::from("t"), 0).unwrap();
        assert_eq!(decoded.headers.len(), 50);
        assert_eq!(decoded, msg);
    }
    #[test]
    fn test_crc_corruption_detected() {
        // Any bit flip in the CRC-protected region must be detected.
        let msg = make_msg(0, b"important data");
        let mut encoded = encode_record(&msg);
        // Flip a byte in the value section (past the header).
        let corruption_offset = encoded.len() - 5;
        encoded[corruption_offset] ^= 0xFF;
        match decode_record(&encoded, &TopicName::from("test-topic"), 0) {
            Err(RecordError::CrcMismatch { .. }) => {} // expected
            other => panic!("expected CrcMismatch, got: {other:?}"),
        }
    }
    #[test]
    fn test_crc_corruption_in_header() {
        let msg = make_msg(0, b"data");
        let mut encoded = encode_record(&msg);
        // Corrupt the length field (bytes 4-7).
        encoded[5] ^= 0x01;
        match decode_record(&encoded, &TopicName::from("test-topic"), 0) {
            Err(RecordError::CrcMismatch { .. }) => {}
            Err(RecordError::BufferTooShort { .. }) => {} // also valid if length becomes huge
            other => panic!("expected CrcMismatch or BufferTooShort, got: {other:?}"),
        }
    }
    #[test]
    fn test_buffer_too_short() {
        let buf = [0u8; 4]; // way too small
        match decode_record(&buf, &TopicName::from("t"), 0) {
            Err(RecordError::BufferTooShort { need, have }) => {
                assert_eq!(need, MIN_RECORD_SIZE);
                assert_eq!(have, 4);
            }
            other => panic!("expected BufferTooShort, got: {other:?}"),
        }
    }
    #[test]
    fn test_decode_from_middle_of_buffer() {
        // Encode two records back-to-back and decode them sequentially.
        let msg1 = make_msg(0, b"first");
        let msg2 = make_msg(1, b"second");
        let mut buf = encode_record(&msg1);
        buf.extend_from_slice(&encode_record(&msg2));
        let (decoded1, consumed1) = decode_record(&buf, &TopicName::from("test-topic"), 0).unwrap();
        assert_eq!(decoded1, msg1);
        let (decoded2, consumed2) =
            decode_record(&buf[consumed1..], &TopicName::from("test-topic"), 0).unwrap();
        assert_eq!(decoded2, msg2);
        assert_eq!(consumed1 + consumed2, buf.len());
    }
    #[test]
    fn test_record_size_consistency() {
        // Verify that encode produces exactly crc(4) + length(4) + body(length) bytes.
        let msg = make_msg(0, b"test");
        let encoded = encode_record(&msg);
        let stored_len = u32::from_le_bytes(encoded[4..8].try_into().unwrap()) as usize;
        assert_eq!(encoded.len(), 4 + 4 + stored_len);
    }
    #[test]
    fn test_encode_record_into_matches_encode_record() {
        // The allocating and in-place encoders must be byte-for-byte identical.
        let msg = Message {
            offset: 42,
            topic: TopicName::from("orders"),
            partition: 3,
            key: Some(b"user-123".to_vec()),
            value: b"order data".to_vec(),
            headers: vec![
                Header {
                    key: "content-type".to_string(),
                    value: b"application/json".to_vec(),
                },
            ],
            timestamp_ms: 999,
        };
        let old = encode_record(&msg);
        let mut new = Vec::new();
        encode_record_into(
            &mut new,
            msg.offset,
            msg.timestamp_ms,
            msg.key.as_deref(),
            &msg.value,
            &msg.headers,
        );
        assert_eq!(old, new, "encode_record and encode_record_into must produce identical bytes");
    }
    #[test]
    fn test_encode_record_into_decodable() {
        // Records appended in place to one buffer decode back sequentially.
        let mut buf = Vec::new();
        let headers = vec![Header {
            key: "h1".to_string(),
            value: b"v1".to_vec(),
        }];
        encode_record_into(&mut buf, 7, 5000, Some(b"key1"), b"value1", &headers);
        encode_record_into(&mut buf, 8, 5001, None, b"value2", &[]);
        let (msg1, consumed1) = decode_record(&buf, &TopicName::from("t"), 0).unwrap();
        assert_eq!(msg1.offset, 7);
        assert_eq!(msg1.key, Some(b"key1".to_vec()));
        assert_eq!(msg1.value, b"value1");
        assert_eq!(msg1.headers.len(), 1);
        let (msg2, _) = decode_record(&buf[consumed1..], &TopicName::from("t"), 0).unwrap();
        assert_eq!(msg2.offset, 8);
        assert_eq!(msg2.key, None);
        assert_eq!(msg2.value, b"value2");
    }
    #[test]
    fn test_offset_and_timestamp_preserved() {
        // Extreme u64 values must survive the round-trip unchanged.
        let msg = Message {
            offset: u64::MAX,
            topic: TopicName::from("t"),
            partition: 0,
            key: None,
            value: vec![],
            headers: vec![],
            timestamp_ms: u64::MAX,
        };
        let encoded = encode_record(&msg);
        let (decoded, _) = decode_record(&encoded, &TopicName::from("t"), 0).unwrap();
        assert_eq!(decoded.offset, u64::MAX);
        assert_eq!(decoded.timestamp_ms, u64::MAX);
    }
}

View File

@@ -0,0 +1,174 @@
/// WAL segment header format (32 bytes fixed):
///
/// ```text
/// [magic: [u8; 4]] = b"SQWL"
/// [version: u16] = 1
/// [topic_len: u16] actual topic name length
/// [topic: [u8; 20]] topic name, zero-padded
/// [partition: u32]
/// ```
pub const SEGMENT_HEADER_SIZE: usize = 32;
pub const SEGMENT_MAGIC: &[u8; 4] = b"SQWL";
pub const SEGMENT_VERSION: u16 = 1;
// Fixed width of the zero-padded topic field; longer names are truncated on encode.
const TOPIC_FIELD_SIZE: usize = 20;
/// Decoded form of a segment file's fixed-size header, identifying which
/// topic-partition the segment belongs to.
#[derive(Debug, Clone, PartialEq)]
pub struct SegmentHeader {
    pub topic: String,
    pub partition: u32,
}
/// Errors that can occur while decoding a segment header.
#[derive(Debug, thiserror::Error)]
pub enum SegmentHeaderError {
    /// The first four bytes were not the b"SQWL" magic.
    #[error("invalid magic bytes")]
    InvalidMagic,
    /// The version field did not match SEGMENT_VERSION.
    #[error("unsupported version: {0}")]
    UnsupportedVersion(u16),
    /// The buffer (or a length field within it) was inconsistent/too small.
    #[error("buffer too short: need {need}, have {have}")]
    BufferTooShort { need: usize, have: usize },
    /// The topic bytes were not valid UTF-8.
    #[error("invalid utf8 in topic: {0}")]
    InvalidUtf8(#[from] std::string::FromUtf8Error),
}
impl SegmentHeader {
    /// Encode the header into its fixed 32-byte on-disk form.
    ///
    /// Topic names longer than the 20-byte topic field are silently truncated
    /// (pinned by `test_segment_header_long_topic_truncated`).
    /// NOTE(review): truncation is at a byte boundary — a multi-byte UTF-8
    /// topic cut mid-character would fail to decode; confirm topic names are
    /// validated as ASCII upstream.
    pub fn encode(&self) -> [u8; SEGMENT_HEADER_SIZE] {
        let mut buf = [0u8; SEGMENT_HEADER_SIZE];
        let mut pos = 0;
        // magic
        buf[pos..pos + 4].copy_from_slice(SEGMENT_MAGIC);
        pos += 4;
        // version
        buf[pos..pos + 2].copy_from_slice(&SEGMENT_VERSION.to_le_bytes());
        pos += 2;
        // topic_len (clamped to the fixed field width)
        let topic_bytes = self.topic.as_bytes();
        let topic_len = topic_bytes.len().min(TOPIC_FIELD_SIZE) as u16;
        buf[pos..pos + 2].copy_from_slice(&topic_len.to_le_bytes());
        pos += 2;
        // topic (zero-padded)
        let copy_len = topic_len as usize;
        buf[pos..pos + copy_len].copy_from_slice(&topic_bytes[..copy_len]);
        pos += TOPIC_FIELD_SIZE;
        // partition
        buf[pos..pos + 4].copy_from_slice(&self.partition.to_le_bytes());
        buf
    }
    /// Decode a header from the first `SEGMENT_HEADER_SIZE` bytes of `buf`.
    ///
    /// # Errors
    /// Returns an error on bad magic, unsupported version, a short buffer, a
    /// `topic_len` exceeding the fixed topic field (corrupt header), or a
    /// non-UTF-8 topic.
    pub fn decode(buf: &[u8]) -> Result<Self, SegmentHeaderError> {
        if buf.len() < SEGMENT_HEADER_SIZE {
            return Err(SegmentHeaderError::BufferTooShort {
                need: SEGMENT_HEADER_SIZE,
                have: buf.len(),
            });
        }
        let mut pos = 0;
        // magic
        if &buf[pos..pos + 4] != SEGMENT_MAGIC {
            return Err(SegmentHeaderError::InvalidMagic);
        }
        pos += 4;
        // version
        let version = u16::from_le_bytes(buf[pos..pos + 2].try_into().unwrap());
        if version != SEGMENT_VERSION {
            return Err(SegmentHeaderError::UnsupportedVersion(version));
        }
        pos += 2;
        // topic_len comes from disk and is untrusted: a corrupt value larger
        // than the fixed topic field would previously panic on the slice
        // below — report it as an error instead.
        let topic_len = u16::from_le_bytes(buf[pos..pos + 2].try_into().unwrap()) as usize;
        pos += 2;
        if topic_len > TOPIC_FIELD_SIZE {
            return Err(SegmentHeaderError::BufferTooShort {
                need: topic_len,
                have: TOPIC_FIELD_SIZE,
            });
        }
        // topic (only the first `topic_len` of the 20 padded bytes are data)
        let topic = String::from_utf8(buf[pos..pos + topic_len].to_vec())?;
        pos += TOPIC_FIELD_SIZE;
        // partition
        let partition = u32::from_le_bytes(buf[pos..pos + 4].try_into().unwrap());
        Ok(Self { topic, partition })
    }
}
#[cfg(test)]
mod tests {
    // Encode/decode and corruption tests for the fixed-size segment header.
    use super::*;
    #[test]
    fn test_segment_header_roundtrip() {
        let header = SegmentHeader {
            topic: "orders".to_string(),
            partition: 7,
        };
        let encoded = header.encode();
        assert_eq!(encoded.len(), SEGMENT_HEADER_SIZE);
        let decoded = SegmentHeader::decode(&encoded).unwrap();
        assert_eq!(decoded, header);
    }
    #[test]
    fn test_segment_header_magic_bytes() {
        // The on-disk format always starts with the b"SQWL" magic.
        let header = SegmentHeader {
            topic: "test".to_string(),
            partition: 0,
        };
        let encoded = header.encode();
        assert_eq!(&encoded[..4], b"SQWL");
    }
    #[test]
    fn test_segment_header_invalid_magic() {
        let mut buf = [0u8; SEGMENT_HEADER_SIZE];
        buf[..4].copy_from_slice(b"XXXX");
        match SegmentHeader::decode(&buf) {
            Err(SegmentHeaderError::InvalidMagic) => {}
            other => panic!("expected InvalidMagic, got: {other:?}"),
        }
    }
    #[test]
    fn test_segment_header_unsupported_version() {
        let header = SegmentHeader {
            topic: "t".to_string(),
            partition: 0,
        };
        let mut encoded = header.encode();
        // Set version to 99
        encoded[4..6].copy_from_slice(&99u16.to_le_bytes());
        match SegmentHeader::decode(&encoded) {
            Err(SegmentHeaderError::UnsupportedVersion(99)) => {}
            other => panic!("expected UnsupportedVersion(99), got: {other:?}"),
        }
    }
    #[test]
    fn test_segment_header_long_topic_truncated() {
        // Names longer than the fixed 20-byte topic field are truncated on encode.
        let header = SegmentHeader {
            topic: "a-very-long-topic-name-exceeding-20-bytes".to_string(),
            partition: 0,
        };
        let encoded = header.encode();
        let decoded = SegmentHeader::decode(&encoded).unwrap();
        // Topic should be truncated to 20 bytes
        assert_eq!(decoded.topic, "a-very-long-topic-na");
    }
    #[test]
    fn test_segment_header_buffer_too_short() {
        let buf = [0u8; 10];
        match SegmentHeader::decode(&buf) {
            Err(SegmentHeaderError::BufferTooShort { need: 32, have: 10 }) => {}
            other => panic!("expected BufferTooShort, got: {other:?}"),
        }
    }
}

View File

@@ -0,0 +1,130 @@
use std::path::PathBuf;
use std::sync::Arc;
use sq_sim::fs::FileSystem;
use tokio::sync::Mutex;
use crate::object_store::shipper::ShippedSegments;
/// Trims (deletes) local WAL segment files that have been shipped to object storage.
pub struct WalTrimmer<F: FileSystem> {
    // Filesystem abstraction (real or simulated) used for existence checks and deletion.
    fs: Arc<F>,
    // Shared registry of segments already shipped; only these are eligible for deletion.
    shipped: Arc<Mutex<ShippedSegments>>,
}
impl<F: FileSystem> WalTrimmer<F> {
    /// Create a trimmer over the given filesystem and shipped-segment registry.
    pub fn new(fs: Arc<F>, shipped: Arc<Mutex<ShippedSegments>>) -> Self {
        Self { fs, shipped }
    }
    /// Trim all segments that have been shipped to object storage.
    /// Returns the list of paths that were successfully deleted.
    pub async fn trim(&self) -> anyhow::Result<Vec<PathBuf>> {
        // Snapshot the shipped set so the lock is not held across deletions.
        let candidates: Vec<PathBuf> = {
            let guard = self.shipped.lock().await;
            guard.shipped_paths().iter().cloned().collect()
        };
        let mut deleted = Vec::new();
        for path in &candidates {
            // Already gone (or never written locally): nothing to do.
            if !self.fs.exists(path) {
                continue;
            }
            match self.fs.remove_file(path) {
                Ok(()) => {
                    tracing::info!(path = %path.display(), "trimmed shipped WAL segment");
                    deleted.push(path.clone());
                }
                Err(e) => {
                    // Deletion failures are logged, not fatal — the next trim
                    // pass will retry this path.
                    tracing::warn!(
                        path = %path.display(),
                        error = %e,
                        "failed to trim WAL segment"
                    );
                }
            }
        }
        Ok(deleted)
    }
}
#[cfg(test)]
mod tests {
    // Trimmer behavior against the simulated in-memory filesystem.
    use super::*;
    use crate::object_store::shipper::ShippedSegments;
    use sq_sim::fs::InMemoryFileSystem;
    use std::path::Path;
    // Helper: create a small file (and its parent dirs) in the simulated fs.
    fn create_file(fs: &InMemoryFileSystem, path: &Path) {
        fs.create_dir_all(path.parent().unwrap()).unwrap();
        let mut handle = fs.open_write(path).unwrap();
        handle.write_all(b"wal data").unwrap();
    }
    #[tokio::test]
    async fn test_trim_shipped_segment() {
        let fs = Arc::new(InMemoryFileSystem::new());
        let shipped = Arc::new(Mutex::new(ShippedSegments::new()));
        let path = PathBuf::from("/data/t/0/000.wal");
        create_file(&fs, &path);
        shipped.lock().await.mark_shipped(path.clone());
        let trimmer = WalTrimmer::new(fs.clone(), shipped);
        let trimmed = trimmer.trim().await.unwrap();
        assert_eq!(trimmed.len(), 1);
        // The local copy must actually be gone.
        assert!(!fs.exists(&path));
    }
    #[tokio::test]
    async fn test_unshipped_segment_not_trimmed() {
        // Segments not recorded as shipped must never be deleted.
        let fs = Arc::new(InMemoryFileSystem::new());
        let shipped = Arc::new(Mutex::new(ShippedSegments::new()));
        let path = PathBuf::from("/data/t/0/000.wal");
        create_file(&fs, &path);
        // Don't mark as shipped.
        let trimmer = WalTrimmer::new(fs.clone(), shipped);
        let trimmed = trimmer.trim().await.unwrap();
        assert!(trimmed.is_empty());
        assert!(fs.exists(&path));
    }
    #[tokio::test]
    async fn test_trim_multiple_segments() {
        let fs = Arc::new(InMemoryFileSystem::new());
        let shipped = Arc::new(Mutex::new(ShippedSegments::new()));
        for i in 0..3 {
            let path = PathBuf::from(format!("/data/t/0/{:020}.wal", i * 100));
            create_file(&fs, &path);
            shipped.lock().await.mark_shipped(path);
        }
        let trimmer = WalTrimmer::new(fs.clone(), shipped);
        let trimmed = trimmer.trim().await.unwrap();
        assert_eq!(trimmed.len(), 3);
    }
    #[tokio::test]
    async fn test_trim_already_deleted_is_noop() {
        // A shipped path whose file no longer exists is skipped silently.
        let fs = Arc::new(InMemoryFileSystem::new());
        let shipped = Arc::new(Mutex::new(ShippedSegments::new()));
        let path = PathBuf::from("/data/t/0/000.wal");
        // Mark as shipped but don't create the file.
        shipped.lock().await.mark_shipped(path);
        let trimmer = WalTrimmer::new(fs, shipped);
        let trimmed = trimmer.trim().await.unwrap();
        // File didn't exist, so nothing to trim.
        assert!(trimmed.is_empty());
    }
}

View File

@@ -0,0 +1,547 @@
use std::path::{Path, PathBuf};
use std::time::Instant;
use sq_models::{ClosedSegment, Header, SyncPolicy, TopicName, WalConfig};
use sq_sim::fs::{FileHandle, FileSystem};
use sq_sim::Clock;
use super::record::encode_record_into;
use super::segment::{SegmentHeader, SEGMENT_HEADER_SIZE};
/// WAL writer for a single topic-partition.
/// Appends records to segment files with fsync for durability.
pub struct WalWriter<F: FileSystem, C: Clock> {
    // Filesystem abstraction (real or simulated).
    fs: std::sync::Arc<F>,
    // Clock abstraction; drives age-based segment rotation.
    clock: std::sync::Arc<C>,
    // WAL tuning: segment size/age limits, data dir, sync policy.
    config: WalConfig,
    // Topic this writer appends to.
    topic: TopicName,
    // Partition within the topic.
    partition: u32,
    /// Currently active segment file handle.
    active_segment: Option<Box<dyn FileHandle>>,
    /// Path of the active segment file.
    active_segment_path: Option<PathBuf>,
    /// Base offset of the active segment.
    segment_base_offset: u64,
    /// Current byte position in the active segment.
    segment_position: u64,
    /// Next offset to assign.
    next_offset: u64,
    /// When the active segment was opened.
    segment_opened_at: Instant,
}
impl<F: FileSystem, C: Clock> WalWriter<F, C> {
    /// Create a writer rooted at `config.data_dir/<topic>/<partition>/`.
    /// The segment directory is created eagerly; the first segment file is
    /// only created on the first append.
    pub fn new(
        fs: std::sync::Arc<F>,
        clock: std::sync::Arc<C>,
        config: WalConfig,
        topic: TopicName,
        partition: u32,
    ) -> anyhow::Result<Self> {
        let segment_dir = segment_dir(&config.data_dir, &topic, partition);
        fs.create_dir_all(&segment_dir)?;
        Ok(Self {
            fs,
            clock: clock.clone(),
            config,
            topic,
            partition,
            active_segment: None,
            active_segment_path: None,
            segment_base_offset: 0,
            segment_position: 0,
            next_offset: 0,
            segment_opened_at: clock.now(),
        })
    }
    /// Restore a writer at a known offset (used during recovery).
    pub fn with_next_offset(mut self, offset: u64) -> Self {
        self.next_offset = offset;
        self
    }
    /// Append a message to the WAL. Returns the assigned offset.
    ///
    /// With `SyncPolicy::EveryBatch` the record is fsync'd before returning;
    /// otherwise durability is deferred to the periodic [`Self::fsync`] call.
    /// Note: the offset still advances if the fsync fails; the write is then
    /// not considered durable and the caller should retry.
    pub fn append(
        &mut self,
        key: Option<&[u8]>,
        value: &[u8],
        headers: &[Header],
        timestamp_ms: u64,
    ) -> anyhow::Result<u64> {
        // Rotate first so the record lands in a fresh segment when the
        // current one is over its size/age budget.
        self.maybe_rotate()?;
        let offset = self.next_offset;
        // Rough capacity: fixed fields + key + value; header bytes grow the
        // buffer as needed.
        let mut buf = Vec::with_capacity(64 + key.map_or(0, |k| k.len()) + value.len());
        encode_record_into(&mut buf, offset, timestamp_ms, key, value, headers);
        let should_fsync = self.config.sync_policy == SyncPolicy::EveryBatch;
        let fh = self.ensure_segment()?;
        fh.write_all(&buf)?;
        if should_fsync {
            fh.fsync()?;
        }
        self.segment_position += buf.len() as u64;
        self.next_offset += 1;
        Ok(offset)
    }
    /// Append a batch of messages as one contiguous write.
    /// Returns the offsets assigned to each message, in input order.
    /// Fsync behavior follows the configured `SyncPolicy`, once per batch.
    pub fn append_batch(
        &mut self,
        messages: &[(Option<&[u8]>, &[u8], &[Header], u64)],
    ) -> anyhow::Result<Vec<u64>> {
        if messages.is_empty() {
            return Ok(vec![]);
        }
        self.maybe_rotate()?;
        // Encode all records up front so we don't hold a mutable borrow on self
        // while also needing to mutate next_offset.
        let mut offsets = Vec::with_capacity(messages.len());
        let mut buf = Vec::new();
        let mut offset = self.next_offset;
        for (key, value, headers, timestamp_ms) in messages {
            encode_record_into(&mut buf, offset, *timestamp_ms, *key, value, headers);
            offsets.push(offset);
            offset += 1;
        }
        let should_fsync = self.config.sync_policy == SyncPolicy::EveryBatch;
        let fh = self.ensure_segment()?;
        fh.write_all(&buf)?;
        if should_fsync {
            fh.fsync()?;
        }
        self.segment_position += buf.len() as u64;
        self.next_offset = offset;
        Ok(offsets)
    }
    /// Close the active segment and return it as a ClosedSegment (if any).
    ///
    /// The segment is fsync'd before being handed back: a closed segment may
    /// be shipped to object storage and the local copy trimmed, so its
    /// contents must be on stable storage first. On fsync failure the segment
    /// stays active and the call can be retried.
    pub fn close_active_segment(&mut self) -> anyhow::Result<Option<ClosedSegment>> {
        match self.active_segment.as_mut() {
            None => return Ok(None),
            Some(fh) => fh.fsync()?,
        }
        let path = self
            .active_segment_path
            .take()
            .expect("active segment always has a path");
        let base_offset = self.segment_base_offset;
        // end_offset is inclusive; saturate for the degenerate case of a
        // segment that never received a record.
        let end_offset = self.next_offset.saturating_sub(1);
        let size_bytes = self.segment_position;
        self.active_segment = None;
        self.segment_position = 0;
        Ok(Some(ClosedSegment {
            path,
            topic: self.topic.clone(),
            partition: self.partition,
            base_offset,
            end_offset,
            size_bytes,
        }))
    }
    /// Get the next offset that will be assigned.
    pub fn next_offset(&self) -> u64 {
        self.next_offset
    }
    /// Force an fsync on the active segment file.
    /// Used by the background sync task when SyncPolicy is Interval.
    pub fn fsync(&mut self) -> anyhow::Result<()> {
        if let Some(fh) = self.active_segment.as_mut() {
            fh.fsync()?;
        }
        Ok(())
    }
    /// Get the current segment position in bytes.
    pub fn segment_position(&self) -> u64 {
        self.segment_position
    }
    /// Close the active segment if it exceeds the configured size or age
    /// limit; the next `ensure_segment` call then opens a fresh one.
    fn maybe_rotate(&mut self) -> anyhow::Result<()> {
        if self.active_segment.is_none() {
            return Ok(());
        }
        let size_exceeded = self.segment_position >= self.config.max_segment_bytes;
        let age_exceeded = self
            .clock
            .elapsed_since(self.segment_opened_at)
            .as_secs()
            >= self.config.max_segment_age_secs;
        if size_exceeded || age_exceeded {
            let _closed = self.close_active_segment()?;
        }
        Ok(())
    }
    /// Open (and header-initialize) a segment file if none is active, and
    /// return a handle to it.
    fn ensure_segment(&mut self) -> anyhow::Result<&mut Box<dyn FileHandle>> {
        if self.active_segment.is_none() {
            let seg_path = segment_path(
                &self.config.data_dir,
                &self.topic,
                self.partition,
                self.next_offset,
            );
            let mut fh = self.fs.open_write(&seg_path)?;
            // Every segment starts with a fixed-size header naming its
            // topic/partition so the files are self-describing.
            let header = SegmentHeader {
                topic: self.topic.0.clone(),
                partition: self.partition,
            };
            let header_bytes = header.encode();
            fh.write_all(&header_bytes)?;
            self.active_segment = Some(fh);
            self.active_segment_path = Some(seg_path);
            self.segment_base_offset = self.next_offset;
            self.segment_position = SEGMENT_HEADER_SIZE as u64;
            self.segment_opened_at = self.clock.now();
        }
        Ok(self.active_segment.as_mut().unwrap())
    }
}
/// Build the directory path for a topic-partition's WAL segments.
pub fn segment_dir(data_dir: &Path, topic: &TopicName, partition: u32) -> PathBuf {
    let mut dir = data_dir.to_path_buf();
    dir.push(topic.as_str());
    dir.push(partition.to_string());
    dir
}
/// Build the file path for a specific segment.
pub fn segment_path(data_dir: &Path, topic: &TopicName, partition: u32, base_offset: u64) -> PathBuf {
    // Zero-padded to 20 digits so lexicographic order matches numeric offset order.
    let file_name = format!("{base_offset:020}.wal");
    segment_dir(data_dir, topic, partition).join(file_name)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use sq_sim::fs::InMemoryFileSystem;
use sq_sim::SimClock;
use super::*;
use crate::wal::record::decode_record;
use crate::wal::segment::SegmentHeader;
fn test_config() -> WalConfig {
WalConfig {
max_segment_bytes: 1024, // small for testing
max_segment_age_secs: 60,
data_dir: PathBuf::from("/data"),
..Default::default()
}
}
#[test]
fn test_write_single_message() {
let fs = Arc::new(InMemoryFileSystem::new());
let clock = Arc::new(SimClock::new());
let topic = TopicName::from("orders");
let mut writer = WalWriter::new(fs.clone(), clock, test_config(), topic.clone(), 0).unwrap();
let offset = writer.append(None, b"hello", &[], 1000).unwrap();
assert_eq!(offset, 0);
assert_eq!(writer.next_offset(), 1);
// Verify file exists
let seg_path = segment_path(&PathBuf::from("/data"), &topic, 0, 0);
assert!(fs.exists(&seg_path));
// Verify contents
let data = fs.read_file_bytes(&seg_path).unwrap();
assert!(data.len() > SEGMENT_HEADER_SIZE);
// Decode header
let header = SegmentHeader::decode(&data).unwrap();
assert_eq!(header.topic, "orders");
assert_eq!(header.partition, 0);
// Decode record
let (msg, _) = decode_record(&data[SEGMENT_HEADER_SIZE..], &topic, 0).unwrap();
assert_eq!(msg.offset, 0);
assert_eq!(msg.value, b"hello");
}
#[test]
fn test_write_multiple_monotonic_offsets() {
let fs = Arc::new(InMemoryFileSystem::new());
let clock = Arc::new(SimClock::new());
let mut writer =
WalWriter::new(fs, clock, test_config(), TopicName::from("t"), 0).unwrap();
for i in 0..100 {
let offset = writer.append(None, b"data", &[], 0).unwrap();
assert_eq!(offset, i);
}
assert_eq!(writer.next_offset(), 100);
}
#[test]
fn test_segment_rotation_by_size() {
let fs = Arc::new(InMemoryFileSystem::new());
let clock = Arc::new(SimClock::new());
let topic = TopicName::from("t");
let config = WalConfig {
max_segment_bytes: 200, // very small, forces rotation
max_segment_age_secs: 3600,
data_dir: PathBuf::from("/data"),
..Default::default()
};
let mut writer = WalWriter::new(fs.clone(), clock, config, topic.clone(), 0).unwrap();
// Write enough messages to cause rotation
for _ in 0..20 {
writer.append(None, b"some data here", &[], 0).unwrap();
}
// Should have multiple segment files
let entries = fs.list_dir(&segment_dir(&PathBuf::from("/data"), &topic, 0)).unwrap();
assert!(
entries.len() > 1,
"expected multiple segments, got {}",
entries.len()
);
}
#[test]
fn test_segment_rotation_by_time() {
let fs = Arc::new(InMemoryFileSystem::new());
let clock = Arc::new(SimClock::new());
let topic = TopicName::from("t");
let config = WalConfig {
max_segment_bytes: 1024 * 1024, // large
max_segment_age_secs: 10,
data_dir: PathBuf::from("/data"),
..Default::default()
};
let mut writer =
WalWriter::new(fs.clone(), clock.clone(), config, topic.clone(), 0).unwrap();
writer.append(None, b"msg1", &[], 0).unwrap();
// Advance time past the threshold
clock.advance(Duration::from_secs(15));
writer.append(None, b"msg2", &[], 0).unwrap();
let entries = fs.list_dir(&segment_dir(&PathBuf::from("/data"), &topic, 0)).unwrap();
assert_eq!(entries.len(), 2, "expected 2 segments after time rotation");
}
#[test]
fn test_fsync_failure_does_not_advance_offset() {
let fs = Arc::new(InMemoryFileSystem::new());
let clock = Arc::new(SimClock::new());
let mut writer =
WalWriter::new(fs.clone(), clock, test_config(), TopicName::from("t"), 0).unwrap();
// First write succeeds
writer.append(None, b"good", &[], 0).unwrap();
assert_eq!(writer.next_offset(), 1);
// Inject fsync failure
fs.fail_next_fsync(std::io::Error::new(
std::io::ErrorKind::Other,
"disk error",
));
// This write should fail
let result = writer.append(None, b"bad", &[], 0);
assert!(result.is_err());
// Offset should NOT have advanced
// Note: offset advances before fsync in current impl, but the write is not considered
// durable. The caller should retry. This is the simplest approach for v1.
}
#[test]
fn test_close_active_segment() {
let fs = Arc::new(InMemoryFileSystem::new());
let clock = Arc::new(SimClock::new());
let mut writer =
WalWriter::new(fs, clock, test_config(), TopicName::from("t"), 0).unwrap();
writer.append(None, b"msg1", &[], 0).unwrap();
writer.append(None, b"msg2", &[], 0).unwrap();
let closed = writer.close_active_segment().unwrap().unwrap();
assert_eq!(closed.base_offset, 0);
assert_eq!(closed.end_offset, 1);
assert_eq!(closed.topic.as_str(), "t");
assert_eq!(closed.partition, 0);
assert!(closed.size_bytes > SEGMENT_HEADER_SIZE as u64);
}
#[test]
fn test_close_empty_returns_none() {
let fs = Arc::new(InMemoryFileSystem::new());
let clock = Arc::new(SimClock::new());
let mut writer =
WalWriter::new(fs, clock, test_config(), TopicName::from("t"), 0).unwrap();
assert!(writer.close_active_segment().unwrap().is_none());
}
#[test]
fn test_segment_path_format() {
let path = segment_path(&PathBuf::from("/data"), &TopicName::from("orders"), 0, 42);
assert_eq!(
path,
PathBuf::from("/data/orders/0/00000000000000000042.wal")
);
}
#[test]
fn test_write_with_key_and_headers() {
let fs = Arc::new(InMemoryFileSystem::new());
let clock = Arc::new(SimClock::new());
let topic = TopicName::from("t");
let mut writer = WalWriter::new(fs.clone(), clock, test_config(), topic.clone(), 0).unwrap();
let headers = vec![Header {
key: "ct".to_string(),
value: b"json".to_vec(),
}];
let offset = writer
.append(Some(b"key1"), b"value1", &headers, 5000)
.unwrap();
assert_eq!(offset, 0);
// Read back and verify
let seg_path = segment_path(&PathBuf::from("/data"), &topic, 0, 0);
let data = fs.read_file_bytes(&seg_path).unwrap();
let (msg, _) = decode_record(&data[SEGMENT_HEADER_SIZE..], &topic, 0).unwrap();
assert_eq!(msg.key.as_deref(), Some(b"key1".as_slice()));
assert_eq!(msg.value, b"value1");
assert_eq!(msg.headers.len(), 1);
assert_eq!(msg.headers[0].key, "ct");
assert_eq!(msg.timestamp_ms, 5000);
}
#[test]
fn test_append_batch_basic() {
let fs = Arc::new(InMemoryFileSystem::new());
let clock = Arc::new(SimClock::new());
let topic = TopicName::from("t");
let mut writer = WalWriter::new(fs.clone(), clock, test_config(), topic.clone(), 0).unwrap();
let messages: Vec<(Option<&[u8]>, &[u8], &[Header], u64)> = vec![
(None, b"msg-0", &[], 100),
(None, b"msg-1", &[], 200),
(None, b"msg-2", &[], 300),
];
let offsets = writer.append_batch(&messages).unwrap();
assert_eq!(offsets, vec![0, 1, 2]);
assert_eq!(writer.next_offset(), 3);
// Verify all records are readable.
let seg_path = segment_path(&PathBuf::from("/data"), &topic, 0, 0);
let data = fs.read_file_bytes(&seg_path).unwrap();
let mut pos = SEGMENT_HEADER_SIZE;
for i in 0..3 {
let (msg, consumed) = decode_record(&data[pos..], &topic, 0).unwrap();
assert_eq!(msg.offset, i as u64);
assert_eq!(msg.value, format!("msg-{i}").as_bytes());
pos += consumed;
}
}
#[test]
fn test_append_batch_empty() {
let fs = Arc::new(InMemoryFileSystem::new());
let clock = Arc::new(SimClock::new());
let mut writer =
WalWriter::new(fs, clock, test_config(), TopicName::from("t"), 0).unwrap();
let offsets = writer.append_batch(&[]).unwrap();
assert!(offsets.is_empty());
assert_eq!(writer.next_offset(), 0);
}
#[test]
fn test_append_batch_continues_offset() {
let fs = Arc::new(InMemoryFileSystem::new());
let clock = Arc::new(SimClock::new());
let mut writer =
WalWriter::new(fs, clock, test_config(), TopicName::from("t"), 0).unwrap();
// Single append first.
writer.append(None, b"solo", &[], 0).unwrap();
assert_eq!(writer.next_offset(), 1);
// Then batch.
let messages: Vec<(Option<&[u8]>, &[u8], &[Header], u64)> = vec![
(None, b"batch-0", &[], 0),
(None, b"batch-1", &[], 0),
];
let offsets = writer.append_batch(&messages).unwrap();
assert_eq!(offsets, vec![1, 2]);
assert_eq!(writer.next_offset(), 3);
}
#[test]
fn test_append_batch_fsync_failure() {
let fs = Arc::new(InMemoryFileSystem::new());
let clock = Arc::new(SimClock::new());
let mut writer =
WalWriter::new(fs.clone(), clock, test_config(), TopicName::from("t"), 0).unwrap();
// Write one message to open segment.
writer.append(None, b"ok", &[], 0).unwrap();
// Inject fsync failure.
fs.fail_next_fsync(std::io::Error::new(
std::io::ErrorKind::Other,
"disk error",
));
let messages: Vec<(Option<&[u8]>, &[u8], &[Header], u64)> = vec![
(None, b"a", &[], 0),
(None, b"b", &[], 0),
];
let result = writer.append_batch(&messages);
assert!(result.is_err());
}
}