//! Stress tests: high-volume publish/consume against an in-process cluster.
use std::net::SocketAddr;
|
|
use std::sync::Arc;
|
|
use std::time::{Duration, Instant};
|
|
|
|
use sq_cluster::membership::{Membership, MembershipConfig};
|
|
use sq_grpc_interface::{
|
|
cluster_service_server::ClusterServiceServer,
|
|
control_plane_service_server::ControlPlaneServiceServer,
|
|
data_plane_service_server::DataPlaneServiceServer,
|
|
status_service_client::StatusServiceClient,
|
|
status_service_server::StatusServiceServer,
|
|
GetStatusRequest, SubscribeRequest,
|
|
};
|
|
use sq_grpc_interface::data_plane_service_client::DataPlaneServiceClient;
|
|
use sq_sdk::{
|
|
BatchProducer, BatchProducerConfig, Consumer, ConsumerConfig, Producer, ProducerConfig,
|
|
ProducerMessage,
|
|
};
|
|
use sq_server::capnp::CapnpServer;
|
|
use sq_server::grpc::{cluster, control_plane, data_plane, health};
|
|
use sq_server::state::{Config, State};
|
|
use tempfile::TempDir;
|
|
use tokio_stream::StreamExt;
|
|
use tokio_util::sync::CancellationToken;
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Test harness (shared with cluster_test.rs, inlined here for simplicity)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// One in-process server node: a gRPC listener, a Cap'n Proto listener,
/// the storage pipeline, and the cancellation tokens that shut them down.
struct TestNode {
    /// Bound address of the gRPC server (status/data/control/cluster services).
    grpc_addr: SocketAddr,
    /// Bound address of the Cap'n Proto data-plane server.
    capnp_addr: SocketAddr,
    /// Cancels the gRPC and capnp server tasks.
    cancel: CancellationToken,
    /// Cancels the storage pipeline task.
    pipeline_cancel: CancellationToken,
    // Held (not read) so the node's data directory outlives the node.
    _temp_dir: TempDir,
    // Held (not read) so the server task handles stay owned by the node.
    _server_handle: tokio::task::JoinHandle<()>,
    _capnp_handle: tokio::task::JoinHandle<()>,
}
|
|
|
|
impl TestNode {
|
|
/// Cap'n Proto endpoint (default data plane).
|
|
fn endpoint(&self) -> String {
|
|
self.capnp_addr.to_string()
|
|
}
|
|
|
|
/// gRPC endpoint (health checks, subscribe verification).
|
|
fn grpc_endpoint(&self) -> String {
|
|
format!("http://{}", self.grpc_addr)
|
|
}
|
|
}
|
|
|
|
/// A set of in-process nodes wired together as one cluster.
/// Dropping the cluster cancels every node's server and pipeline tasks.
struct TestCluster {
    nodes: Vec<TestNode>,
}
|
|
|
|
impl TestCluster {
    /// Boot `n` nodes on ephemeral localhost ports and wait until every
    /// node's gRPC status endpoint answers.
    ///
    /// All listeners are bound up front so each node can be configured with
    /// the addresses of every *other* node as its seed list before any
    /// server task starts.
    async fn start(n: usize) -> Self {
        let mut grpc_listeners = Vec::new();
        let mut capnp_listeners = Vec::new();
        let mut grpc_addrs = Vec::new();
        let mut capnp_addrs = Vec::new();

        // Bind first, remember the OS-assigned addresses, keep the listeners
        // alive so the ports stay reserved until each server takes over.
        for _ in 0..n {
            let grpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
            let capnp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
            grpc_addrs.push(grpc_listener.local_addr().unwrap());
            capnp_addrs.push(capnp_listener.local_addr().unwrap());
            grpc_listeners.push(grpc_listener);
            capnp_listeners.push(capnp_listener);
        }

        let mut nodes = Vec::new();
        for (i, (grpc_listener, capnp_listener)) in
            grpc_listeners.into_iter().zip(capnp_listeners).enumerate()
        {
            let grpc_addr = grpc_addrs[i];
            let capnp_addr = capnp_addrs[i];
            let node_id = format!("stress-node-{}", i + 1);
            let temp_dir = TempDir::new().unwrap();

            // Seeds = every other node's gRPC address.
            let seeds: Vec<String> = grpc_addrs
                .iter()
                .enumerate()
                .filter(|(j, _)| *j != i)
                .map(|(_, a)| a.to_string())
                .collect();

            let config = Config {
                node_id: node_id.clone(),
                data_dir: temp_dir.path().to_path_buf(),
                seeds: seeds.clone(),
                grpc_address: grpc_addr.to_string(),
                cluster_id: "test-cluster".to_string(),
                s3_bucket: None,
                s3_endpoint: None,
                s3_region: None,
                sync_policy: sq_models::SyncPolicy::EveryBatch,
            };

            let (state, mut pipeline) = State::new(config).unwrap();

            // Run the storage pipeline until its own cancel token fires;
            // the select! lets Drop stop it independently of the servers.
            let pipeline_cancel = CancellationToken::new();
            let pipeline_cancel_clone = pipeline_cancel.clone();
            tokio::spawn(async move {
                tokio::select! {
                    () = pipeline.run() => {}
                    () = pipeline_cancel_clone.cancelled() => {}
                }
            });

            let membership = Arc::new(Membership::new(MembershipConfig {
                node_id: node_id.clone(),
                address: grpc_addr.to_string(),
                seeds,
                ..Default::default()
            }));

            let cancel = CancellationToken::new();

            // Spawn gRPC server.
            let cancel_clone = cancel.clone();
            let state_clone = state.clone();
            let membership_clone = membership.clone();
            // Reuses the pre-bound listener, so the advertised address is
            // live before the task is even scheduled.
            let incoming = tokio_stream::wrappers::TcpListenerStream::new(grpc_listener);
            let server_handle = tokio::spawn(async move {
                tonic::transport::Server::builder()
                    .add_service(StatusServiceServer::new(health::HealthServer {
                        state: state_clone.clone(),
                    }))
                    .add_service(DataPlaneServiceServer::new(data_plane::DataPlaneServer {
                        state: state_clone.clone(),
                    }))
                    .add_service(ControlPlaneServiceServer::new(
                        control_plane::ControlPlaneServer {
                            state: state_clone.clone(),
                        },
                    ))
                    .add_service(ClusterServiceServer::new(cluster::ClusterServer {
                        state: state_clone,
                        membership: membership_clone,
                    }))
                    .serve_with_incoming_shutdown(incoming, async move {
                        cancel_clone.cancelled().await;
                    })
                    .await
                    .unwrap();
            });

            // Spawn capnp server.
            let cancel_clone = cancel.clone();
            let capnp_state = state.clone();
            let capnp_handle = tokio::spawn(async move {
                let server = CapnpServer {
                    host: capnp_addr,
                    state: capnp_state,
                };
                // CapnpServer binds its own socket, so release the port
                // reservation first. NOTE(review): there is a brief window
                // here where another process could grab the port — presumably
                // acceptable for tests; confirm CapnpServer can't take a
                // listener directly.
                drop(capnp_listener);
                let _ = notmad::Component::run(&server, cancel_clone).await;
            });

            nodes.push(TestNode {
                grpc_addr,
                capnp_addr,
                cancel,
                pipeline_cancel,
                _temp_dir: temp_dir,
                _server_handle: server_handle,
                _capnp_handle: capnp_handle,
            });
        }

        // Block until every gRPC server answers a status request.
        for node in &nodes {
            wait_for_ready(&node.grpc_endpoint()).await;
        }
        // Give capnp server a moment to bind.
        tokio::time::sleep(Duration::from_millis(50)).await;

        TestCluster { nodes }
    }

    /// Borrow node `index`; panics if out of range.
    fn node(&self, index: usize) -> &TestNode {
        &self.nodes[index]
    }
}
|
|
|
|
impl Drop for TestCluster {
|
|
fn drop(&mut self) {
|
|
for node in &self.nodes {
|
|
node.pipeline_cancel.cancel();
|
|
node.cancel.cancel();
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn wait_for_ready(endpoint: &str) {
|
|
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(5);
|
|
loop {
|
|
if tokio::time::Instant::now() > deadline {
|
|
panic!("Server at {} did not become ready in time", endpoint);
|
|
}
|
|
if let Ok(mut client) = StatusServiceClient::connect(endpoint.to_string()).await {
|
|
if client
|
|
.status(tonic::Request::new(GetStatusRequest {}))
|
|
.await
|
|
.is_ok()
|
|
{
|
|
return;
|
|
}
|
|
}
|
|
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Stress test 1: High-volume publish — 100K messages from a single producer
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test]
|
|
async fn stress_single_producer_100k() {
|
|
let cluster = TestCluster::start(1).await;
|
|
let endpoint = cluster.node(0).endpoint();
|
|
let grpc_ep = cluster.node(0).grpc_endpoint();
|
|
|
|
let mut producer = Producer::connect(ProducerConfig {
|
|
address: endpoint.clone(),
|
|
..Default::default()
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
let total = 100_000u64;
|
|
let batch_size = 500;
|
|
let payload = vec![0u8; 128]; // 128-byte messages
|
|
|
|
let start = Instant::now();
|
|
|
|
for batch_start in (0..total).step_by(batch_size) {
|
|
let batch_end = (batch_start + batch_size as u64).min(total);
|
|
let batch: Vec<ProducerMessage> = (batch_start..batch_end)
|
|
.map(|_| ProducerMessage::new("stress-topic", payload.clone()))
|
|
.collect();
|
|
producer.send_batch(batch).await.unwrap();
|
|
}
|
|
|
|
let publish_duration = start.elapsed();
|
|
let msgs_per_sec = total as f64 / publish_duration.as_secs_f64();
|
|
|
|
eprintln!(
|
|
"stress_single_producer_100k: published {} messages in {:.2}s ({:.0} msg/s, {:.1} MB/s)",
|
|
total,
|
|
publish_duration.as_secs_f64(),
|
|
msgs_per_sec,
|
|
(total as f64 * 128.0) / (1024.0 * 1024.0) / publish_duration.as_secs_f64()
|
|
);
|
|
|
|
// Verify: read back all messages via gRPC subscribe.
|
|
let mut client = DataPlaneServiceClient::connect(grpc_ep)
|
|
.await
|
|
.unwrap();
|
|
let response = client
|
|
.subscribe(tonic::Request::new(SubscribeRequest {
|
|
topic: "stress-topic".to_string(),
|
|
partition: 0,
|
|
consumer_group: String::new(),
|
|
start_offset: Some(0),
|
|
max_batch_size: 1000,
|
|
}))
|
|
.await
|
|
.unwrap();
|
|
|
|
let mut stream = response.into_inner();
|
|
let mut consumed = 0u64;
|
|
let consume_start = Instant::now();
|
|
|
|
while consumed < total {
|
|
match tokio::time::timeout(Duration::from_secs(10), stream.next()).await {
|
|
Ok(Some(Ok(batch))) => consumed += batch.messages.len() as u64,
|
|
_ => break,
|
|
}
|
|
}
|
|
|
|
let consume_duration = consume_start.elapsed();
|
|
let consume_per_sec = consumed as f64 / consume_duration.as_secs_f64();
|
|
|
|
eprintln!(
|
|
"stress_single_producer_100k: consumed {} messages in {:.2}s ({:.0} msg/s)",
|
|
consumed,
|
|
consume_duration.as_secs_f64(),
|
|
consume_per_sec
|
|
);
|
|
|
|
assert_eq!(consumed, total, "expected all messages to be consumed");
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Stress test 2: Concurrent producers — 10 producers, 10K messages each
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Ten producers publish 10K messages each into their own topic concurrently,
/// then every topic's message count is verified over a gRPC subscription.
#[tokio::test]
async fn stress_concurrent_producers() {
    let cluster = TestCluster::start(1).await;
    let endpoint = cluster.node(0).endpoint();
    let grpc_ep = cluster.node(0).grpc_endpoint();

    let num_producers = 10;
    let msgs_per_producer = 10_000u64;
    let payload = vec![0u8; 64];

    let start = Instant::now();

    // One task per producer; each writes only to its own topic, so tasks
    // never contend on ordering within a topic.
    let mut handles = Vec::new();
    for p in 0..num_producers {
        let ep = endpoint.clone();
        let pl = payload.clone();
        handles.push(tokio::spawn(async move {
            let mut producer = Producer::connect(ProducerConfig {
                address: ep,
                producer_id: format!("producer-{p}"),
                ..Default::default()
            })
            .await
            .unwrap();

            let topic = format!("concurrent-topic-{p}");
            // Publish in batches of 100 until msgs_per_producer are sent.
            for batch_start in (0..msgs_per_producer).step_by(100) {
                let batch_end = (batch_start + 100).min(msgs_per_producer);
                let batch: Vec<ProducerMessage> = (batch_start..batch_end)
                    .map(|_| ProducerMessage::new(topic.clone(), pl.clone()))
                    .collect();
                producer.send_batch(batch).await.unwrap();
            }
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }

    let duration = start.elapsed();
    let total = num_producers as u64 * msgs_per_producer;
    let msgs_per_sec = total as f64 / duration.as_secs_f64();

    eprintln!(
        "stress_concurrent_producers: {} producers x {} msgs = {} total in {:.2}s ({:.0} msg/s)",
        num_producers,
        msgs_per_producer,
        total,
        duration.as_secs_f64(),
        msgs_per_sec
    );

    // Verify each topic has the right count via gRPC.
    for p in 0..num_producers {
        let topic = format!("concurrent-topic-{p}");
        let mut client = DataPlaneServiceClient::connect(grpc_ep.clone())
            .await
            .unwrap();
        let response = client
            .subscribe(tonic::Request::new(SubscribeRequest {
                topic: topic.clone(),
                partition: 0,
                consumer_group: String::new(),
                start_offset: Some(0),
                max_batch_size: 1000,
            }))
            .await
            .unwrap();

        let mut stream = response.into_inner();
        let mut count = 0u64;
        // Stop once enough messages were seen; a 5s silence breaks out so
        // the assertion below reports the shortfall instead of hanging.
        while count < msgs_per_producer {
            match tokio::time::timeout(Duration::from_secs(5), stream.next()).await {
                Ok(Some(Ok(batch))) => count += batch.messages.len() as u64,
                _ => break,
            }
        }
        assert_eq!(
            count, msgs_per_producer,
            "topic {topic} expected {msgs_per_producer} messages, got {count}"
        );
    }
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Stress test 3: Concurrent consumers — publish then read in parallel
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Pre-publish 50K messages, then have five independent gRPC subscribers
/// (no consumer group) each read the full topic in parallel.
#[tokio::test]
async fn stress_concurrent_consumers() {
    let cluster = TestCluster::start(1).await;
    let endpoint = cluster.node(0).endpoint();
    let grpc_ep = cluster.node(0).grpc_endpoint();
    let total = 50_000u64;
    let payload = vec![0u8; 64];

    // Pre-publish messages.
    let mut producer = Producer::connect(ProducerConfig {
        address: endpoint.clone(),
        ..Default::default()
    })
    .await
    .unwrap();

    for batch_start in (0..total).step_by(500) {
        let batch_end = (batch_start + 500).min(total);
        let batch: Vec<ProducerMessage> = (batch_start..batch_end)
            .map(|_| ProducerMessage::new("consume-stress", payload.clone()))
            .collect();
        producer.send_batch(batch).await.unwrap();
    }

    // Consume in parallel from 5 independent consumers via gRPC (no consumer group — each reads all).
    let num_consumers = 5;
    let start = Instant::now();

    let mut handles = Vec::new();
    for _ in 0..num_consumers {
        let ep = grpc_ep.clone();
        handles.push(tokio::spawn(async move {
            let mut client = DataPlaneServiceClient::connect(ep).await.unwrap();
            let response = client
                .subscribe(tonic::Request::new(SubscribeRequest {
                    topic: "consume-stress".to_string(),
                    partition: 0,
                    consumer_group: String::new(),
                    start_offset: Some(0),
                    max_batch_size: 1000,
                }))
                .await
                .unwrap();

            let mut stream = response.into_inner();
            let mut count = 0u64;
            // Read until all messages are seen or the stream stalls for 10s.
            while count < total {
                match tokio::time::timeout(Duration::from_secs(10), stream.next()).await {
                    Ok(Some(Ok(batch))) => count += batch.messages.len() as u64,
                    _ => break,
                }
            }
            count
        }));
    }

    // Every subscriber must observe the complete topic independently.
    for handle in handles {
        let count = handle.await.unwrap();
        assert_eq!(count, total, "each consumer should read all {total} messages");
    }

    let duration = start.elapsed();
    eprintln!(
        "stress_concurrent_consumers: {} consumers each read {} msgs in {:.2}s",
        num_consumers,
        total,
        duration.as_secs_f64()
    );
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Stress test 4: Sustained load — publish+consume simultaneously over time
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test]
|
|
async fn stress_sustained_load() {
|
|
let cluster = TestCluster::start(1).await;
|
|
let endpoint = cluster.node(0).endpoint();
|
|
let grpc_ep = cluster.node(0).grpc_endpoint();
|
|
let sustain_duration = Duration::from_secs(3);
|
|
let payload = vec![0u8; 256];
|
|
|
|
let ep = endpoint.clone();
|
|
let pl = payload.clone();
|
|
|
|
// Producer: publish as fast as possible for the sustained duration.
|
|
let producer_handle = tokio::spawn(async move {
|
|
let mut producer = Producer::connect(ProducerConfig {
|
|
address: ep,
|
|
..Default::default()
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
let start = Instant::now();
|
|
let mut total = 0u64;
|
|
while start.elapsed() < sustain_duration {
|
|
let batch: Vec<ProducerMessage> = (0..100)
|
|
.map(|_| ProducerMessage::new("sustained-topic", pl.clone()))
|
|
.collect();
|
|
producer.send_batch(batch).await.unwrap();
|
|
total += 100;
|
|
}
|
|
(total, start.elapsed())
|
|
});
|
|
|
|
// Give producer a head start.
|
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
|
|
|
// Consumer: read as fast as possible via gRPC subscribe.
|
|
let ep = grpc_ep.clone();
|
|
let consumer_handle = tokio::spawn(async move {
|
|
let mut client = DataPlaneServiceClient::connect(ep).await.unwrap();
|
|
let response = client
|
|
.subscribe(tonic::Request::new(SubscribeRequest {
|
|
topic: "sustained-topic".to_string(),
|
|
partition: 0,
|
|
consumer_group: String::new(),
|
|
start_offset: Some(0),
|
|
max_batch_size: 1000,
|
|
}))
|
|
.await
|
|
.unwrap();
|
|
|
|
let mut stream = response.into_inner();
|
|
let mut count = 0u64;
|
|
let start = Instant::now();
|
|
|
|
// Read for longer than the producer runs to drain everything.
|
|
let read_deadline = sustain_duration + Duration::from_secs(5);
|
|
while start.elapsed() < read_deadline {
|
|
match tokio::time::timeout(Duration::from_secs(2), stream.next()).await {
|
|
Ok(Some(Ok(batch))) => count += batch.messages.len() as u64,
|
|
_ => break,
|
|
}
|
|
}
|
|
count
|
|
});
|
|
|
|
let (published, pub_duration) = producer_handle.await.unwrap();
|
|
let consumed = consumer_handle.await.unwrap();
|
|
|
|
let pub_rate = published as f64 / pub_duration.as_secs_f64();
|
|
let throughput_mb =
|
|
(published as f64 * 256.0) / (1024.0 * 1024.0) / pub_duration.as_secs_f64();
|
|
|
|
eprintln!(
|
|
"stress_sustained_load: published {} in {:.2}s ({:.0} msg/s, {:.1} MB/s), consumed {}",
|
|
published,
|
|
pub_duration.as_secs_f64(),
|
|
pub_rate,
|
|
throughput_mb,
|
|
consumed
|
|
);
|
|
|
|
assert!(
|
|
published > 0,
|
|
"should have published messages during sustained load"
|
|
);
|
|
assert_eq!(consumed, published, "consumer should eventually read all published messages");
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Stress test 5: Multi-topic fan-out — publish to many topics simultaneously
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Publish 1K messages to each of 50 topics in interleaved batches, then
/// spot-check three topics (first, middle, last) for the expected counts.
#[tokio::test]
async fn stress_multi_topic_fanout() {
    let cluster = TestCluster::start(1).await;
    let endpoint = cluster.node(0).endpoint();
    let grpc_ep = cluster.node(0).grpc_endpoint();
    let num_topics = 50;
    let msgs_per_topic = 1_000u64;
    let payload = vec![0u8; 64];

    let start = Instant::now();

    let mut producer = Producer::connect(ProducerConfig {
        address: endpoint.clone(),
        ..Default::default()
    })
    .await
    .unwrap();

    // Publish to many topics in round-robin batches.
    // (Outer loop walks batch windows, inner loop cycles every topic, so
    // the server sees interleaved writes rather than one topic at a time.)
    for batch_start in (0..msgs_per_topic).step_by(100) {
        let batch_end = (batch_start + 100).min(msgs_per_topic);
        for t in 0..num_topics {
            let topic = format!("fanout-{t}");
            let batch: Vec<ProducerMessage> = (batch_start..batch_end)
                .map(|_| ProducerMessage::new(topic.clone(), payload.clone()))
                .collect();
            producer.send_batch(batch).await.unwrap();
        }
    }

    let duration = start.elapsed();
    let total = num_topics as u64 * msgs_per_topic;
    eprintln!(
        "stress_multi_topic_fanout: {} topics x {} msgs = {} total in {:.2}s ({:.0} msg/s)",
        num_topics,
        msgs_per_topic,
        total,
        duration.as_secs_f64(),
        total as f64 / duration.as_secs_f64()
    );

    // Spot-check a few topics via gRPC.
    for t in [0, num_topics / 2, num_topics - 1] {
        let topic = format!("fanout-{t}");
        let mut client = DataPlaneServiceClient::connect(grpc_ep.clone())
            .await
            .unwrap();
        let response = client
            .subscribe(tonic::Request::new(SubscribeRequest {
                topic: topic.clone(),
                partition: 0,
                consumer_group: String::new(),
                start_offset: Some(0),
                max_batch_size: 1000,
            }))
            .await
            .unwrap();

        let mut stream = response.into_inner();
        let mut count = 0u64;
        // Read until the expected count or a 5s stall, then assert.
        while count < msgs_per_topic {
            match tokio::time::timeout(Duration::from_secs(5), stream.next()).await {
                Ok(Some(Ok(batch))) => count += batch.messages.len() as u64,
                _ => break,
            }
        }
        assert_eq!(
            count, msgs_per_topic,
            "topic {topic} expected {msgs_per_topic} messages, got {count}"
        );
    }
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Stress test 6: Large message bodies — 10K messages with 4KB payloads
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Publish 10K messages with 4KB bodies (~40MB total) and verify every
/// byte reads back intact over a gRPC subscription.
#[tokio::test]
async fn stress_large_messages() {
    let cluster = TestCluster::start(1).await;
    let endpoint = cluster.node(0).endpoint();
    let grpc_ep = cluster.node(0).grpc_endpoint();

    let total = 10_000u64;
    let payload = vec![0xABu8; 4096]; // 4KB messages

    let mut producer = Producer::connect(ProducerConfig {
        address: endpoint.clone(),
        ..Default::default()
    })
    .await
    .unwrap();

    let start = Instant::now();

    // Smaller batches (50) than the other tests, since each message is 4KB.
    for batch_start in (0..total).step_by(50) {
        let batch_end = (batch_start + 50).min(total);
        let batch: Vec<ProducerMessage> = (batch_start..batch_end)
            .map(|_| ProducerMessage::new("large-msgs", payload.clone()))
            .collect();
        producer.send_batch(batch).await.unwrap();
    }

    let pub_duration = start.elapsed();
    let data_mb = (total as f64 * 4096.0) / (1024.0 * 1024.0);
    eprintln!(
        "stress_large_messages: published {} x 4KB = {:.1}MB in {:.2}s ({:.1} MB/s)",
        total,
        data_mb,
        pub_duration.as_secs_f64(),
        data_mb / pub_duration.as_secs_f64()
    );

    // Verify all data reads back correctly via gRPC.
    let mut client = DataPlaneServiceClient::connect(grpc_ep).await.unwrap();
    let response = client
        .subscribe(tonic::Request::new(SubscribeRequest {
            topic: "large-msgs".to_string(),
            partition: 0,
            consumer_group: String::new(),
            start_offset: Some(0),
            max_batch_size: 200,
        }))
        .await
        .unwrap();

    let mut stream = response.into_inner();
    let mut count = 0u64;
    while count < total {
        match tokio::time::timeout(Duration::from_secs(10), stream.next()).await {
            Ok(Some(Ok(batch))) => {
                // Check both length and content of every body, not just counts.
                for msg in &batch.messages {
                    assert_eq!(msg.value.len(), 4096, "message body should be 4KB");
                    assert!(msg.value.iter().all(|&b| b == 0xAB), "data integrity check");
                }
                count += batch.messages.len() as u64;
            }
            _ => break,
        }
    }

    assert_eq!(count, total, "all large messages should be consumed");
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Stress test 7: Consumer group offset tracking under load
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Consume half a topic under a consumer group with auto-commit, reconnect
/// with the same group, and check the resume point is near the halfway mark.
#[tokio::test]
async fn stress_consumer_group_resume() {
    let cluster = TestCluster::start(1).await;
    let endpoint = cluster.node(0).endpoint();
    let total = 10_000u64;
    let payload = vec![0u8; 32];

    // Publish all messages.
    let mut producer = Producer::connect(ProducerConfig {
        address: endpoint.clone(),
        ..Default::default()
    })
    .await
    .unwrap();

    for batch_start in (0..total).step_by(500) {
        let batch_end = (batch_start + 500).min(total);
        let batch: Vec<ProducerMessage> = (batch_start..batch_end)
            .map(|_| ProducerMessage::new("cg-stress", payload.clone()))
            .collect();
        producer.send_batch(batch).await.unwrap();
    }

    // Consume first half with auto-commit.
    let half = total / 2;
    {
        // Scoped so the consumer is dropped (connection closed) before the
        // second consumer connects below.
        let mut consumer = Consumer::connect(ConsumerConfig {
            address: endpoint.clone(),
            consumer_group: "stress-group".to_string(),
            topic: "cg-stress".to_string(),
            auto_commit: true,
            max_poll_records: 500,
            ..Default::default()
        })
        .await
        .unwrap();

        let mut consumed = 0u64;
        // May overshoot `half` by up to one poll (max_poll_records = 500).
        while consumed < half {
            let msgs = tokio::time::timeout(Duration::from_secs(5), consumer.poll())
                .await
                .unwrap()
                .unwrap();
            consumed += msgs.len() as u64;
        }
        assert!(consumed >= half, "should have consumed at least half");
    }

    // Reconnect — should resume from the committed offset.
    {
        let mut consumer = Consumer::connect(ConsumerConfig {
            address: endpoint.clone(),
            consumer_group: "stress-group".to_string(),
            topic: "cg-stress".to_string(),
            auto_commit: true,
            max_poll_records: 500,
            ..Default::default()
        })
        .await
        .unwrap();

        let msgs = tokio::time::timeout(Duration::from_secs(5), consumer.poll())
            .await
            .unwrap()
            .unwrap();

        // First message after reconnect should be at or after the halfway point.
        assert!(
            !msgs.is_empty(),
            "should receive messages after resume"
        );
        let first_offset = msgs[0].offset;
        // NOTE(review): assumes offsets are contiguous per partition starting
        // at 0, so "offset" and "message count" line up — confirm against the
        // SDK's commit semantics.
        assert!(
            first_offset >= half - 500, // Allow some re-delivery due to batch commit
            "first offset after resume should be near {half}, got {first_offset}"
        );
    }
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Stress test 8: BatchProducer — 100K messages from a single batching producer
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Push 100K messages through a single BatchProducer from 100K concurrent
/// send tasks, then read everything back over a gRPC subscription.
#[tokio::test]
async fn stress_batch_producer_100k() {
    let cluster = TestCluster::start(1).await;
    let endpoint = cluster.node(0).endpoint();
    let grpc_ep = cluster.node(0).grpc_endpoint();

    let producer = BatchProducer::connect(BatchProducerConfig {
        address: endpoint.clone(),
        max_batch_size: 1000,
        flush_interval_ms: 5,
        channel_capacity: 20_000,
        ..Default::default()
    })
    .await
    .unwrap();

    // Shared across send tasks; every task holds a clone of the Arc.
    let producer = Arc::new(producer);
    let total = 100_000u64;
    let payload = vec![0u8; 128];

    let start = Instant::now();

    // Spawn a task per message to fully saturate the batch pipeline.
    let mut handles = Vec::with_capacity(total as usize);
    for _ in 0..total {
        let p = producer.clone();
        let pl = payload.clone();
        handles.push(tokio::spawn(async move {
            // Each send awaits its ack, so once all tasks are joined every
            // message has been accepted by the server.
            p.send(ProducerMessage::new("batch-stress", pl))
                .await
                .unwrap();
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }

    let publish_duration = start.elapsed();
    let msgs_per_sec = total as f64 / publish_duration.as_secs_f64();

    eprintln!(
        "stress_batch_producer_100k: published {} messages in {:.2}s ({:.0} msg/s, {:.1} MB/s)",
        total,
        publish_duration.as_secs_f64(),
        msgs_per_sec,
        (total as f64 * 128.0) / (1024.0 * 1024.0) / publish_duration.as_secs_f64()
    );

    // Verify: read back all messages via gRPC.
    let mut client = DataPlaneServiceClient::connect(grpc_ep).await.unwrap();
    let response = client
        .subscribe(tonic::Request::new(SubscribeRequest {
            topic: "batch-stress".to_string(),
            partition: 0,
            consumer_group: String::new(),
            start_offset: Some(0),
            max_batch_size: 1000,
        }))
        .await
        .unwrap();

    let mut stream = response.into_inner();
    let mut consumed = 0u64;

    while consumed < total {
        match tokio::time::timeout(Duration::from_secs(10), stream.next()).await {
            Ok(Some(Ok(batch))) => consumed += batch.messages.len() as u64,
            _ => break,
        }
    }

    assert_eq!(consumed, total, "expected all messages to be consumed");

    // Close the producer (flushes remaining).
    // try_unwrap succeeds because every send task (and its Arc clone) has
    // been joined above, leaving this as the sole owner.
    Arc::try_unwrap(producer).ok().unwrap().close().await;
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Stress test 9: BatchProducer concurrent — 10 batching producers, 10K each
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Ten BatchProducers (one per topic) each fire 10K concurrent sends, then
/// every topic's message count is verified over a gRPC subscription.
#[tokio::test]
async fn stress_batch_concurrent_producers() {
    let cluster = TestCluster::start(1).await;
    let endpoint = cluster.node(0).endpoint();
    let grpc_ep = cluster.node(0).grpc_endpoint();

    let num_producers = 10;
    let msgs_per_producer = 10_000u64;
    let payload = vec![0u8; 64];

    let start = Instant::now();

    // Outer task per producer; inner task per message.
    let mut handles = Vec::new();
    for p in 0..num_producers {
        let ep = endpoint.clone();
        let pl = payload.clone();
        handles.push(tokio::spawn(async move {
            let producer = Arc::new(
                BatchProducer::connect(BatchProducerConfig {
                    address: ep,
                    producer_id: format!("batch-producer-{p}"),
                    max_batch_size: 500,
                    flush_interval_ms: 5,
                    ..Default::default()
                })
                .await
                .unwrap(),
            );

            let topic = format!("batch-concurrent-{p}");
            let mut send_handles = Vec::new();

            // Fire all sends concurrently within each producer.
            for _ in 0..msgs_per_producer {
                let p = producer.clone();
                let t = topic.clone();
                let pl = pl.clone();
                send_handles.push(tokio::spawn(async move {
                    p.send(ProducerMessage::new(t, pl)).await.unwrap();
                }));
            }

            // Await all acks.
            for handle in send_handles {
                handle.await.unwrap();
            }

            // All send tasks joined, so this is the sole Arc owner; close
            // flushes anything still buffered.
            Arc::try_unwrap(producer).ok().unwrap().close().await;
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }

    let duration = start.elapsed();
    let total = num_producers as u64 * msgs_per_producer;
    let msgs_per_sec = total as f64 / duration.as_secs_f64();

    eprintln!(
        "stress_batch_concurrent_producers: {} producers x {} msgs = {} total in {:.2}s ({:.0} msg/s)",
        num_producers,
        msgs_per_producer,
        total,
        duration.as_secs_f64(),
        msgs_per_sec
    );

    // Verify each topic has the right count via gRPC.
    for p in 0..num_producers {
        let topic = format!("batch-concurrent-{p}");
        let mut client = DataPlaneServiceClient::connect(grpc_ep.clone())
            .await
            .unwrap();
        let response = client
            .subscribe(tonic::Request::new(SubscribeRequest {
                topic: topic.clone(),
                partition: 0,
                consumer_group: String::new(),
                start_offset: Some(0),
                max_batch_size: 1000,
            }))
            .await
            .unwrap();

        let mut stream = response.into_inner();
        let mut count = 0u64;
        // Read until the expected count or a 5s stall, then assert.
        while count < msgs_per_producer {
            match tokio::time::timeout(Duration::from_secs(5), stream.next()).await {
                Ok(Some(Ok(batch))) => count += batch.messages.len() as u64,
                _ => break,
            }
        }
        assert_eq!(
            count, msgs_per_producer,
            "topic {topic} expected {msgs_per_producer} messages, got {count}"
        );
    }
}
|