use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use sq_grpc_interface::{ data_plane_service_client::DataPlaneServiceClient, data_plane_service_server::DataPlaneServiceServer, status_service_client::StatusServiceClient, status_service_server::StatusServiceServer, AckMode, GetStatusRequest, MessageHeader, PublishMessage, PublishRequest, PublishSettings, SubscribeRequest, }; use sq_sim::fs::InMemoryFileSystem; use sq_sim::SimClock; use sq_storage::engine::StorageEngine; use tokio::sync::Mutex; use tokio_stream::StreamExt; /// A lightweight test harness that starts a gRPC server on a random port /// and returns both the server task and connected clients. struct TestServer { addr: SocketAddr, _shutdown: tokio::sync::oneshot::Sender<()>, } impl TestServer { async fn start() -> Self { let fs = Arc::new(InMemoryFileSystem::new()); let clock = Arc::new(SimClock::new()); let config = sq_models::WalConfig { max_segment_bytes: 1024 * 1024, max_segment_age_secs: 3600, data_dir: PathBuf::from("/data"), ..Default::default() }; let engine = StorageEngine::new(fs, clock, config).unwrap(); engine.recover().unwrap(); let engine = Arc::new(Mutex::new(engine)); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); // Build the health server state-like object inline for tests. let node_id = "test-node".to_string(); struct TestHealthServer { node_id: String, } #[tonic::async_trait] impl sq_grpc_interface::status_service_server::StatusService for TestHealthServer { async fn status( &self, _request: tonic::Request, ) -> Result, tonic::Status> { Ok(tonic::Response::new(sq_grpc_interface::GetStatusResponse { node_id: self.node_id.clone(), cluster: None, })) } } struct TestDataPlaneServer { engine: Arc>>, } #[tonic::async_trait] impl sq_grpc_interface::data_plane_service_server::DataPlaneService for TestDataPlaneServer { async fn publish( &self, request: tonic::Request, ) -> Result, tonic::Status> { let req = request.into_inner(); if req.messages.is_empty() { return Err(tonic::Status::invalid_argument( "messages must not be empty", )); } let mut results = Vec::new(); let engine = self.engine.lock().await; for msg in &req.messages { if msg.topic.is_empty() { return Err(tonic::Status::invalid_argument("topic must not be empty")); } let headers: Vec = msg .headers .iter() .map(|h| sq_models::Header { key: h.key.clone(), value: h.value.clone(), }) .collect(); let key = if msg.key.is_empty() { None } else { Some(msg.key.as_slice()) }; let offset = engine .append(&msg.topic, 0, key, &msg.value, &headers, 0) .map_err(|e| tonic::Status::internal(e.to_string()))?; results.push(sq_grpc_interface::PublishResult { topic: msg.topic.clone(), partition: 0, offset, }); } Ok(tonic::Response::new(sq_grpc_interface::PublishResponse { results, })) } type SubscribeStream = std::pin::Pin< Box< dyn tokio_stream::Stream< Item = Result, > + Send + 'static, >, >; async fn subscribe( &self, request: tonic::Request, ) -> Result, tonic::Status> { let req = request.into_inner(); let batch_size = if req.max_batch_size == 0 { 100 } else { req.max_batch_size as usize }; let start_offset = req.start_offset.unwrap_or(0); let topic = req.topic.clone(); let partition = req.partition; let engine = self.engine.clone(); let stream = async_stream::try_stream! { let mut current_offset = start_offset; let mut empty_polls = 0u32; loop { let messages = { let eng = engine.lock().await; eng.read(&topic, partition, current_offset, batch_size) .map_err(|e| tonic::Status::internal(e.to_string()))? }; if messages.is_empty() { empty_polls += 1; // In tests, stop after a few empty polls to avoid hanging. if empty_polls > 3 { break; } tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; continue; } empty_polls = 0; let consumed: Vec = messages .iter() .map(|m| { current_offset = m.offset + 1; sq_grpc_interface::ConsumedMessage { offset: m.offset, topic: m.topic.to_string(), partition: m.partition, key: m.key.clone().unwrap_or_default(), value: m.value.clone(), headers: m .headers .iter() .map(|h| MessageHeader { key: h.key.clone(), value: h.value.clone(), }) .collect(), timestamp_ms: m.timestamp_ms, } }) .collect(); yield sq_grpc_interface::SubscribeResponse { messages: consumed }; } }; Ok(tonic::Response::new(Box::pin(stream))) } async fn ack( &self, _request: tonic::Request, ) -> Result, tonic::Status> { Ok(tonic::Response::new(sq_grpc_interface::AckResponse {})) } } let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener); tokio::spawn(async move { tonic::transport::Server::builder() .add_service(StatusServiceServer::new(TestHealthServer { node_id: node_id.clone(), })) .add_service(DataPlaneServiceServer::new(TestDataPlaneServer { engine, })) .serve_with_incoming_shutdown(incoming, async { let _ = shutdown_rx.await; }) .await .unwrap(); }); // Give the server a moment to start. tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; TestServer { addr, _shutdown: shutdown_tx, } } fn endpoint(&self) -> String { format!("http://{}", self.addr) } } #[tokio::test] async fn test_status_rpc() { let server = TestServer::start().await; let mut client = StatusServiceClient::connect(server.endpoint()).await.unwrap(); let response = client .status(tonic::Request::new(GetStatusRequest {})) .await .unwrap(); assert_eq!(response.into_inner().node_id, "test-node"); } #[tokio::test] async fn test_publish_single_message() { let server = TestServer::start().await; let mut client = DataPlaneServiceClient::connect(server.endpoint()) .await .unwrap(); let response = client .publish(tonic::Request::new(PublishRequest { messages: vec![PublishMessage { topic: "orders".to_string(), key: vec![], value: b"hello world".to_vec(), headers: vec![], }], settings: None, producer_id: "test".to_string(), })) .await .unwrap(); let results = response.into_inner().results; assert_eq!(results.len(), 1); assert_eq!(results[0].topic, "orders"); assert_eq!(results[0].offset, 0); } #[tokio::test] async fn test_publish_batch_sequential_offsets() { let server = TestServer::start().await; let mut client = DataPlaneServiceClient::connect(server.endpoint()) .await .unwrap(); let messages: Vec = (0..100) .map(|i| PublishMessage { topic: "events".to_string(), key: vec![], value: format!("msg-{i}").into_bytes(), headers: vec![], }) .collect(); let response = client .publish(tonic::Request::new(PublishRequest { messages, settings: Some(PublishSettings { ack_mode: AckMode::All.into(), }), producer_id: "test".to_string(), })) .await .unwrap(); let results = response.into_inner().results; assert_eq!(results.len(), 100); for (i, r) in results.iter().enumerate() { assert_eq!(r.offset, i as u64); assert_eq!(r.topic, "events"); } } #[tokio::test] async fn test_publish_empty_topic_returns_error() { let server = TestServer::start().await; let mut client = DataPlaneServiceClient::connect(server.endpoint()) .await .unwrap(); let err = client .publish(tonic::Request::new(PublishRequest { messages: vec![PublishMessage { topic: "".to_string(), key: vec![], value: b"data".to_vec(), headers: vec![], }], settings: None, producer_id: "test".to_string(), })) .await .unwrap_err(); assert_eq!(err.code(), tonic::Code::InvalidArgument); } #[tokio::test] async fn test_publish_empty_messages_returns_error() { let server = TestServer::start().await; let mut client = DataPlaneServiceClient::connect(server.endpoint()) .await .unwrap(); let err = client .publish(tonic::Request::new(PublishRequest { messages: vec![], settings: None, producer_id: "test".to_string(), })) .await .unwrap_err(); assert_eq!(err.code(), tonic::Code::InvalidArgument); } #[tokio::test] async fn test_publish_with_key_and_headers() { let server = TestServer::start().await; let mut client = DataPlaneServiceClient::connect(server.endpoint()) .await .unwrap(); let response = client .publish(tonic::Request::new(PublishRequest { messages: vec![PublishMessage { topic: "orders".to_string(), key: b"order-123".to_vec(), value: b"payload".to_vec(), headers: vec![MessageHeader { key: "trace-id".to_string(), value: b"abc-123".to_vec(), }], }], settings: None, producer_id: "test".to_string(), })) .await .unwrap(); let results = response.into_inner().results; assert_eq!(results.len(), 1); assert_eq!(results[0].offset, 0); } #[tokio::test] async fn test_subscribe_from_beginning() { let server = TestServer::start().await; let mut client = DataPlaneServiceClient::connect(server.endpoint()) .await .unwrap(); // Publish 10 messages first. let messages: Vec = (0..10) .map(|i| PublishMessage { topic: "events".to_string(), key: vec![], value: format!("msg-{i}").into_bytes(), headers: vec![], }) .collect(); client .publish(tonic::Request::new(PublishRequest { messages, settings: None, producer_id: "test".to_string(), })) .await .unwrap(); // Subscribe from offset 0. let response = client .subscribe(tonic::Request::new(SubscribeRequest { topic: "events".to_string(), partition: 0, consumer_group: "".to_string(), start_offset: Some(0), max_batch_size: 100, })) .await .unwrap(); let mut stream = response.into_inner(); let mut all_messages = Vec::new(); while let Some(Ok(batch)) = stream.next().await { all_messages.extend(batch.messages); if all_messages.len() >= 10 { break; } } assert_eq!(all_messages.len(), 10); for (i, msg) in all_messages.iter().enumerate() { assert_eq!(msg.offset, i as u64); assert_eq!(msg.value, format!("msg-{i}").as_bytes()); assert_eq!(msg.topic, "events"); } } #[tokio::test] async fn test_subscribe_from_middle() { let server = TestServer::start().await; let mut client = DataPlaneServiceClient::connect(server.endpoint()) .await .unwrap(); // Publish 10 messages. let messages: Vec = (0..10) .map(|i| PublishMessage { topic: "events".to_string(), key: vec![], value: format!("msg-{i}").into_bytes(), headers: vec![], }) .collect(); client .publish(tonic::Request::new(PublishRequest { messages, settings: None, producer_id: "test".to_string(), })) .await .unwrap(); // Subscribe from offset 5. let response = client .subscribe(tonic::Request::new(SubscribeRequest { topic: "events".to_string(), partition: 0, consumer_group: "".to_string(), start_offset: Some(5), max_batch_size: 100, })) .await .unwrap(); let mut stream = response.into_inner(); let mut all_messages = Vec::new(); while let Some(Ok(batch)) = stream.next().await { all_messages.extend(batch.messages); if all_messages.len() >= 5 { break; } } assert_eq!(all_messages.len(), 5); assert_eq!(all_messages[0].offset, 5); assert_eq!(all_messages[4].offset, 9); }