mirror of
https://github.com/kjuulh/dagger-rs.git
synced 2025-08-04 06:43:26 +02:00
fix(core): Fix async panic on blocking #19
Replaced internal threads with tokio spawn functions
This commit is contained in:
@@ -3,7 +3,7 @@ use std::{
|
||||
io::{BufRead, BufReader},
|
||||
path::PathBuf,
|
||||
process::{Child, Stdio},
|
||||
sync::{mpsc::sync_channel, Arc},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use crate::{config::Config, connect_params::ConnectParams};
|
||||
@@ -20,12 +20,12 @@ impl CliSession {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn connect(
|
||||
pub async fn connect(
|
||||
&self,
|
||||
config: &Config,
|
||||
cli_path: &PathBuf,
|
||||
) -> eyre::Result<(ConnectParams, Child)> {
|
||||
self.inner.connect(config, cli_path)
|
||||
self.inner.connect(config, cli_path).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,13 +33,13 @@ impl CliSession {
|
||||
struct InnerCliSession {}
|
||||
|
||||
impl InnerCliSession {
|
||||
pub fn connect(
|
||||
pub async fn connect(
|
||||
&self,
|
||||
config: &Config,
|
||||
cli_path: &PathBuf,
|
||||
) -> eyre::Result<(ConnectParams, Child)> {
|
||||
let proc = self.start(config, cli_path)?;
|
||||
let params = self.get_conn(proc)?;
|
||||
let params = self.get_conn(proc).await?;
|
||||
Ok(params)
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ impl InnerCliSession {
|
||||
return Ok(proc);
|
||||
}
|
||||
|
||||
fn get_conn(
|
||||
async fn get_conn(
|
||||
&self,
|
||||
mut proc: std::process::Child,
|
||||
) -> eyre::Result<(ConnectParams, std::process::Child)> {
|
||||
@@ -84,14 +84,14 @@ impl InnerCliSession {
|
||||
.take()
|
||||
.ok_or(eyre::anyhow!("could not acquire stderr from child process"))?;
|
||||
|
||||
let (sender, receiver) = sync_channel(1);
|
||||
let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
std::thread::spawn(move || {
|
||||
tokio::spawn(async move {
|
||||
let stdout_bufr = BufReader::new(stdout);
|
||||
for line in stdout_bufr.lines() {
|
||||
let out = line.as_ref().unwrap();
|
||||
if let Ok(conn) = serde_json::from_str::<ConnectParams>(&out) {
|
||||
sender.send(conn).unwrap();
|
||||
sender.send(conn).await.unwrap();
|
||||
}
|
||||
if let Ok(line) = line {
|
||||
println!("dagger: {}", line);
|
||||
@@ -99,7 +99,7 @@ impl InnerCliSession {
|
||||
}
|
||||
});
|
||||
|
||||
std::thread::spawn(|| {
|
||||
tokio::spawn(async move {
|
||||
let stderr_bufr = BufReader::new(stderr);
|
||||
for line in stderr_bufr.lines() {
|
||||
if let Ok(line) = line {
|
||||
@@ -109,7 +109,7 @@ impl InnerCliSession {
|
||||
}
|
||||
});
|
||||
|
||||
let conn = receiver.recv()?;
|
||||
let conn = receiver.recv().await.ok_or(eyre::anyhow!("could not receive ok signal from dagger-engine"))?;
|
||||
|
||||
Ok((conn, proc))
|
||||
}
|
||||
|
Reference in New Issue
Block a user