feat: add initial

Signed-off-by: kjuulh <contact@kjuulh.io>
2026-02-26 21:52:50 +01:00
commit 3162971c89
48 changed files with 3041 additions and 0 deletions

# SQ-000: Workspace Skeleton
**Status:** `[x] DONE`
**Blocked by:** None
**Priority:** Critical (everything depends on this)
## Description
Bootstrap the entire SQ workspace: Cargo workspace, all crate stubs, buf protobuf config, dev tooling, and gitignore. Every crate should compile with an empty `lib.rs` or `main.rs`.
## Files to Create
### Root
- `Cargo.toml` - workspace root with all members and shared dependencies
- `Cargo.lock` - (generated)
- `buf.yaml` - buf module definition (`buf.build/rawpotion/sq`)
- `buf.gen.yaml` - prost + tonic codegen config
- `mise.toml` - dev tasks (build, test, generate:proto, local:up/down)
- `.gitignore` - target/, .env, etc.
- `.env` - local dev environment vars
### Proto
- `interface/proto/sq/v1/health.proto` - minimal Status service
### Crate stubs
- `crates/sq-grpc-interface/Cargo.toml` + `src/lib.rs`
- `crates/sq-models/Cargo.toml` + `src/lib.rs`
- `crates/sq-storage/Cargo.toml` + `src/lib.rs`
- `crates/sq-cluster/Cargo.toml` + `src/lib.rs`
- `crates/sq-server/Cargo.toml` + `src/main.rs`
- `crates/sq-sdk/Cargo.toml` + `src/lib.rs`
- `crates/sq-sim/Cargo.toml` + `src/lib.rs`
## Acceptance Criteria
- [x] `cargo check --workspace` passes
- [x] `cargo test --workspace` passes (no tests yet, but no errors)
- [x] All 7 crates are listed in workspace members
- [x] Workspace dependencies match forest/fungus conventions (tonic 0.14.2, prost 0.14.1, etc.)
## Notes
- Follow forest's Cargo.toml pattern exactly: `/home/kjuulh/git/src.rawpotion.io/rawpotion/forest/Cargo.toml`
- Follow forest's buf.gen.yaml: `/home/kjuulh/git/src.rawpotion.io/rawpotion/forest/buf.gen.yaml`
- Use Rust edition 2024
- sq-server is the only binary crate; all others are libraries

# SQ-001: Domain Types
**Status:** `[ ] TODO`
**Blocked by:** SQ-000
**Priority:** High
## Description
Define the core domain types in `sq-models`: Message, Header, TopicName, Offset, and configuration types. These are the foundational types used throughout the entire system.
## Files to Create/Modify
- `crates/sq-models/src/lib.rs` - re-exports
- `crates/sq-models/src/message.rs` - Message, Header, TopicName, Offset types
- `crates/sq-models/src/config.rs` - TopicConfig, WalConfig
## Key Types
```rust
pub struct Message {
    pub offset: u64,
    pub topic: TopicName,
    pub key: Option<Vec<u8>>,
    pub value: Vec<u8>,
    pub headers: Vec<Header>,
    pub timestamp_ms: u64,
}

pub struct Header {
    pub key: String,
    pub value: Vec<u8>,
}

pub struct TopicName(pub String);

pub struct WalConfig {
    pub max_segment_bytes: u64,    // default 64MB
    pub max_segment_age_secs: u64, // default 60s
    pub data_dir: PathBuf,
}

pub struct TopicConfig {
    pub name: TopicName,
    pub partitions: u32,         // default 1
    pub replication_factor: u32, // default 3
}
```
## Acceptance Criteria
- [ ] All types compile and are public
- [ ] `Message` implements `Clone`, `Debug`, `PartialEq`
- [ ] Unit test: construct a Message, verify all fields
- [ ] Property test (optional): arbitrary Message roundtrip through Debug/PartialEq
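A standalone sketch of the types with the derives the criteria require, plus the construction test (field values are illustrative; the real types live in `sq-models`):

```rust
// Sketch of the SQ-001 types with the derives the acceptance
// criteria call for (Clone, Debug, PartialEq).
#[derive(Clone, Debug, PartialEq)]
pub struct TopicName(pub String);

#[derive(Clone, Debug, PartialEq)]
pub struct Header {
    pub key: String,
    pub value: Vec<u8>,
}

#[derive(Clone, Debug, PartialEq)]
pub struct Message {
    pub offset: u64,
    pub topic: TopicName,
    pub key: Option<Vec<u8>>,
    pub value: Vec<u8>,
    pub headers: Vec<Header>,
    pub timestamp_ms: u64,
}

// Construct a Message with every field set, as in the unit-test criterion.
pub fn example_message() -> Message {
    Message {
        offset: 0,
        topic: TopicName("orders".to_string()),
        key: Some(b"user-1".to_vec()),
        value: b"payload".to_vec(),
        headers: vec![Header { key: "trace-id".into(), value: b"abc".to_vec() }],
        timestamp_ms: 1_700_000_000_000,
    }
}
```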

# SQ-002: WAL Record Encoding/Decoding
**Status:** `[ ] TODO`
**Blocked by:** SQ-001
**Priority:** High
## Description
Implement binary encoding and decoding of individual WAL records, independent of file I/O. Each record is CRC32-protected for corruption detection.
## Files to Create/Modify
- `crates/sq-storage/src/wal/mod.rs` - module declaration
- `crates/sq-storage/src/wal/record.rs` - encode_record, decode_record, CRC validation
- `crates/sq-storage/src/lib.rs` - re-export wal module
## Record Binary Format
```
[crc32: u32]        - CRC32 over everything after this field
[length: u32]       - total byte length of record (excluding crc32 and length)
[offset: u64]       - monotonic offset
[timestamp_ms: u64] - wall-clock millis
[key_len: u32]      - 0 = no key
[key: [u8; key_len]]
[value_len: u32]
[value: [u8; value_len]]
[headers_count: u16]
[for each header:]
    [hdr_key_len: u16]
    [hdr_key: [u8; hdr_key_len]]
    [hdr_val_len: u32]
    [hdr_val: [u8; hdr_val_len]]
```
## Acceptance Criteria
- [ ] `encode_record(&Message) -> Vec<u8>` produces correct binary
- [ ] `decode_record(&[u8]) -> Result<(Message, usize)>` parses correctly (returns bytes consumed)
- [ ] Roundtrip: encode then decode, verify equality
- [ ] Corruption test: flip a byte, decode returns CRC error
- [ ] Edge cases: empty value, empty key, no headers, many headers, max-size value
- [ ] Uses `crc32fast` crate for CRC computation
## Notes
- Use little-endian byte order throughout
- The `length` field allows skipping records without full parsing
- CRC is over the bytes AFTER the CRC field itself
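A minimal sketch of this encoding, restricted to headerless records (`headers_count` written as 0) and using a hand-rolled bitwise CRC-32 in place of the `crc32fast` crate so the example is self-contained:

```rust
// Bitwise CRC-32 (IEEE polynomial); stands in for crc32fast here.
fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &b in data {
        crc ^= b as u32;
        for _ in 0..8 {
            crc = if crc & 1 != 0 { (crc >> 1) ^ 0xEDB8_8320 } else { crc >> 1 };
        }
    }
    !crc
}

// Encode one record (headers omitted: headers_count is written as 0).
fn encode_record(offset: u64, timestamp_ms: u64, key: Option<&[u8]>, value: &[u8]) -> Vec<u8> {
    let key = key.unwrap_or(&[]);
    let mut body = Vec::new();
    body.extend_from_slice(&offset.to_le_bytes());
    body.extend_from_slice(&timestamp_ms.to_le_bytes());
    body.extend_from_slice(&(key.len() as u32).to_le_bytes());
    body.extend_from_slice(key);
    body.extend_from_slice(&(value.len() as u32).to_le_bytes());
    body.extend_from_slice(value);
    body.extend_from_slice(&0u16.to_le_bytes()); // headers_count = 0

    // CRC covers everything after the crc32 field: length + body.
    let mut after_crc = (body.len() as u32).to_le_bytes().to_vec();
    after_crc.extend_from_slice(&body);
    let mut out = crc32(&after_crc).to_le_bytes().to_vec();
    out.extend_from_slice(&after_crc);
    out
}

// Decode one record; returns (offset, timestamp_ms, key, value, bytes consumed).
fn decode_record(buf: &[u8]) -> Result<(u64, u64, Option<Vec<u8>>, Vec<u8>, usize), String> {
    if buf.len() < 8 {
        return Err("truncated record header".into());
    }
    let stored_crc = u32::from_le_bytes(buf[0..4].try_into().unwrap());
    let len = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize;
    let end = 8 + len;
    if buf.len() < end {
        return Err("truncated record body".into());
    }
    if crc32(&buf[4..end]) != stored_crc {
        return Err("crc mismatch".into());
    }
    let b = &buf[8..end];
    let offset = u64::from_le_bytes(b[0..8].try_into().unwrap());
    let timestamp_ms = u64::from_le_bytes(b[8..16].try_into().unwrap());
    let key_len = u32::from_le_bytes(b[16..20].try_into().unwrap()) as usize;
    let key = if key_len == 0 { None } else { Some(b[20..20 + key_len].to_vec()) };
    let v_off = 20 + key_len;
    let val_len = u32::from_le_bytes(b[v_off..v_off + 4].try_into().unwrap()) as usize;
    let value = b[v_off + 4..v_off + 4 + val_len].to_vec();
    Ok((offset, timestamp_ms, key, value, end))
}
```

The corruption acceptance criterion falls out directly: flipping any byte after the `crc32` field makes `decode_record` return the CRC error.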

# SQ-003: Simulation I/O Traits
**Status:** `[ ] TODO`
**Blocked by:** SQ-000
**Priority:** High
## Description
Define the trait abstractions for Clock and FileSystem that allow swapping real I/O for deterministic simulated I/O. This is the foundation of TigerBeetle-style testing.
## Files to Create/Modify
- `crates/sq-sim/src/lib.rs` - re-exports
- `crates/sq-sim/src/clock.rs` - Clock trait + RealClock + SimClock
- `crates/sq-sim/src/fs.rs` - FileSystem trait + FileHandle trait + RealFileSystem + InMemoryFileSystem
## Key Traits
```rust
// Note: async methods on traits used as trait objects (Box<dyn FileHandle>)
// need #[async_trait] or manually boxed futures.
pub trait Clock: Send + Sync {
    fn now(&self) -> std::time::Instant;
    async fn sleep(&self, duration: Duration);
}

pub trait FileSystem: Send + Sync {
    async fn create_dir_all(&self, path: &Path) -> Result<()>;
    async fn open_read(&self, path: &Path) -> Result<Box<dyn FileHandle>>;
    async fn open_write(&self, path: &Path) -> Result<Box<dyn FileHandle>>;
    async fn open_append(&self, path: &Path) -> Result<Box<dyn FileHandle>>;
    async fn remove_file(&self, path: &Path) -> Result<()>;
    async fn list_dir(&self, path: &Path) -> Result<Vec<PathBuf>>;
    async fn exists(&self, path: &Path) -> bool;
}

pub trait FileHandle: Send + Sync {
    async fn write_all(&mut self, buf: &[u8]) -> Result<usize>;
    async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize>;
    async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize>;
    async fn fsync(&mut self) -> Result<()>;
    fn position(&self) -> u64;
    async fn seek(&mut self, pos: u64) -> Result<()>;
}
```
## Fault Injection (InMemoryFileSystem)
```rust
impl InMemoryFileSystem {
    pub fn fail_next_fsync(&self, error: io::Error);
    pub fn simulate_disk_full(&self);
    pub fn corrupt_bytes(&self, path: &Path, offset: u64, len: usize);
    pub fn clear_faults(&self);
}
```
## Acceptance Criteria
- [ ] InMemoryFileSystem: write, read back, verify content
- [ ] InMemoryFileSystem: create_dir_all, list_dir
- [ ] InMemoryFileSystem: fsync succeeds normally
- [ ] InMemoryFileSystem: fail_next_fsync causes next fsync to error
- [ ] InMemoryFileSystem: simulate_disk_full causes writes to fail
- [ ] SimClock: starts at time 0, advance(Duration) changes now()
- [ ] SimClock: sleep returns immediately when time is advanced
- [ ] RealClock: delegates to std::time
- [ ] RealFileSystem: delegates to tokio::fs (basic smoke test)
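The SimClock half can be sketched without any async machinery. This sketch tracks virtual nanoseconds rather than `Instant` so it is fully deterministic and self-contained (names follow the trait sketch above; the `Clock` impl itself is omitted):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

// Deterministic simulated clock: time only moves when advance() is called.
pub struct SimClock {
    now_ns: AtomicU64,
}

impl SimClock {
    pub fn new() -> Self {
        Self { now_ns: AtomicU64::new(0) } // starts at time 0
    }

    pub fn now_ns(&self) -> u64 {
        self.now_ns.load(Ordering::SeqCst)
    }

    pub fn advance(&self, d: Duration) {
        self.now_ns.fetch_add(d.as_nanos() as u64, Ordering::SeqCst);
    }
}
```

A `sleep` on this clock would register a wake-up time and complete when `advance` moves past it, which is what makes "sleep returns immediately when time is advanced" testable.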

# SQ-004: WAL Segment Writer
**Status:** `[ ] TODO`
**Blocked by:** SQ-002, SQ-003
**Priority:** High
## Description
Implement the WAL segment writer that appends records to segment files with fsync for durability. Handles segment rotation when size or time thresholds are exceeded.
## Files to Create/Modify
- `crates/sq-storage/src/wal/writer.rs` - WalWriter with append + fsync + rotation
- `crates/sq-storage/src/wal/segment.rs` - segment header encoding/decoding
## Segment Header Format (32 bytes)
```
[magic: [u8; 4]] = b"SQWL"
[version: u16] = 1
[topic_len: u16]
[topic: [u8; 20]] (padded/truncated)
[partition: u32]
```
## WalWriter API
```rust
pub struct WalWriter<F: FileSystem> {
    fs: Arc<F>,
    config: WalConfig,
    topic: TopicName,
    partition: u32,
    active_segment: Option<Box<dyn FileHandle>>,
    segment_base_offset: u64,
    segment_position: u64,
    next_offset: u64,
    segment_opened_at: Instant,
}

impl<F: FileSystem> WalWriter<F> {
    pub async fn new(fs: Arc<F>, config: WalConfig, topic: TopicName, partition: u32) -> Result<Self>;
    pub async fn append(&mut self, key: Option<&[u8]>, value: &[u8], headers: &[Header], timestamp_ms: u64) -> Result<u64>; // returns offset
    pub async fn close_active_segment(&mut self) -> Result<Option<ClosedSegment>>;
    pub fn next_offset(&self) -> u64;
}
```
## Acceptance Criteria (using InMemoryFileSystem)
- [ ] Write 1 message, verify segment file exists with correct header + record
- [ ] Write 100 messages, verify all offsets are monotonically increasing (0, 1, 2, ...)
- [ ] Segment rotation: write until size > max_segment_bytes, verify new segment created
- [ ] Segment rotation: advance clock past max_segment_age, verify rotation on next write
- [ ] fsync failure: set fault on InMemoryFS, verify append() returns error
- [ ] fsync failure: offset is NOT advanced (can retry the write)
- [ ] Segment directory structure: `{data_dir}/{topic}/{partition}/{base_offset}.wal`
## Notes
- sq-storage depends on sq-sim for the FileSystem trait
- Writer must call fsync after every append (or batch of appends)
- `ClosedSegment` contains the path and offset range of the completed segment
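The 32-byte segment header encoding can be sketched directly from the layout above, assuming little-endian integers as elsewhere in the format (the function name is hypothetical):

```rust
const MAGIC: &[u8; 4] = b"SQWL";

// Encodes the fixed 32-byte segment header:
// magic(4) + version(2) + topic_len(2) + topic(20, padded/truncated) + partition(4).
fn encode_segment_header(topic: &str, partition: u32) -> [u8; 32] {
    let mut out = [0u8; 32];
    out[0..4].copy_from_slice(MAGIC);
    out[4..6].copy_from_slice(&1u16.to_le_bytes()); // version = 1
    let bytes = topic.as_bytes();
    let n = bytes.len().min(20); // truncate long topic names to 20 bytes
    out[6..8].copy_from_slice(&(n as u16).to_le_bytes());
    out[8..8 + n].copy_from_slice(&bytes[..n]); // zero-padded to 20 bytes
    out[28..32].copy_from_slice(&partition.to_le_bytes());
    out
}
```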

# SQ-005: WAL Segment Reader
**Status:** `[ ] TODO`
**Blocked by:** SQ-004
**Priority:** High
## Description
Implement the WAL segment reader that reads messages from segment files, supporting seek-to-offset and forward scanning.
## Files to Create/Modify
- `crates/sq-storage/src/wal/reader.rs` - WalReader with open, read_from, iterator
## WalReader API
```rust
pub struct WalReader<F: FileSystem> {
    fs: Arc<F>,
}

impl<F: FileSystem> WalReader<F> {
    pub fn new(fs: Arc<F>) -> Self;
    pub async fn read_segment(&self, path: &Path) -> Result<Vec<Message>>;
    pub async fn read_from_offset(&self, path: &Path, offset: u64) -> Result<Vec<Message>>;
    pub async fn read_segment_header(&self, path: &Path) -> Result<SegmentHeader>;
}
```
## Acceptance Criteria (using InMemoryFileSystem)
- [ ] Write N messages with writer, read all back with reader, verify equality
- [ ] Read from a specific offset in the middle of a segment
- [ ] Corrupted record mid-segment: reader returns error for that record, can report partial results
- [ ] Empty segment (header only): reader yields zero messages
- [ ] Invalid magic bytes: reader returns descriptive error
- [ ] Truncated record at end of segment (partial write): reader stops cleanly at last complete record

# SQ-006: Sparse Offset Index
**Status:** `[ ] TODO`
**Blocked by:** SQ-005
**Priority:** Medium
## Description
Implement an in-memory sparse offset index that maps offsets to segment file locations for fast consumer seeks.
## Files to Create/Modify
- `crates/sq-storage/src/index.rs` - OffsetIndex
## OffsetIndex API
```rust
pub struct OffsetIndex {
    // Per topic-partition: sorted vec of (offset, segment_path, byte_position)
    entries: BTreeMap<(TopicName, u32), Vec<IndexEntry>>,
    sample_interval: u64, // e.g. every 1000th offset
}

pub struct IndexEntry {
    pub offset: u64,
    pub segment_path: PathBuf,
    pub byte_position: u64,
}

impl OffsetIndex {
    pub fn new(sample_interval: u64) -> Self;
    pub fn add_entry(&mut self, topic: &TopicName, partition: u32, entry: IndexEntry);
    pub fn lookup(&self, topic: &TopicName, partition: u32, offset: u64) -> Option<&IndexEntry>;
    pub fn build_from_segments<F: FileSystem>(fs: &F, segments: &[PathBuf]) -> Result<Self>;
}
```
## Acceptance Criteria
- [ ] Build index from a set of written segments, look up first offset -> correct segment
- [ ] Look up offset in the middle -> returns nearest lower indexed entry
- [ ] Look up offset beyond all segments -> returns None
- [ ] Multiple topic-partitions are isolated
- [ ] Sample interval works: only every Nth offset is indexed
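The "nearest lower indexed entry" rule is a binary search over the sorted entry list. A sketch on a simplified `(offset, byte_position)` pair list (the "returns None beyond all segments" criterion additionally needs the partition's latest offset, which this sketch does not track):

```rust
// Nearest-lower lookup over entries sorted by offset.
fn lookup(entries: &[(u64, u64)], target: u64) -> Option<(u64, u64)> {
    match entries.binary_search_by_key(&target, |e| e.0) {
        Ok(i) => Some(entries[i]),      // exact hit on an indexed offset
        Err(0) => None,                 // target below the first indexed offset
        Err(i) => Some(entries[i - 1]), // nearest lower indexed entry
    }
}
```

The reader then seeks to the returned byte position and scans forward to the exact target offset.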

# SQ-007: Storage Engine Facade
**Status:** `[ ] TODO`
**Blocked by:** SQ-006
**Priority:** High
## Description
Unified read/write interface wrapping WAL writer + reader + offset index. This is the single entry point for all storage operations used by the server.
## Files to Create/Modify
- `crates/sq-storage/src/engine.rs` - StorageEngine
## StorageEngine API
```rust
pub struct StorageEngine<F: FileSystem, C: Clock> {
    fs: Arc<F>,
    clock: Arc<C>,
    config: WalConfig,
    writers: HashMap<(String, u32), WalWriter<F>>,
    index: OffsetIndex,
}

impl<F: FileSystem, C: Clock> StorageEngine<F, C> {
    pub async fn new(fs: Arc<F>, clock: Arc<C>, config: WalConfig) -> Result<Self>;
    pub async fn append(&mut self, topic: &str, partition: u32, key: Option<&[u8]>, value: &[u8], headers: &[Header]) -> Result<u64>;
    pub async fn read(&self, topic: &str, partition: u32, from_offset: u64, limit: usize) -> Result<Vec<Message>>;
    pub async fn recover(&mut self) -> Result<()>; // rebuild state from existing WAL files on startup
    pub fn closed_segments(&self) -> Vec<ClosedSegment>; // for the S3 shipper
}
```
## Acceptance Criteria
- [ ] Write 1000 messages, read from offset 0, verify all present
- [ ] Write, read from offset 500, verify correct slice returned
- [ ] Read with limit, verify at most N messages returned
- [ ] Write to multiple topics/partitions, verify isolation (no cross-contamination)
- [ ] Recovery: write messages, drop engine, create new engine, call recover(), read all messages back
- [ ] Segment rotation happens transparently during append

# SQ-008: Protobuf API Definitions
**Status:** `[ ] TODO`
**Blocked by:** SQ-000
**Priority:** High
## Description
Define all protobuf service definitions and generate the Rust gRPC code via buf.
## Files to Create/Modify
- `interface/proto/sq/v1/data_plane.proto` - Publish, Subscribe, Ack RPCs
- `interface/proto/sq/v1/control_plane.proto` - CreateTopic, DeleteTopic, ListTopics, DescribeTopic, CreateConsumerGroup
- `interface/proto/sq/v1/health.proto` - Status service (may already exist from Phase 0)
- `interface/proto/sq/v1/cluster.proto` - ReplicateEntries, Join, Heartbeat, FetchSegment (internal)
- `crates/sq-grpc-interface/src/lib.rs` - updated module re-exports
- Regenerated code in `crates/sq-grpc-interface/src/grpc/`
## Key Service Definitions
### DataPlaneService
- `Publish(PublishRequest) -> PublishResponse`
- `Subscribe(SubscribeRequest) -> stream SubscribeResponse`
- `Ack(AckRequest) -> AckResponse`
### ControlPlaneService
- `CreateTopic`, `DeleteTopic`, `ListTopics`, `DescribeTopic`, `CreateConsumerGroup`
### ClusterService (internal)
- `ReplicateEntries`, `Join`, `Heartbeat`, `FetchSegment` (streaming)
### StatusService
- `Status(GetStatusRequest) -> GetStatusResponse`
## Acceptance Criteria
- [ ] `buf lint` passes on all proto files
- [ ] `buf generate` produces code in sq-grpc-interface
- [ ] `cargo check -p sq-grpc-interface` passes
- [ ] All service traits are generated (DataPlaneService, ControlPlaneService, ClusterService, StatusService)
- [ ] AckMode enum has ALL, LOCAL, NONE variants

# SQ-009: Server Skeleton
**Status:** `[ ] TODO`
**Blocked by:** SQ-008
**Priority:** High
## Description
A runnable `sq-server` binary with a CLI, notmad lifecycle management, a gRPC health endpoint, and an Axum HTTP health endpoint. Follows the fungus-server pattern exactly.
## Files to Create/Modify
- `crates/sq-server/src/main.rs` - entry point with logging setup
- `crates/sq-server/src/cli.rs` - clap Command enum with Serve subcommand
- `crates/sq-server/src/cli/serve.rs` - ServeCommand with notmad builder
- `crates/sq-server/src/state.rs` - State struct with Config
- `crates/sq-server/src/grpc/mod.rs` - GrpcServer as notmad::Component
- `crates/sq-server/src/grpc/health.rs` - StatusService impl
- `crates/sq-server/src/grpc/error.rs` - gRPC error mapping
- `crates/sq-server/src/servehttp.rs` - Axum health routes
## Configuration
```
SQ_HOST=127.0.0.1:6060 # gRPC listen address
SQ_HTTP_HOST=127.0.0.1:6062 # HTTP listen address
SQ_DATA_DIR=./data # WAL storage directory
SQ_NODE_ID=node-1 # Unique node identifier
LOG_LEVEL=pretty # pretty|json|short
```
## Acceptance Criteria
- [ ] `cargo run -p sq-server -- serve` starts and listens on configured ports
- [ ] gRPC Status RPC returns node_id
- [ ] HTTP GET / returns 200 with health message
- [ ] Graceful shutdown on SIGINT via notmad cancellation token
- [ ] Logging setup matches forest/fungus pattern (pretty/json/short)
## Reference Files
- `/home/kjuulh/git/src.rawpotion.io/rawpotion/fungus/crates/fungus-server/src/cli/serve.rs`
- `/home/kjuulh/git/src.rawpotion.io/rawpotion/fungus/crates/fungus-server/src/state.rs`

# SQ-010: Publish Endpoint (Single Node)
**Status:** `[ ] TODO`
**Blocked by:** SQ-007, SQ-009
**Priority:** High
## Description
Implement the Publish gRPC RPC. Clients send messages which are durably written to the local WAL via StorageEngine.
## Files to Create/Modify
- `crates/sq-server/src/grpc/data_plane.rs` - DataPlaneService Publish impl
- `crates/sq-server/src/state.rs` - add StorageEngine to State
## Behavior
1. Receive PublishRequest with batch of messages
2. For each message: call StorageEngine::append
3. Ack mode handling:
- ACK_MODE_ALL: return after fsync (default single-node behavior)
- ACK_MODE_LOCAL: same as ALL for single node
- ACK_MODE_NONE: return immediately, write async
4. Return PublishResponse with offset assignments
## Acceptance Criteria
- [ ] Publish 1 message, verify WAL file exists and contains the message
- [ ] Publish batch of 100 messages, verify all acked with sequential offsets
- [ ] ACK_MODE_NONE: response is immediate (no waiting for fsync)
- [ ] Invalid request (empty topic): returns InvalidArgument gRPC status
- [ ] Auto-create topic on first publish (if topic doesn't exist)

# SQ-011: Subscribe Endpoint (Single Node)
**Status:** `[ ] TODO`
**Blocked by:** SQ-010
**Priority:** High
## Description
Implement the Subscribe gRPC RPC with server-streaming. Server pushes messages to the client as they become available.
## Files to Create/Modify
- `crates/sq-server/src/grpc/data_plane.rs` - DataPlaneService Subscribe impl
## Behavior
1. Client sends SubscribeRequest with topic, partition, optional start_offset
2. Server reads from StorageEngine starting at the given offset
3. Server streams batches of ConsumedMessage to the client
4. When caught up, server polls for new messages at a configurable interval (e.g. 100ms)
5. Stream continues until client disconnects or cancellation
## Acceptance Criteria
- [ ] Publish 10 messages, then subscribe from offset 0, receive all 10
- [ ] Subscribe from offset 5, receive messages 5-9 only
- [ ] Subscribe to empty topic from offset 0, then publish, receive new messages
- [ ] Client disconnect: server-side stream cleans up without error
- [ ] Subscribe to nonexistent topic: returns NotFound gRPC status

# SQ-012: Consumer Groups & Offset Tracking
**Status:** `[ ] TODO`
**Blocked by:** SQ-011
**Priority:** Medium
## Description
Consumer group offset management: store committed offsets, use them as default start position for Subscribe, and implement the Ack RPC.
## Files to Create/Modify
- `crates/sq-storage/src/consumer_offsets.rs` - in-memory map with file persistence
- `crates/sq-server/src/grpc/data_plane.rs` - Ack RPC impl; update Subscribe to use committed offset
- `crates/sq-storage/src/engine.rs` - add consumer offset methods
## ConsumerOffsets API
```rust
pub struct ConsumerOffsets {
    offsets: HashMap<(String, String, u32), u64>, // (group, topic, partition) -> offset
    persist_path: PathBuf,
}

impl ConsumerOffsets {
    pub fn commit(&mut self, group: &str, topic: &str, partition: u32, offset: u64) -> Result<()>;
    pub fn get_committed(&self, group: &str, topic: &str, partition: u32) -> Option<u64>;
    pub async fn persist(&self) -> Result<()>;
    pub async fn load(path: &Path) -> Result<Self>;
}
```
## Acceptance Criteria
- [ ] Commit offset, query it back, verify correct
- [ ] Commit offset, persist to file, load from file, verify preserved
- [ ] Ack RPC: commit offset via gRPC, verify stored
- [ ] Subscribe without start_offset uses committed offset for the consumer group
- [ ] Subscribe with explicit start_offset overrides committed offset
- [ ] Two consumers in same group: both see same committed offset
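The in-memory half can be sketched as follows. Persistence is omitted; the never-regress guard is an assumption borrowed from the SQ-022 invariant list ("consumer group offsets never regress"):

```rust
use std::collections::HashMap;

// In-memory committed offsets, keyed by (group, topic, partition).
struct ConsumerOffsets {
    offsets: HashMap<(String, String, u32), u64>,
}

impl ConsumerOffsets {
    fn new() -> Self {
        Self { offsets: HashMap::new() }
    }

    fn commit(&mut self, group: &str, topic: &str, partition: u32, offset: u64) {
        let entry = self
            .offsets
            .entry((group.to_string(), topic.to_string(), partition))
            .or_insert(offset);
        // Assumed guard: committed offsets never move backwards.
        if offset > *entry {
            *entry = offset;
        }
    }

    fn get_committed(&self, group: &str, topic: &str, partition: u32) -> Option<u64> {
        self.offsets
            .get(&(group.to_string(), topic.to_string(), partition))
            .copied()
    }
}
```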

# SQ-013: Control Plane - Topic Management
**Status:** `[ ] TODO`
**Blocked by:** SQ-012
**Priority:** Medium
## Description
Implement the ControlPlane gRPC service for topic CRUD operations.
## Files to Create/Modify
- `crates/sq-server/src/grpc/control_plane.rs` - ControlPlaneService impl
- `crates/sq-storage/src/topic_metadata.rs` - topic registry (file-backed)
- `crates/sq-server/src/grpc/mod.rs` - register ControlPlane service
## Acceptance Criteria
- [ ] CreateTopic: creates topic with specified partitions and replication factor
- [ ] CreateTopic: duplicate name returns AlreadyExists
- [ ] ListTopics: returns all created topics
- [ ] DescribeTopic: returns partition info with earliest/latest offsets
- [ ] DeleteTopic: removes topic from registry
- [ ] Publish to deleted topic: returns NotFound
- [ ] Topic metadata persists across server restarts

# SQ-014: SDK Producer
**Status:** `[ ] TODO`
**Blocked by:** SQ-010
**Priority:** Medium
## Description
Ergonomic Rust producer client in sq-sdk. Handles connection management, batching, and retry logic.
## Files to Create/Modify
- `crates/sq-sdk/src/lib.rs` - re-exports
- `crates/sq-sdk/src/connection.rs` - gRPC channel management
- `crates/sq-sdk/src/producer.rs` - Producer with batching, linger timer, retry
- `crates/sq-sdk/src/error.rs` - SqError type
## Producer API
```rust
pub struct ProducerConfig {
    pub server_addresses: Vec<String>,
    pub default_ack_mode: AckMode,
    pub max_retries: u32,
    pub retry_backoff_ms: u64,
}

pub struct Producer { /* ... */ }

impl Producer {
    pub async fn connect(config: ProducerConfig) -> Result<Self, SqError>;
    pub async fn send(&self, topic: &str, key: Option<&[u8]>, value: &[u8]) -> Result<SendResult, SqError>;
    pub async fn send_batch(&self, messages: Vec<ProducerMessage>) -> Result<Vec<SendResult>, SqError>;
}
```
## Acceptance Criteria
- [ ] Producer connects to running server, sends message, gets offset back
- [ ] Send batch: all messages get sequential offsets
- [ ] Connection failure: returns appropriate error
- [ ] Multiple server addresses: round-robin or failover

# SQ-015: SDK Consumer
**Status:** `[ ] TODO`
**Blocked by:** SQ-014, SQ-012
**Priority:** Medium
## Description
Ergonomic Rust consumer client in sq-sdk. Wraps the server-streaming Subscribe RPC with poll-based interface and auto-commit support.
## Files to Create/Modify
- `crates/sq-sdk/src/consumer.rs` - Consumer with poll loop and auto-commit
## Consumer API
```rust
pub struct ConsumerConfig {
    pub server_addresses: Vec<String>,
    pub consumer_group: String,
    pub topics: Vec<String>,
    pub auto_commit: bool,
    pub auto_commit_interval_ms: u64,
    pub max_poll_records: u32,
}

pub struct Consumer { /* ... */ }

impl Consumer {
    pub async fn connect(config: ConsumerConfig) -> Result<Self, SqError>;
    pub async fn poll(&mut self) -> Result<Vec<ConsumedMessage>, SqError>;
    pub async fn commit(&self, topic: &str, partition: u32, offset: u64) -> Result<(), SqError>;
}
```
## Acceptance Criteria
- [ ] End-to-end: produce 100 messages with Producer, consume all with Consumer
- [ ] Auto-commit: consumed offsets are committed after interval
- [ ] Manual commit: explicit commit stores offset
- [ ] Poll returns empty vec when no new messages (non-blocking)
- [ ] Consumer group: two consumers resume from committed offset

# SQ-016: Object Store Shipping
**Status:** `[ ] TODO`
**Blocked by:** SQ-007
**Priority:** Medium
## Description
Background process that ships closed WAL segments to S3-compatible object storage for long-term durability.
## Files to Create/Modify
- `crates/sq-storage/src/object_store/mod.rs` - ObjectStore trait + S3 impl + Noop impl
- `crates/sq-storage/src/object_store/shipper.rs` - SegmentShipper as notmad::Component
- `crates/sq-storage/src/object_store/layout.rs` - S3 key naming convention
## ObjectStore Trait
```rust
pub trait ObjectStore: Send + Sync {
    async fn put(&self, key: &str, data: Vec<u8>) -> Result<()>;
    async fn get(&self, key: &str) -> Result<Vec<u8>>;
    async fn list(&self, prefix: &str) -> Result<Vec<String>>;
    async fn delete(&self, key: &str) -> Result<()>;
}
```
## S3 Key Layout
`{cluster_id}/{topic}/{partition}/{base_offset}-{end_offset}.sqseg`
## Acceptance Criteria
- [ ] Closed segment is detected and uploaded to object store
- [ ] S3 key matches expected layout
- [ ] Noop object store works for testing (stores in memory)
- [ ] Upload failure: segment stays local, retried on next cycle
- [ ] Successful upload is recorded (segment marked as "shipped")
- [ ] Uses zstd compression before upload
## Notes
- Uses `object_store` crate with AWS S3 features (same as nostore)
- Shipper runs as a notmad::Component in the background
- Poll interval: every 5 seconds check for closed segments
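The key layout above is a straightforward format string; a sketch (the function name is hypothetical):

```rust
// Builds the object-store key for a shipped segment:
// {cluster_id}/{topic}/{partition}/{base_offset}-{end_offset}.sqseg
fn segment_key(cluster_id: &str, topic: &str, partition: u32, base_offset: u64, end_offset: u64) -> String {
    format!("{cluster_id}/{topic}/{partition}/{base_offset}-{end_offset}.sqseg")
}
```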

# SQ-017: WAL Trimming
**Status:** `[ ] TODO`
**Blocked by:** SQ-016
**Priority:** Medium
## Description
Garbage collect local WAL segments after they have been confirmed in object storage.
## Files to Create/Modify
- `crates/sq-storage/src/wal/trimmer.rs` - WalTrimmer
## Behavior
1. Periodically scan for segments marked as "shipped"
2. Verify the segment exists in object storage (optional double-check)
3. Delete the local WAL segment file
4. Update the offset index to point to S3 location instead
## Acceptance Criteria
- [ ] Segment marked as shipped -> trimmer deletes local file
- [ ] Segment NOT marked as shipped -> trimmer leaves it
- [ ] After trimming, index entries point to S3 location
- [ ] Trimmer respects a minimum retention period (keep recent segments locally even if shipped)

# SQ-018: S3 Read Fallback
**Status:** `[ ] TODO`
**Blocked by:** SQ-017
**Priority:** Medium
## Description
When a consumer requests an offset from a trimmed segment, fetch it from S3 instead.
## Files to Create/Modify
- `crates/sq-storage/src/engine.rs` - update read path with S3 fallback
- `crates/sq-storage/src/object_store/reader.rs` - download + decompress segment from S3
## Behavior
1. Read path checks local WAL first
2. If segment not found locally, check if it's in S3 via the index
3. Download segment from S3, decompress (zstd)
4. Read messages from the downloaded segment
5. Optionally cache downloaded segment locally for subsequent reads
## Acceptance Criteria
- [ ] Write messages, ship to S3, trim locally, read from S3 -> messages correct
- [ ] CRC validation on S3-fetched data passes
- [ ] S3 fetch failure: returns appropriate error
- [ ] Performance: subsequent reads of same trimmed segment use local cache

# SQ-019: Virtual Network for Simulation
**Status:** `[ ] TODO`
**Blocked by:** SQ-003
**Priority:** Medium
## Description
Virtual network layer for multi-node simulation testing. Enables partition, latency, and packet drop injection.
## Files to Create/Modify
- `crates/sq-sim/src/network.rs` - VirtualNetwork with message queues and fault injection
## VirtualNetwork API
```rust
pub struct VirtualNetwork {
    queues: HashMap<(NodeId, NodeId), VecDeque<Vec<u8>>>,
    partitions: HashSet<(NodeId, NodeId)>,
    latency: Option<(Duration, Duration)>, // min, max
    drop_probability: f64,
}

impl VirtualNetwork {
    pub fn new() -> Self;
    pub fn partition(&mut self, a: NodeId, b: NodeId);
    pub fn heal(&mut self, a: NodeId, b: NodeId);
    pub fn heal_all(&mut self);
    pub fn set_latency(&mut self, min: Duration, max: Duration);
    pub fn set_drop_probability(&mut self, prob: f64);
    pub async fn send(&self, from: NodeId, to: NodeId, msg: Vec<u8>) -> Result<()>;
    pub async fn recv(&self, node: NodeId) -> Result<(NodeId, Vec<u8>)>;
    pub fn deliver_pending(&mut self); // process queued messages
}
```
## Acceptance Criteria
- [ ] Two virtual nodes exchange messages successfully
- [ ] Partition: messages from A to B are dropped
- [ ] Heal: messages resume flowing after heal
- [ ] Latency injection: messages are delayed
- [ ] Drop probability: some messages are randomly dropped
- [ ] Bidirectional partition: neither direction works
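The partition bookkeeping can be sketched as a set of blocked directed pairs. Whether `partition()` should block one direction or both is an assumption here; the separate "bidirectional partition" criterion suggests each direction is tracked independently, so this sketch is directional:

```rust
use std::collections::HashSet;

type NodeId = u32;

// Directional partition set: partition(a, b) blocks a -> b only,
// so a bidirectional partition requires blocking both directions.
struct Partitions(HashSet<(NodeId, NodeId)>);

impl Partitions {
    fn new() -> Self {
        Self(HashSet::new())
    }

    fn partition(&mut self, from: NodeId, to: NodeId) {
        self.0.insert((from, to));
    }

    fn heal(&mut self, from: NodeId, to: NodeId) {
        self.0.remove(&(from, to));
    }

    fn blocked(&self, from: NodeId, to: NodeId) -> bool {
        self.0.contains(&(from, to))
    }
}
```

`send` would consult `blocked(from, to)` and silently drop the message when it returns true.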

# SQ-020: Cluster Membership (Gossip)
**Status:** `[ ] TODO`
**Blocked by:** SQ-009, SQ-019
**Priority:** Medium
## Description
Nodes discover each other via seed list and maintain a membership list through periodic heartbeats.
## Files to Create/Modify
- `crates/sq-cluster/src/lib.rs` - module exports
- `crates/sq-cluster/src/membership.rs` - seed list, join, heartbeat, failure detection
- `crates/sq-server/src/grpc/cluster.rs` - ClusterService Join/Heartbeat RPC impl
- `crates/sq-server/src/cli/serve.rs` - add --seeds CLI flag
## Configuration
```
SQ_SEEDS=node1:6060,node2:6060 # Seed node addresses
SQ_NODE_ID=node-1 # Unique node ID
SQ_HEARTBEAT_INTERVAL_MS=5000 # Heartbeat every 5s
SQ_FAILURE_THRESHOLD=3 # Missed heartbeats before suspected
```
## Membership State Machine
```
Unknown -> Alive (on Join response or Heartbeat)
Alive -> Suspected (missed 3 heartbeats)
Suspected -> Dead (suspected for 30 seconds)
Dead -> Alive (on successful re-Join)
```
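The transitions above can be sketched as a pure function over a state enum. Event names are assumptions; the thresholds (3 missed heartbeats, 30 seconds suspected) come from the configuration and state-machine sections:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum NodeState {
    Unknown,
    Alive,
    Suspected,
    Dead,
}

enum Event {
    HeartbeatReceived,
    JoinResponse,          // Join (or re-Join) succeeded
    MissedHeartbeats(u32), // consecutive misses
    SuspectTimeout,        // suspected for 30 seconds
}

fn transition(state: NodeState, event: Event) -> NodeState {
    match (state, event) {
        (NodeState::Unknown, Event::HeartbeatReceived) => NodeState::Alive,
        (NodeState::Unknown, Event::JoinResponse) => NodeState::Alive,
        (NodeState::Alive, Event::MissedHeartbeats(n)) if n >= 3 => NodeState::Suspected,
        (NodeState::Suspected, Event::SuspectTimeout) => NodeState::Dead,
        (NodeState::Dead, Event::JoinResponse) => NodeState::Alive,
        (s, _) => s, // all other events leave the state unchanged
    }
}
```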
## Acceptance Criteria
- [ ] Start 3 nodes with seed list, all discover each other
- [ ] Status RPC shows all 3 nodes as "alive"
- [ ] Stop one node, others detect it as "suspected" then "dead"
- [ ] Restart dead node, it re-joins and becomes "alive"
- [ ] Node with no seeds starts as single-node cluster

# SQ-021: Write Replication
**Status:** `[ ] TODO`
**Blocked by:** SQ-020, SQ-010
**Priority:** High
## Description
Writes are replicated to N peers before ack to client. Simple quorum approach: coordinator writes locally, sends to peers, waits for majority ack.
## Files to Create/Modify
- `crates/sq-cluster/src/replication.rs` - Replicator with quorum logic
- `crates/sq-server/src/grpc/cluster.rs` - ReplicateEntries RPC impl
- `crates/sq-server/src/grpc/data_plane.rs` - update Publish to use Replicator
## Replication Flow
1. Coordinator receives Publish request
2. Coordinator writes to local WAL, assigns offset
3. Coordinator sends ReplicateEntries to all known alive peers
4. Coordinator waits for W acks (W = floor(N/2) + 1, where N = replication factor)
5. On quorum reached: ack to client
6. On quorum timeout: return error to client
## Acceptance Criteria
- [ ] 3-node cluster: publish message, verify all 3 nodes have it in WAL
- [ ] 3-node cluster, 1 node down: publish succeeds (2/3 quorum)
- [ ] 3-node cluster, 2 nodes down: publish fails (no quorum)
- [ ] ACK_MODE_LOCAL: ack after local WAL only (skip replication)
- [ ] ACK_MODE_NONE: return immediately, replicate async
- [ ] Replication timeout: configurable, default 5 seconds
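The quorum size from step 4 is worth pinning down as a one-liner (integer division performs the floor):

```rust
// W = floor(N/2) + 1, where N is the replication factor.
fn quorum(replication_factor: u32) -> u32 {
    replication_factor / 2 + 1
}
```

With N = 3 this gives W = 2, matching the criteria: one node down still reaches quorum, two nodes down does not.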

# SQ-022: Multi-Node Simulation Tests
**Status:** `[ ] TODO`
**Blocked by:** SQ-021, SQ-019
**Priority:** High
## Description
Full TigerBeetle-inspired simulation test suite. Spin up multiple nodes with virtual I/O, inject faults, verify invariants.
## Files to Create/Modify
- `crates/sq-sim/src/runtime.rs` - test harness for multi-node simulation
- `crates/sq-sim/tests/invariants.rs` - invariant checker functions
- `crates/sq-sim/tests/scenarios/mod.rs`
- `crates/sq-sim/tests/scenarios/single_node.rs` - S01-S04
- `crates/sq-sim/tests/scenarios/multi_node.rs` - S05-S08
- `crates/sq-sim/tests/scenarios/failures.rs` - S09-S12
## Scenarios
- **S01:** Single node, single producer, single consumer - baseline
- **S02:** Single node, concurrent producers - offset ordering
- **S03:** Single node, disk full during write - graceful error
- **S04:** Single node, crash and restart - WAL recovery
- **S05:** Three nodes, normal operation - replication works
- **S06:** Three nodes, one crashes - remaining two continue
- **S07:** Three nodes, network partition (2+1) - majority continues
- **S08:** Three nodes, S3 outage - local WAL accumulates
- **S09:** Consumer group, offset preservation
- **S10:** High throughput burst - no message loss
- **S11:** Slow consumer with WAL trimming - falls back to S3
- **S12:** Node rejoins after long absence - catches up
## Invariants (checked after every step)
1. No acked message is ever lost
2. Offsets strictly monotonic, no gaps
3. CRC integrity on all reads
4. Consumer group offsets never regress
5. After network heal, replicas converge
6. WAL never trimmed before S3 confirmation
## Acceptance Criteria
- [ ] All 12 scenarios pass
- [ ] Each scenario runs with multiple random seeds (at least 10)
- [ ] Invariant violations produce clear diagnostic output
- [ ] Tests complete in < 60 seconds total

# SQ-023: Node Recovery / Catch-Up
**Status:** `[ ] TODO`
**Blocked by:** SQ-021, SQ-018
**Priority:** Medium
## Description
A node that was offline catches up from peers or S3 when it rejoins.
## Files to Create/Modify
- `crates/sq-cluster/src/recovery.rs` - on-join catch-up logic
- `crates/sq-server/src/grpc/cluster.rs` - FetchSegment RPC impl
- `crates/sq-cluster/src/replication.rs` - integrate recovery into join flow
## Recovery Flow
1. Rejoining node contacts peers via seed list
2. For each topic-partition, compare local latest offset with peers
3. If peer has newer data: fetch missing segments via FetchSegment RPC
4. If peer has also trimmed: fetch from S3
5. Replay fetched segments into local WAL
6. Mark node as "caught up" and start accepting writes
## Acceptance Criteria
- [ ] Node joins late: fetches missing data from peer, all messages readable
- [ ] Node catches up from S3 when peer has trimmed that segment
- [ ] Recovery doesn't block existing cluster operations
- [ ] Recovery progress is logged

# SQ-024: Docker Compose & E2E Example
**Status:** `[ ] TODO`
**Blocked by:** SQ-023
**Priority:** Low
## Description
Docker Compose setup for running a 3-node SQ cluster with MinIO, plus an example publish/subscribe program.
## Files to Create/Modify
- `templates/docker-compose.yaml` - 3 sq-server instances + MinIO
- `templates/sq-server.Dockerfile` - multi-stage build
- `examples/publish_subscribe/Cargo.toml`
- `examples/publish_subscribe/src/main.rs`
- `scripts/grpc.sh` - grpcurl testing helper
## Docker Compose Services
- `minio` - S3-compatible object storage
- `sq-1` - SQ node 1 (seeds: sq-2, sq-3)
- `sq-2` - SQ node 2 (seeds: sq-1, sq-3)
- `sq-3` - SQ node 3 (seeds: sq-1, sq-2)
## Acceptance Criteria
- [ ] `docker compose up` starts all 4 services
- [ ] All 3 SQ nodes discover each other (verify via Status RPC)
- [ ] Example program publishes and consumes messages successfully
- [ ] Kill one container, cluster continues operating
- [ ] Restart container, node catches up

# SQ-025: Compression & Performance Tuning
**Status:** `[ ] TODO`
**Blocked by:** SQ-024
**Priority:** Low
## Description
Add zstd compression to S3 segment shipping and create a benchmark suite.
## Files to Create/Modify
- `crates/sq-storage/src/object_store/shipper.rs` - add zstd compression
- `crates/sq-storage/src/object_store/reader.rs` - add zstd decompression
- `crates/sq-storage/benches/` - throughput benchmarks
## Benchmarks
- Write throughput: messages/sec at various payload sizes
- Read throughput: messages/sec sequential scan
- Compression ratio: raw vs compressed segment size
- S3 round-trip: write, ship, trim, read from S3
## Acceptance Criteria
- [ ] Compressed segments round-trip correctly (write -> compress -> upload -> download -> decompress -> read)
- [ ] Compression ratio metrics are logged
- [ ] Benchmarks produce readable output
- [ ] No correctness regressions (all existing tests still pass)