SQ-012: Consumer Groups & Offset Tracking
Status: [x] DONE
Blocked by: SQ-011
Priority: Medium
Description
Consumer group offset management: store committed offsets per consumer group, use them as the default start position for Subscribe, and implement the Ack RPC.
Files to Create/Modify
- crates/sq-storage/src/consumer_offsets.rs - in-memory map with file persistence
- crates/sq-server/src/grpc/data_plane.rs - Ack RPC impl; update Subscribe to use committed offset
- crates/sq-storage/src/engine.rs - add consumer offset methods
ConsumerOffsets API
pub struct ConsumerOffsets {
    offsets: HashMap<(String, String, u32), u64>, // (group, topic, partition) -> offset
    persist_path: PathBuf,
}

impl ConsumerOffsets {
    pub fn commit(&mut self, group: &str, topic: &str, partition: u32, offset: u64) -> Result<()>;
    pub fn get_committed(&self, group: &str, topic: &str, partition: u32) -> Option<u64>;
    pub async fn persist(&self) -> Result<()>;
    pub async fn load(path: &Path) -> Result<Self>;
}
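One way the API above could be filled in is sketched below. It is an illustration, not the shipped implementation, and it makes several assumptions the ticket does not state: persist and load are synchronous and use std::fs instead of async I/O, the Result type is std::io::Result, the on-disk format is one tab-separated line per entry (so group and topic names must not contain tabs or newlines), and a missing file on load is treated as an empty map.

```rust
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

pub struct ConsumerOffsets {
    // (group, topic, partition) -> offset
    offsets: HashMap<(String, String, u32), u64>,
    persist_path: PathBuf,
}

impl ConsumerOffsets {
    /// Record the offset in memory only; call `persist` to write it to disk.
    pub fn commit(&mut self, group: &str, topic: &str, partition: u32, offset: u64) -> io::Result<()> {
        self.offsets
            .insert((group.to_string(), topic.to_string(), partition), offset);
        Ok(())
    }

    pub fn get_committed(&self, group: &str, topic: &str, partition: u32) -> Option<u64> {
        self.offsets
            .get(&(group.to_string(), topic.to_string(), partition))
            .copied()
    }

    /// Assumed format: `group<TAB>topic<TAB>partition<TAB>offset`, one entry per line.
    pub fn persist(&self) -> io::Result<()> {
        let mut out = String::new();
        for ((group, topic, partition), offset) in &self.offsets {
            out.push_str(&format!("{group}\t{topic}\t{partition}\t{offset}\n"));
        }
        // Write to a sibling temp file, then rename over the target, so a
        // crash mid-write does not leave a truncated offsets file behind.
        let tmp = self.persist_path.with_extension("tmp");
        fs::write(&tmp, out)?;
        fs::rename(&tmp, &self.persist_path)
    }

    /// Assumption: a missing file means no offsets have been committed yet.
    pub fn load(path: &Path) -> io::Result<Self> {
        let mut offsets = HashMap::new();
        match fs::read_to_string(path) {
            Ok(text) => {
                for line in text.lines() {
                    let parts: Vec<&str> = line.split('\t').collect();
                    if parts.len() != 4 {
                        return Err(io::Error::new(
                            io::ErrorKind::InvalidData,
                            "malformed offsets line",
                        ));
                    }
                    let partition: u32 = parts[2]
                        .parse()
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    let offset: u64 = parts[3]
                        .parse()
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    offsets.insert((parts[0].to_string(), parts[1].to_string(), partition), offset);
                }
            }
            Err(e) if e.kind() == io::ErrorKind::NotFound => {}
            Err(e) => return Err(e),
        }
        Ok(Self {
            offsets,
            persist_path: path.to_path_buf(),
        })
    }
}
```

Because the map is keyed by group rather than by individual consumer, two consumers in the same group read the same entry, which is what the last acceptance criterion relies on.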
Acceptance Criteria
- Commit offset, query it back, verify correct
- Commit offset, persist to file, load from file, verify preserved
- Ack RPC: commit offset via gRPC, verify stored
- Subscribe without start_offset uses committed offset for the consumer group
- Subscribe with explicit start_offset overrides committed offset
- Two consumers in same group: both see same committed offset
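The Subscribe precedence described in the criteria above can be sketched as a small pure function. The name resolve_start_offset is hypothetical, and two details are assumptions not stated in the ticket: the committed offset is taken to be the position to resume from (not the last processed message), and a subscriber with neither an explicit nor a committed offset starts at 0.

```rust
/// Pick the offset a Subscribe stream should start from.
/// Precedence: explicit start_offset, then the group's committed offset,
/// then 0 (assumed fallback for a group that has never acked).
pub fn resolve_start_offset(explicit: Option<u64>, committed: Option<u64>) -> u64 {
    explicit.or(committed).unwrap_or(0)
}
```

Keeping this logic separate from the gRPC handler lets the override and default cases be unit-tested without starting a server.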