feat: able to run containers
This commit is contained in:
@@ -12,6 +12,24 @@ pub struct PublishResponse {
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct GetTopicRequest {
|
||||
#[prost(string, tag="1")]
|
||||
pub topic: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct GetTopicResponse {
|
||||
#[prost(message, optional, tag="1")]
|
||||
pub projects: ::core::option::Option<Projects>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct Projects {
|
||||
#[prost(message, repeated, tag="1")]
|
||||
pub projects: ::prost::alloc::vec::Vec<Project>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct Project {
|
||||
#[prost(string, tag="1")]
|
||||
pub name: ::prost::alloc::string::String,
|
||||
|
@@ -4,6 +4,7 @@ pub mod registry_service_client {
|
||||
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
|
||||
use tonic::codegen::*;
|
||||
use tonic::codegen::http::Uri;
|
||||
///
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RegistryServiceClient<T> {
|
||||
inner: tonic::client::Grpc<T>,
|
||||
@@ -84,6 +85,7 @@ pub mod registry_service_client {
|
||||
self.inner = self.inner.max_encoding_message_size(limit);
|
||||
self
|
||||
}
|
||||
///
|
||||
pub async fn publish(
|
||||
&mut self,
|
||||
request: impl tonic::IntoRequest<super::PublishRequest>,
|
||||
@@ -109,6 +111,32 @@ pub mod registry_service_client {
|
||||
.insert(GrpcMethod::new("norun.v1.RegistryService", "Publish"));
|
||||
self.inner.unary(req, path, codec).await
|
||||
}
|
||||
///
|
||||
pub async fn get_topic(
|
||||
&mut self,
|
||||
request: impl tonic::IntoRequest<super::GetTopicRequest>,
|
||||
) -> std::result::Result<
|
||||
tonic::Response<super::GetTopicResponse>,
|
||||
tonic::Status,
|
||||
> {
|
||||
self.inner
|
||||
.ready()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tonic::Status::new(
|
||||
tonic::Code::Unknown,
|
||||
format!("Service was not ready: {}", e.into()),
|
||||
)
|
||||
})?;
|
||||
let codec = tonic::codec::ProstCodec::default();
|
||||
let path = http::uri::PathAndQuery::from_static(
|
||||
"/norun.v1.RegistryService/GetTopic",
|
||||
);
|
||||
let mut req = request.into_request();
|
||||
req.extensions_mut()
|
||||
.insert(GrpcMethod::new("norun.v1.RegistryService", "GetTopic"));
|
||||
self.inner.unary(req, path, codec).await
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Generated server implementations.
|
||||
@@ -118,11 +146,21 @@ pub mod registry_service_server {
|
||||
/// Generated trait containing gRPC methods that should be implemented for use with RegistryServiceServer.
|
||||
#[async_trait]
|
||||
pub trait RegistryService: Send + Sync + 'static {
|
||||
///
|
||||
async fn publish(
|
||||
&self,
|
||||
request: tonic::Request<super::PublishRequest>,
|
||||
) -> std::result::Result<tonic::Response<super::PublishResponse>, tonic::Status>;
|
||||
///
|
||||
async fn get_topic(
|
||||
&self,
|
||||
request: tonic::Request<super::GetTopicRequest>,
|
||||
) -> std::result::Result<
|
||||
tonic::Response<super::GetTopicResponse>,
|
||||
tonic::Status,
|
||||
>;
|
||||
}
|
||||
///
|
||||
#[derive(Debug)]
|
||||
pub struct RegistryServiceServer<T: RegistryService> {
|
||||
inner: _Inner<T>,
|
||||
@@ -248,6 +286,52 @@ pub mod registry_service_server {
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
"/norun.v1.RegistryService/GetTopic" => {
|
||||
#[allow(non_camel_case_types)]
|
||||
struct GetTopicSvc<T: RegistryService>(pub Arc<T>);
|
||||
impl<
|
||||
T: RegistryService,
|
||||
> tonic::server::UnaryService<super::GetTopicRequest>
|
||||
for GetTopicSvc<T> {
|
||||
type Response = super::GetTopicResponse;
|
||||
type Future = BoxFuture<
|
||||
tonic::Response<Self::Response>,
|
||||
tonic::Status,
|
||||
>;
|
||||
fn call(
|
||||
&mut self,
|
||||
request: tonic::Request<super::GetTopicRequest>,
|
||||
) -> Self::Future {
|
||||
let inner = Arc::clone(&self.0);
|
||||
let fut = async move {
|
||||
<T as RegistryService>::get_topic(&inner, request).await
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
}
|
||||
let accept_compression_encodings = self.accept_compression_encodings;
|
||||
let send_compression_encodings = self.send_compression_encodings;
|
||||
let max_decoding_message_size = self.max_decoding_message_size;
|
||||
let max_encoding_message_size = self.max_encoding_message_size;
|
||||
let inner = self.inner.clone();
|
||||
let fut = async move {
|
||||
let inner = inner.0;
|
||||
let method = GetTopicSvc(inner);
|
||||
let codec = tonic::codec::ProstCodec::default();
|
||||
let mut grpc = tonic::server::Grpc::new(codec)
|
||||
.apply_compression_config(
|
||||
accept_compression_encodings,
|
||||
send_compression_encodings,
|
||||
)
|
||||
.apply_max_message_size_config(
|
||||
max_decoding_message_size,
|
||||
max_encoding_message_size,
|
||||
);
|
||||
let res = grpc.unary(method, req).await;
|
||||
Ok(res)
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
_ => {
|
||||
Box::pin(async move {
|
||||
Ok(
|
||||
|
@@ -21,6 +21,8 @@ tonic = { workspace = true }
|
||||
tokio-util = "0.7.15"
|
||||
async-trait = "0.1.88"
|
||||
notmad = "0.7.2"
|
||||
bollard = "0.19.1"
|
||||
futures-util = "0.3.31"
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = "1.4.1"
|
||||
|
@@ -1,12 +1,13 @@
|
||||
use clap::{Parser, Subcommand};
|
||||
|
||||
use crate::{
|
||||
cli::{publish::PublishCommand, serve::ServeCommand},
|
||||
cli::{publish::PublishCommand, serve::ServeCommand, subscribe::SubscribeCommand},
|
||||
state::ClientState,
|
||||
};
|
||||
|
||||
mod publish;
|
||||
mod serve;
|
||||
mod subscribe;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(author, version, about)]
|
||||
@@ -24,6 +25,7 @@ struct Cli {
|
||||
|
||||
#[derive(Subcommand)]
|
||||
enum CliSubcommands {
|
||||
Subscribe(SubscribeCommand),
|
||||
Publish(PublishCommand),
|
||||
Serve(ServeCommand),
|
||||
}
|
||||
@@ -36,5 +38,6 @@ pub async fn execute() -> anyhow::Result<()> {
|
||||
match cmd.subcommands {
|
||||
CliSubcommands::Publish(cmd) => cmd.execute(&state).await,
|
||||
CliSubcommands::Serve(cmd) => cmd.execute().await,
|
||||
CliSubcommands::Subscribe(cmd) => cmd.execute(&state).await,
|
||||
}
|
||||
}
|
||||
|
@@ -4,9 +4,6 @@ use crate::{grpc_client::GrpcClientState, models::ProjectTag, project_file, stat
|
||||
|
||||
#[derive(clap::Parser, Debug)]
|
||||
pub struct PublishCommand {
|
||||
#[arg(value_parser = clap::value_parser!(ProjectTag))]
|
||||
project_tag: ProjectTag,
|
||||
|
||||
#[arg(long = "project-path", default_value = ".")]
|
||||
project_path: PathBuf,
|
||||
}
|
||||
|
31
crates/norun/src/cli/subscribe.rs
Normal file
31
crates/norun/src/cli/subscribe.rs
Normal file
@@ -0,0 +1,31 @@
|
||||
use crate::{
|
||||
container_runtime::ContainerRuntimeState, grpc_client::GrpcClientState, state::ClientState,
|
||||
};
|
||||
|
||||
#[derive(clap::Parser)]
|
||||
pub struct SubscribeCommand {
|
||||
#[arg(long)]
|
||||
topic: String,
|
||||
}
|
||||
|
||||
impl SubscribeCommand {
|
||||
pub async fn execute(&self, state: &ClientState) -> anyhow::Result<()> {
|
||||
let projects = state.grpc_client().subscribe(&self.topic).await?;
|
||||
|
||||
let runtime = state.container_runtime();
|
||||
|
||||
println!("printing found projects (len={})", projects.projects.len());
|
||||
for project in projects.projects {
|
||||
println!("project: {project:?}");
|
||||
|
||||
runtime
|
||||
.ensure_running(
|
||||
&project.name,
|
||||
&format!("{}:{}", project.image, project.version),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
85
crates/norun/src/container_runtime.rs
Normal file
85
crates/norun/src/container_runtime.rs
Normal file
@@ -0,0 +1,85 @@
|
||||
use std::{collections::HashMap, sync::LazyLock};
|
||||
|
||||
use bollard::{
|
||||
Docker,
|
||||
image::CreateImageOptions,
|
||||
query_parameters::{
|
||||
CreateContainerOptionsBuilder, CreateImageOptionsBuilder, ListContainersOptionsBuilder,
|
||||
StartContainerOptions,
|
||||
},
|
||||
secret::ContainerCreateBody,
|
||||
};
|
||||
use futures_util::TryStreamExt;
|
||||
|
||||
use crate::state::ClientState;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ContainerRuntime {
|
||||
client: Docker,
|
||||
}
|
||||
|
||||
impl ContainerRuntime {
|
||||
#[tracing::instrument(skip(self), level = "trace")]
|
||||
pub async fn ensure_running(&self, name: &str, image: &str) -> anyhow::Result<()> {
|
||||
tracing::debug!("ensuring that image is running");
|
||||
|
||||
let containers = self
|
||||
.client
|
||||
.list_containers(Some(
|
||||
ListContainersOptionsBuilder::default()
|
||||
.all(true)
|
||||
.filters(&HashMap::from([(
|
||||
"name".to_string(),
|
||||
vec![name.to_string()],
|
||||
)]))
|
||||
.build(),
|
||||
))
|
||||
.await?;
|
||||
if !containers.is_empty() {
|
||||
// Reconcile difference
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let _ = self
|
||||
.client
|
||||
.create_image(
|
||||
Some(CreateImageOptionsBuilder::new().from_image(image).build()),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
self.client
|
||||
.create_container(
|
||||
Some(CreateContainerOptionsBuilder::new().name(name).build()),
|
||||
ContainerCreateBody {
|
||||
image: Some(image.into()),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
self.client
|
||||
.start_container(name, None::<StartContainerOptions>)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ContainerRuntimeState {
|
||||
fn container_runtime(&self) -> ContainerRuntime;
|
||||
}
|
||||
|
||||
impl ContainerRuntimeState for ClientState {
|
||||
fn container_runtime(&self) -> ContainerRuntime {
|
||||
static CLIENT: LazyLock<bollard::Docker> = LazyLock::new(|| {
|
||||
Docker::connect_with_defaults().expect("to be able to connect to a docker daemon")
|
||||
});
|
||||
|
||||
ContainerRuntime {
|
||||
client: CLIENT.clone(),
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,4 +1,6 @@
|
||||
use norun_grpc_interface::{PublishRequest, registry_service_client::RegistryServiceClient};
|
||||
use norun_grpc_interface::{
|
||||
GetTopicRequest, Projects, PublishRequest, registry_service_client::RegistryServiceClient,
|
||||
};
|
||||
use tokio::sync::OnceCell;
|
||||
use tonic::transport::Channel;
|
||||
|
||||
@@ -26,6 +28,23 @@ impl GrpcClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn subscribe(&self, topic: &str) -> anyhow::Result<Projects> {
|
||||
tracing::trace!("calling subscribe via. grpc on registry");
|
||||
|
||||
let mut registry_client = self.get_registry_client().await?;
|
||||
|
||||
let res = registry_client
|
||||
.get_topic(GetTopicRequest {
|
||||
topic: topic.to_string(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
let res = res.into_inner();
|
||||
|
||||
res.projects
|
||||
.ok_or_else(|| anyhow::anyhow!("failed to get projects from server"))
|
||||
}
|
||||
|
||||
async fn get_registry_client(&self) -> anyhow::Result<RegistryServiceClient<Channel>> {
|
||||
let client = self
|
||||
.registry_client
|
||||
|
@@ -1,6 +1,9 @@
|
||||
use norun_grpc_interface::{registry_service_server::RegistryService, *};
|
||||
|
||||
use crate::{server::services::registry::RegistryServiceState, state::ServerState};
|
||||
use crate::{
|
||||
server::services::registry::{self, RegistryServiceState},
|
||||
state::ServerState,
|
||||
};
|
||||
|
||||
pub struct GrpcRegistryService {
|
||||
pub state: ServerState,
|
||||
@@ -8,7 +11,7 @@ pub struct GrpcRegistryService {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl RegistryService for GrpcRegistryService {
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
#[tracing::instrument(skip(self), level = "trace")]
|
||||
async fn publish(
|
||||
&self,
|
||||
request: tonic::Request<PublishRequest>,
|
||||
@@ -30,4 +33,34 @@ impl RegistryService for GrpcRegistryService {
|
||||
|
||||
Ok(tonic::Response::new(PublishResponse {}))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self), level = "trace")]
|
||||
async fn get_topic(
|
||||
&self,
|
||||
request: tonic::Request<GetTopicRequest>,
|
||||
) -> std::result::Result<tonic::Response<GetTopicResponse>, tonic::Status> {
|
||||
tracing::debug!("subscribe called");
|
||||
|
||||
let req = request.into_inner();
|
||||
|
||||
let projects = self
|
||||
.state
|
||||
.registry_service()
|
||||
.get_topic(&req.topic)
|
||||
.await
|
||||
.inspect_err(|e| tracing::warn!("failed to subscribe on topic: {}", e))
|
||||
.map_err(|e| tonic::Status::internal(e.to_string()))?;
|
||||
|
||||
Ok(tonic::Response::new(GetTopicResponse {
|
||||
projects: Some(projects.into()),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<registry::Projects> for Projects {
|
||||
fn from(value: registry::Projects) -> Self {
|
||||
Self {
|
||||
projects: value.projects.into_iter().collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -10,6 +10,8 @@ mod server;
|
||||
mod grpc_client;
|
||||
mod grpc_server;
|
||||
|
||||
mod container_runtime;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
|
@@ -1,7 +1,4 @@
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
sync::{Arc, LazyLock},
|
||||
};
|
||||
use std::sync::{Arc, LazyLock};
|
||||
|
||||
use norun_grpc_interface::Project;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -25,6 +22,25 @@ impl RegistryService {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_topic(&self, topic: &str) -> anyhow::Result<Projects> {
|
||||
tracing::debug!("get projects for topic");
|
||||
|
||||
let projects = {
|
||||
let store = self.store.lock().await;
|
||||
store
|
||||
.iter()
|
||||
.filter(|i| i.name == topic)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
Ok(Projects { projects })
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Projects {
|
||||
pub projects: Vec<Project>,
|
||||
}
|
||||
|
||||
pub trait RegistryServiceState {
|
||||
|
Reference in New Issue
Block a user