feat: rename service

Signed-off-by: kjuulh <contact@kjuulh.io>
2026-02-27 13:41:04 +01:00
parent 749ae245c7
commit 444c3d760b
8 changed files with 1328 additions and 107 deletions

Cargo.lock (generated, 928 lines changed)

File diff suppressed because it is too large.

@@ -1,5 +1,5 @@
[workspace]
-members = ["crates/*", "examples/*"]
+members = ["crates/*", "examples/*", "ci"]
resolver = "2"
[workspace.package]

README.md (new file, 195 lines)

@@ -0,0 +1,195 @@
# SQ (SeedQueue)
A fast, durable message queue. Written in Rust.
- **Custom WAL** with configurable fsync for durability
- **Cap'n Proto data plane** for high-throughput publish/subscribe (~2M msg/s)
- **gRPC control plane** for topic management, cluster ops, and health checks
- **Simple quorum clustering** with gossip-based membership
- **S3 tiered storage** for long-term segment offloading
- **OpenTelemetry** traces and metrics out of the box
## Quickstart
```bash
# Build
cargo build --release -p sq-server
# Run a single node
./target/release/sq-server serve
# Default ports:
# 6064 Cap'n Proto data plane (producer/consumer)
# 6060 gRPC control plane
# 6062 HTTP (health, metrics)
```
Or with [mise](https://mise.jdx.dev):
```bash
mise run develop # cargo run -p sq-server -- serve
```
## SDK
Add the dependency:
```toml
[dependencies]
sq-sdk = { path = "crates/sq-sdk" }
```
Publish:
```rust
use sq_sdk::{Producer, ProducerConfig, ProducerMessage};
let mut producer = Producer::connect(ProducerConfig::default()).await?;
producer.send("orders", None, b"hello").await?;
// Or batch:
let batch = vec![
ProducerMessage::new("orders", b"msg-1"),
ProducerMessage::new("orders", b"msg-2"),
];
producer.send_batch(batch).await?;
```
Subscribe:
```rust
use sq_sdk::{Consumer, ConsumerConfig};
let mut consumer = Consumer::connect(ConsumerConfig {
topic: "orders".into(),
consumer_group: "my-group".into(),
auto_commit: true,
..Default::default()
}).await?;
loop {
let messages = consumer.poll().await?;
for msg in &messages {
println!("[{}] offset={}", msg.topic, msg.offset);
}
}
```
A `BatchProducer` is available for high-throughput fire-and-forget patterns with automatic batching.
The gRPC transport is still accessible via `GrpcProducer`, `GrpcConsumer`, etc.
## Configuration
All flags can also be set via environment variables.

| Flag | Env | Default | Description |
|------|-----|---------|-------------|
| `--grpc-host` | `SQ_GRPC_HOST` | `127.0.0.1:6060` | gRPC listen address |
| `--capnp-host` | `SQ_CAPNP_HOST` | `127.0.0.1:6064` | Cap'n Proto listen address |
| `--http-host` | `SQ_HTTP_HOST` | `127.0.0.1:6062` | HTTP listen address |
| `--node-id` | `SQ_NODE_ID` | `node-1` | Unique node identifier |
| `--data-dir` | `SQ_DATA_DIR` | `./data` | WAL storage directory |
| `--seeds` | `SQ_SEEDS` | | Comma-separated seed node addresses |
| `--cluster-id` | `SQ_CLUSTER_ID` | `default` | Cluster identifier |
| `--sync-policy` | `SQ_SYNC_POLICY` | `every-batch` | `every-batch`, `none`, or interval in ms |
| `--s3-bucket` | `SQ_S3_BUCKET` | | S3 bucket for tiered storage |
| `--s3-endpoint` | `SQ_S3_ENDPOINT` | | S3 endpoint (for MinIO, etc.) |
| `--s3-region` | `SQ_S3_REGION` | | S3 region |

Observability is enabled by setting `OTEL_EXPORTER_OTLP_ENDPOINT` (e.g., `http://jaeger:4317`).
## Running a cluster
```bash
# Node 1
sq-server --node-id node-1 --grpc-host 0.0.0.0:6060 --capnp-host 0.0.0.0:6064 serve
# Node 2
sq-server --node-id node-2 --grpc-host 0.0.0.0:6070 --capnp-host 0.0.0.0:6074 \
--seeds 10.0.0.1:6060 serve
# Node 3
sq-server --node-id node-3 --grpc-host 0.0.0.0:6080 --capnp-host 0.0.0.0:6084 \
--seeds 10.0.0.1:6060,10.0.0.2:6070 serve
```
## Docker Compose
A full stack is provided in `templates/docker-compose.yaml`:
```bash
docker compose -f templates/docker-compose.yaml up -d
```
This starts a 3-node SQ cluster with Jaeger (tracing), Prometheus (metrics), Grafana (dashboards), and MinIO (S3-compatible object storage).
## Development
Requires the Rust 2024 edition (stable 1.85 or newer).
```bash
mise run check # cargo check --workspace
mise run test # cargo nextest run --workspace
mise run clippy # cargo clippy --workspace
mise run build # cargo build --workspace
mise run local:up # start docker-compose services
mise run local:down # stop docker-compose services
mise run develop # run server in dev mode
```
Proto codegen (requires [buf](https://buf.build)):
```bash
mise run generate:proto
```
## CI
CI runs via [Dagger](https://dagger.io) (containerized pipelines, no YAML). Requires the Dagger CLI.
```bash
mise run ci:pr # PR pipeline: check + test + build
mise run ci:main # Main branch pipeline
```
The pipeline runs `cargo check`, `cargo test`, and builds a release Docker image — all inside containers with dependency caching.
## Project structure
```
crates/
sq-server Server binary + library (gRPC, Cap'n Proto, HTTP)
sq-sdk Client SDK (Producer, Consumer, BatchProducer)
sq-storage WAL, storage engine, S3 object store
sq-cluster Membership and replication
sq-grpc-interface Protobuf/tonic generated types
sq-capnp-interface Cap'n Proto schema and codec
sq-models Shared domain types
sq-sim Deterministic I/O simulation for testing
ci/ Dagger CI pipeline
examples/
publish_subscribe Example publisher and subscriber
templates/
docker-compose.yaml Full observability stack
sq-server.Dockerfile Multi-stage release build
```
## Testing
```bash
# Unit tests (~100 tests, instant)
cargo test --workspace --lib
# Integration tests (cluster, data plane)
cargo test -p sq-server --test cluster_test
cargo test -p sq-server --test data_plane_test
# Stress tests (100K+ messages, benchmarks)
cargo test -p sq-server --test stress_test -- --nocapture
cargo test -p sq-server --test capnp_stress_test -- --nocapture
# Storage benchmarks
cargo bench -p sq-storage
```

ci/Cargo.toml (new file, 11 lines)

@@ -0,0 +1,11 @@
[package]
name = "ci"
version = "0.1.0"
edition = "2024"
publish = false
[dependencies]
dagger-sdk = "0.20"
eyre = "0.6"
tokio = { version = "1", features = ["full"] }
clap = { version = "4", features = ["derive"] }

ci/src/main.rs (new file, 274 lines)

@@ -0,0 +1,274 @@
use std::path::PathBuf;
use clap::Parser;
const BIN_NAME: &str = "sq-server";
const MOLD_VERSION: &str = "2.40.4";
#[derive(Parser)]
#[command(name = "ci")]
enum Cli {
    /// Run PR validation pipeline (check + test + build)
    Pr,
    /// Run main branch pipeline (check + test + build)
    Main,
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
    let cli = Cli::parse();
    dagger_sdk::connect(|client| async move {
        match cli {
            Cli::Pr => run_pr(&client).await?,
            Cli::Main => run_main(&client).await?,
        }
        Ok(())
    })
    .await?;
    Ok(())
}
async fn run_pr(client: &dagger_sdk::Query) -> eyre::Result<()> {
    eprintln!("==> PR pipeline: check + test + build");
    let base = build_base(client).await?;
    eprintln!("--- cargo check --workspace");
    base.clone()
        .with_exec(vec!["cargo", "check", "--workspace"])
        .sync()
        .await?;
    eprintln!("--- running tests");
    run_tests(&base).await?;
    eprintln!("--- building release image");
    let _image = build_release_image(client, &base).await?;
    eprintln!("==> PR pipeline complete");
    Ok(())
}
async fn run_main(client: &dagger_sdk::Query) -> eyre::Result<()> {
    eprintln!("==> Main pipeline: check + test + build");
    let base = build_base(client).await?;
    eprintln!("--- cargo check --workspace");
    base.clone()
        .with_exec(vec!["cargo", "check", "--workspace"])
        .sync()
        .await?;
    eprintln!("--- running tests");
    run_tests(&base).await?;
    eprintln!("--- building release image");
    let image = build_release_image(client, &base).await?;
    eprintln!("--- publishing image");
    publish_image(client, &image).await?;
    eprintln!("==> Main pipeline complete");
    Ok(())
}
/// Load source from host, excluding build artifacts.
fn load_source(client: &dagger_sdk::Query) -> eyre::Result<dagger_sdk::Directory> {
    let src = client.host().directory_opts(
        ".",
        dagger_sdk::HostDirectoryOptsBuilder::default()
            .exclude(vec!["target/", ".git/", "node_modules/", ".cuddle/"])
            .build()?,
    );
    Ok(src)
}
/// Load dependency-only source (Cargo.toml + Cargo.lock, no src/ or tests/).
fn load_dep_source(client: &dagger_sdk::Query) -> eyre::Result<dagger_sdk::Directory> {
    let src = client.host().directory_opts(
        ".",
        dagger_sdk::HostDirectoryOptsBuilder::default()
            .exclude(vec![
                "target/",
                ".git/",
                "node_modules/",
                ".cuddle/",
                "**/src",
                "**/tests",
            ])
            .build()?,
    );
    Ok(src)
}
/// Create skeleton source files so cargo can resolve deps without real source.
fn create_skeleton_files(client: &dagger_sdk::Query) -> eyre::Result<dagger_sdk::Directory> {
    let main_content = r#"fn main() { panic!("skeleton"); }"#;
    let lib_content = r#"pub fn _skeleton() {}"#;
    let crate_paths = discover_crates()?;
    let mut dir = client.directory();
    for crate_path in &crate_paths {
        let src_dir = crate_path.join("src");
        dir = dir.with_new_file(
            src_dir.join("main.rs").to_string_lossy().to_string(),
            main_content,
        );
        dir = dir.with_new_file(
            src_dir.join("lib.rs").to_string_lossy().to_string(),
            lib_content,
        );
    }
    // Also add skeleton for ci/ crate itself.
    dir = dir.with_new_file("ci/src/main.rs".to_string(), main_content);
    Ok(dir)
}
/// Discover workspace crate directories on the host.
fn discover_crates() -> eyre::Result<Vec<PathBuf>> {
    let mut crate_paths = Vec::new();
    let crates_dir = PathBuf::from("crates");
    if crates_dir.is_dir() {
        for entry in std::fs::read_dir(&crates_dir)? {
            let entry = entry?;
            if entry.file_type()?.is_dir() {
                crate_paths.push(entry.path());
            }
        }
    }
    let examples_dir = PathBuf::from("examples");
    if examples_dir.is_dir() {
        for entry in std::fs::read_dir(&examples_dir)? {
            let entry = entry?;
            if entry.file_type()?.is_dir() {
                crate_paths.push(entry.path());
            }
        }
    }
    Ok(crate_paths)
}
/// Build the base Rust container with all deps cached.
async fn build_base(client: &dagger_sdk::Query) -> eyre::Result<dagger_sdk::Container> {
    let src = load_source(client)?;
    let dep_src = load_dep_source(client)?;
    let skeleton = create_skeleton_files(client)?;
    let dep_src_with_skeleton = dep_src.with_directory(".", skeleton);
    // Base rust image with build tools (needs capnp compiler for sq-capnp-interface).
    let rust_base = client
        .container()
        .from("rust:1.93-trixie")
        .with_exec(vec!["apt", "update"])
        .with_exec(vec!["apt", "install", "-y", "clang", "wget", "capnproto"])
        // Install mold linker.
        .with_exec(vec![
            "wget",
            "-q",
            &format!(
                "https://github.com/rui314/mold/releases/download/v{MOLD_VERSION}/mold-{MOLD_VERSION}-x86_64-linux.tar.gz"
            ),
        ])
        .with_exec(vec![
            "tar",
            "-xf",
            &format!("mold-{MOLD_VERSION}-x86_64-linux.tar.gz"),
        ])
        .with_exec(vec![
            "mv",
            &format!("mold-{MOLD_VERSION}-x86_64-linux/bin/mold"),
            "/usr/bin/mold",
        ]);
    // Step 1: build deps with skeleton source (cacheable layer).
    let prebuild = rust_base
        .clone()
        .with_workdir("/mnt/src")
        .with_directory("/mnt/src", dep_src_with_skeleton)
        .with_exec(vec!["cargo", "build", "--release", "--bin", BIN_NAME]);
    // Step 2: copy cargo registry from prebuild (avoids re-downloading deps).
    let build_container = rust_base
        .with_workdir("/mnt/src")
        .with_directory("/usr/local/cargo", prebuild.directory("/usr/local/cargo"))
        .with_directory("/mnt/src/", src);
    Ok(build_container)
}
/// Run tests (no external services needed — SQ tests are self-contained).
async fn run_tests(base: &dagger_sdk::Container) -> eyre::Result<()> {
    base.clone()
        .with_exec(vec!["cargo", "test", "--workspace"])
        .sync()
        .await?;
    Ok(())
}
/// Build release binary and package into a slim image.
async fn build_release_image(
    client: &dagger_sdk::Query,
    base: &dagger_sdk::Container,
) -> eyre::Result<dagger_sdk::Container> {
    let built = base
        .clone()
        .with_exec(vec!["cargo", "build", "--release", "--bin", BIN_NAME]);
    let binary = built.file(format!("/mnt/src/target/release/{BIN_NAME}"));
    // Distroless cc-debian13 matches the build image's glibc (trixie/2.38+)
    // and includes libgcc + ca-certificates with no shell or package manager.
    let final_image = client
        .container()
        .from("gcr.io/distroless/cc-debian13")
        .with_file(format!("/usr/local/bin/{BIN_NAME}"), binary)
        .with_exec(vec![BIN_NAME, "--help"]);
    final_image.sync().await?;
    // Set the final entrypoint for the published image.
    let final_image = final_image.with_entrypoint(vec![BIN_NAME]);
    eprintln!("--- release image built successfully");
    Ok(final_image)
}
/// Publish image to container registry.
async fn publish_image(
    client: &dagger_sdk::Query,
    image: &dagger_sdk::Container,
) -> eyre::Result<()> {
    let registry = std::env::var("CI_REGISTRY").unwrap_or_else(|_| "git.kjuulh.io".into());
    let user = std::env::var("CI_REGISTRY_USER").unwrap_or_else(|_| "kjuulh".into());
    let image_ref =
        std::env::var("CI_IMAGE").unwrap_or_else(|_| format!("{registry}/{user}/sq:latest"));
    let password = std::env::var("CI_REGISTRY_PASSWORD")
        .map_err(|_| eyre::eyre!("CI_REGISTRY_PASSWORD must be set for publishing"))?;
    image
        .clone()
        .with_registry_auth(
            &registry,
            &user,
            client.set_secret("registry-password", &password),
        )
        .publish_opts(
            &image_ref,
            dagger_sdk::ContainerPublishOptsBuilder::default().build()?,
        )
        .await?;
    eprintln!("--- published {image_ref}");
    Ok(())
}


@@ -14,7 +14,7 @@ mod serve;
use serve::*;
#[derive(Parser)]
-#[command(author, version, about = "SQ - Stored Queue Server", long_about = None, subcommand_required = true)]
+#[command(author, version, about = "SQ - SeedQueue Server", long_about = None, subcommand_required = true)]
struct Command {
#[command(subcommand)]
command: Commands,


@@ -29,3 +29,11 @@ description = "Stop local dev services"
[tasks.develop]
run = "cargo run -p sq-server -- serve"
description = "Run SQ server in development mode"
[tasks."ci:pr"]
run = "cargo run -p ci -- pr"
description = "Run CI PR pipeline via Dagger"
[tasks."ci:main"]
run = "cargo run -p ci -- main"
description = "Run CI main pipeline via Dagger"


@@ -1,10 +1,15 @@
-FROM rust:1.84-bookworm AS builder
+FROM rust:1.93-trixie AS builder
RUN apt-get update && apt-get install -y --no-install-recommends capnproto && rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Copy workspace manifests first for dependency caching.
COPY Cargo.toml Cargo.lock ./
COPY crates/sq-grpc-interface/Cargo.toml crates/sq-grpc-interface/Cargo.toml
COPY crates/sq-capnp-interface/Cargo.toml crates/sq-capnp-interface/Cargo.toml
COPY crates/sq-capnp-interface/schema crates/sq-capnp-interface/schema
COPY crates/sq-capnp-interface/build.rs crates/sq-capnp-interface/build.rs
COPY crates/sq-models/Cargo.toml crates/sq-models/Cargo.toml
COPY crates/sq-storage/Cargo.toml crates/sq-storage/Cargo.toml
COPY crates/sq-cluster/Cargo.toml crates/sq-cluster/Cargo.toml
@@ -13,7 +18,7 @@ COPY crates/sq-sdk/Cargo.toml crates/sq-sdk/Cargo.toml
COPY crates/sq-sim/Cargo.toml crates/sq-sim/Cargo.toml
# Stub sources for dependency caching layer.
-RUN for d in crates/sq-grpc-interface crates/sq-models crates/sq-storage crates/sq-cluster crates/sq-sdk crates/sq-sim; do \
+RUN for d in crates/sq-grpc-interface crates/sq-capnp-interface crates/sq-models crates/sq-storage crates/sq-cluster crates/sq-sdk crates/sq-sim; do \
mkdir -p $d/src && echo "" > $d/src/lib.rs; \
done && \
mkdir -p crates/sq-server/src && echo "fn main() {}" > crates/sq-server/src/main.rs
@@ -28,13 +33,11 @@ RUN find crates -name "*.rs" -exec touch {} +
RUN cargo build --release -p sq-server
-FROM debian:bookworm-slim
-RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates && rm -rf /var/lib/apt/lists/*
+FROM gcr.io/distroless/cc-debian13
COPY --from=builder /app/target/release/sq-server /usr/local/bin/sq-server
-EXPOSE 6060 6062
+EXPOSE 6060 6062 6064
ENTRYPOINT ["sq-server"]
CMD ["serve"]