# SQ-012: Consumer Groups & Offset Tracking

**Status:** `[x] DONE`
**Blocked by:** SQ-011
**Priority:** Medium

## Description

Consumer group offset management: store committed offsets, use them as the default start position for `Subscribe`, and implement the `Ack` RPC.

## Files to Create/Modify

- `crates/sq-storage/src/consumer_offsets.rs` - in-memory map with file persistence
- `crates/sq-server/src/grpc/data_plane.rs` - Ack RPC impl; update Subscribe to use committed offset
- `crates/sq-storage/src/engine.rs` - add consumer offset methods

## ConsumerOffsets API

```rust
use std::collections::HashMap;
use std::path::{Path, PathBuf};

// `Result<T>` is a single-parameter alias; it is not defined in this ticket.
pub struct ConsumerOffsets {
    // (group, topic, partition) -> committed offset
    offsets: HashMap<(String, String, u32), u64>,
    // File the map is persisted to and loaded from
    persist_path: PathBuf,
}

// Signatures only; method bodies are omitted.
impl ConsumerOffsets {
    pub fn commit(&mut self, group: &str, topic: &str, partition: u32, offset: u64) -> Result<()>;
    pub fn get_committed(&self, group: &str, topic: &str, partition: u32) -> Option<u64>;
    pub async fn persist(&self) -> Result<()>;
    pub async fn load(path: &Path) -> Result<Self>;
}
```
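
The API above could be filled in roughly as follows. This is a minimal sketch, not the actual `sq-storage` implementation: it uses synchronous `std::fs` instead of the `async` signatures, returns `std::io::Result`, makes `commit` infallible, and adds a hypothetical `new` constructor. The on-disk format (one tab-separated record per line) is also an assumption; the ticket does not specify one.

```rust
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

pub struct ConsumerOffsets {
    // (group, topic, partition) -> committed offset
    offsets: HashMap<(String, String, u32), u64>,
    persist_path: PathBuf,
}

impl ConsumerOffsets {
    /// Hypothetical constructor: an empty store that persists to `path`.
    pub fn new(path: &Path) -> Self {
        Self { offsets: HashMap::new(), persist_path: path.to_path_buf() }
    }

    /// Record `offset` as the committed position, replacing any earlier value.
    pub fn commit(&mut self, group: &str, topic: &str, partition: u32, offset: u64) {
        self.offsets.insert((group.to_string(), topic.to_string(), partition), offset);
    }

    /// Return the committed offset, or `None` if the group never committed.
    pub fn get_committed(&self, group: &str, topic: &str, partition: u32) -> Option<u64> {
        self.offsets.get(&(group.to_string(), topic.to_string(), partition)).copied()
    }

    /// Write one `group\ttopic\tpartition\toffset` record per line.
    /// Assumed format: group and topic names must not contain tabs or newlines.
    pub fn persist(&self) -> io::Result<()> {
        let mut out = String::new();
        for ((group, topic, partition), offset) in &self.offsets {
            out.push_str(&format!("{group}\t{topic}\t{partition}\t{offset}\n"));
        }
        // Write to a temporary file first, then rename it into place, so a
        // failed write does not leave a half-written offsets file behind.
        let tmp = self.persist_path.with_extension("tmp");
        fs::write(&tmp, out)?;
        fs::rename(&tmp, &self.persist_path)
    }

    /// Load a store from `path`; a missing file yields an empty store.
    pub fn load(path: &Path) -> io::Result<Self> {
        let mut store = Self::new(path);
        let text = match fs::read_to_string(path) {
            Ok(text) => text,
            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(store),
            Err(e) => return Err(e),
        };
        for line in text.lines() {
            let bad = || io::Error::new(io::ErrorKind::InvalidData, format!("bad record: {line}"));
            let fields: Vec<&str> = line.split('\t').collect();
            if fields.len() != 4 {
                return Err(bad());
            }
            let partition: u32 = fields[2].parse().map_err(|_| bad())?;
            let offset: u64 = fields[3].parse().map_err(|_| bad())?;
            store.commit(fields[0], fields[1], partition, offset);
        }
        Ok(store)
    }
}
```

Keeping `commit` as a pure in-memory update and leaving the file write to `persist` means the caller decides how often to pay for disk I/O; whether the real implementation persists on every commit is not stated in this ticket.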

## Acceptance Criteria

- [ ] Commit offset, query it back, verify correct
- [ ] Commit offset, persist to file, load from file, verify preserved
- [ ] Ack RPC: commit offset via gRPC, verify stored
- [ ] Subscribe without start_offset uses committed offset for the consumer group
- [ ] Subscribe with explicit start_offset overrides committed offset
- [ ] Two consumers in same group: both see same committed offset
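
The Subscribe criteria above amount to a simple precedence rule, sketched below. The function name `resolve_start_offset` is hypothetical, modelling `start_offset` as an `Option<u64>` is an assumption about the request type, and the fallback to offset 0 when the group has no committed offset is also an assumption; the ticket does not say what happens in that case.

```rust
/// Pick the offset a Subscribe stream starts from.
///
/// `explicit` is the request's start_offset, if the client set one.
/// `committed` is the consumer group's committed offset, if any.
fn resolve_start_offset(explicit: Option<u64>, committed: Option<u64>) -> u64 {
    // An explicit start_offset wins; otherwise use the committed offset.
    // Assumption: with neither present, start from offset 0.
    explicit.or(committed).unwrap_or(0)
}
```

Because the committed offset is keyed by group rather than by individual consumer, two consumers in the same group that call this with the same `committed` value and no explicit offset resolve to the same start position.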