feat: mad not properly surfaces panics
Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
@@ -77,8 +77,11 @@
|
|||||||
|
|
||||||
use futures::stream::FuturesUnordered;
|
use futures::stream::FuturesUnordered;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use std::{fmt::Display, sync::Arc, error::Error};
|
use std::{error::Error, fmt::Display, sync::Arc};
|
||||||
use tokio::signal::unix::{SignalKind, signal};
|
use tokio::{
|
||||||
|
signal::unix::{SignalKind, signal},
|
||||||
|
task::JoinError,
|
||||||
|
};
|
||||||
|
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
@@ -101,15 +104,13 @@ pub enum MadError {
|
|||||||
|
|
||||||
/// Error that occurred during the run phase of a component.
|
/// Error that occurred during the run phase of a component.
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
RunError {
|
RunError { run: anyhow::Error },
|
||||||
run: anyhow::Error
|
|
||||||
},
|
|
||||||
|
|
||||||
/// Error that occurred during the close phase of a component.
|
/// Error that occurred during the close phase of a component.
|
||||||
#[error("component(s) failed during close")]
|
#[error("component(s) failed during close")]
|
||||||
CloseError {
|
CloseError {
|
||||||
#[source]
|
#[source]
|
||||||
close: anyhow::Error
|
close: anyhow::Error,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Multiple errors from different components.
|
/// Multiple errors from different components.
|
||||||
@@ -180,7 +181,7 @@ impl Display for AggregateError {
|
|||||||
writeln!(f, "{} component errors occurred:", self.errors.len())?;
|
writeln!(f, "{} component errors occurred:", self.errors.len())?;
|
||||||
for (i, error) in self.errors.iter().enumerate() {
|
for (i, error) in self.errors.iter().enumerate() {
|
||||||
write!(f, "\n[Component {}] {}", i + 1, error)?;
|
write!(f, "\n[Component {}] {}", i + 1, error)?;
|
||||||
|
|
||||||
// Print the error chain for each component error
|
// Print the error chain for each component error
|
||||||
let mut source = error.source();
|
let mut source = error.source();
|
||||||
let mut level = 1;
|
let mut level = 1;
|
||||||
@@ -501,11 +502,32 @@ impl Mad {
|
|||||||
|
|
||||||
tracing::debug!(component = name, "mad running");
|
tracing::debug!(component = name, "mad running");
|
||||||
|
|
||||||
|
let handle = tokio::spawn(async move { comp.run(job_cancellation).await });
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = cancellation_token.cancelled() => {
|
_ = cancellation_token.cancelled() => {
|
||||||
error_tx.send(CompletionResult { res: Ok(()) , name }).await
|
error_tx.send(CompletionResult { res: Ok(()) , name }).await
|
||||||
}
|
}
|
||||||
res = comp.run(job_cancellation) => {
|
res = handle => {
|
||||||
|
let res = match res {
|
||||||
|
Ok(res) => res,
|
||||||
|
Err(join) => {
|
||||||
|
match join.source() {
|
||||||
|
Some(error) => {
|
||||||
|
Err(MadError::RunError{run: anyhow::anyhow!("component aborted: {:?}", error)})
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
if join.is_panic(){
|
||||||
|
Err(MadError::RunError { run: anyhow::anyhow!("component panicked: {}", join) })
|
||||||
|
} else {
|
||||||
|
Err(MadError::RunError { run: anyhow::anyhow!("component faced unknown error: {}", join) })
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
error_tx.send(CompletionResult { res , name }).await
|
error_tx.send(CompletionResult { res , name }).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -796,13 +818,13 @@ mod tests {
|
|||||||
.context("failed to read configuration")
|
.context("failed to read configuration")
|
||||||
.context("unable to initialize database")
|
.context("unable to initialize database")
|
||||||
.context("service startup failed");
|
.context("service startup failed");
|
||||||
|
|
||||||
let mad_error = MadError::Inner(error);
|
let mad_error = MadError::Inner(error);
|
||||||
let display = format!("{}", mad_error);
|
let display = format!("{}", mad_error);
|
||||||
|
|
||||||
// Should display the top-level error message
|
// Should display the top-level error message
|
||||||
assert!(display.contains("service startup failed"));
|
assert!(display.contains("service startup failed"));
|
||||||
|
|
||||||
// Test error chain iteration
|
// Test error chain iteration
|
||||||
if let MadError::Inner(ref e) = mad_error {
|
if let MadError::Inner(ref e) = mad_error {
|
||||||
let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
|
let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
|
||||||
@@ -818,26 +840,26 @@ mod tests {
|
|||||||
fn test_aggregate_error_display() {
|
fn test_aggregate_error_display() {
|
||||||
let error1 = MadError::Inner(
|
let error1 = MadError::Inner(
|
||||||
anyhow::anyhow!("database connection failed")
|
anyhow::anyhow!("database connection failed")
|
||||||
.context("failed to connect to PostgreSQL")
|
.context("failed to connect to PostgreSQL"),
|
||||||
);
|
);
|
||||||
|
|
||||||
let error2 = MadError::Inner(
|
let error2 = MadError::Inner(
|
||||||
anyhow::anyhow!("port already in use")
|
anyhow::anyhow!("port already in use")
|
||||||
.context("failed to bind to port 8080")
|
.context("failed to bind to port 8080")
|
||||||
.context("web server initialization failed")
|
.context("web server initialization failed"),
|
||||||
);
|
);
|
||||||
|
|
||||||
let aggregate = MadError::AggregateError(AggregateError {
|
let aggregate = MadError::AggregateError(AggregateError {
|
||||||
errors: vec![error1, error2],
|
errors: vec![error1, error2],
|
||||||
});
|
});
|
||||||
|
|
||||||
let display = format!("{}", aggregate);
|
let display = format!("{}", aggregate);
|
||||||
|
|
||||||
// Check that it shows multiple errors
|
// Check that it shows multiple errors
|
||||||
assert!(display.contains("2 component errors occurred"));
|
assert!(display.contains("2 component errors occurred"));
|
||||||
assert!(display.contains("[Component 1]"));
|
assert!(display.contains("[Component 1]"));
|
||||||
assert!(display.contains("[Component 2]"));
|
assert!(display.contains("[Component 2]"));
|
||||||
|
|
||||||
// Check that context chains are displayed
|
// Check that context chains are displayed
|
||||||
assert!(display.contains("failed to connect to PostgreSQL"));
|
assert!(display.contains("failed to connect to PostgreSQL"));
|
||||||
assert!(display.contains("database connection failed"));
|
assert!(display.contains("database connection failed"));
|
||||||
@@ -852,7 +874,7 @@ mod tests {
|
|||||||
let aggregate = AggregateError {
|
let aggregate = AggregateError {
|
||||||
errors: vec![error],
|
errors: vec![error],
|
||||||
};
|
};
|
||||||
|
|
||||||
let display = format!("{}", aggregate);
|
let display = format!("{}", aggregate);
|
||||||
// Single error should be displayed directly
|
// Single error should be displayed directly
|
||||||
assert!(display.contains("single error"));
|
assert!(display.contains("single error"));
|
||||||
@@ -864,9 +886,9 @@ mod tests {
|
|||||||
let error = MadError::Inner(
|
let error = MadError::Inner(
|
||||||
anyhow::anyhow!("root cause")
|
anyhow::anyhow!("root cause")
|
||||||
.context("middle layer")
|
.context("middle layer")
|
||||||
.context("top layer")
|
.context("top layer"),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Test that we can access the error chain
|
// Test that we can access the error chain
|
||||||
if let MadError::Inner(ref e) = error {
|
if let MadError::Inner(ref e) = error {
|
||||||
let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
|
let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
|
||||||
@@ -882,13 +904,13 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_component_error_propagation() {
|
async fn test_component_error_propagation() {
|
||||||
struct FailingComponent;
|
struct FailingComponent;
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl Component for FailingComponent {
|
impl Component for FailingComponent {
|
||||||
fn name(&self) -> Option<String> {
|
fn name(&self) -> Option<String> {
|
||||||
Some("test-component".to_string())
|
Some("test-component".to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run(&self, _cancel: CancellationToken) -> Result<(), MadError> {
|
async fn run(&self, _cancel: CancellationToken) -> Result<(), MadError> {
|
||||||
Err(anyhow::anyhow!("IO error")
|
Err(anyhow::anyhow!("IO error")
|
||||||
.context("failed to open file")
|
.context("failed to open file")
|
||||||
@@ -896,16 +918,16 @@ mod tests {
|
|||||||
.into())
|
.into())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let result = Mad::builder()
|
let result = Mad::builder()
|
||||||
.add(FailingComponent)
|
.add(FailingComponent)
|
||||||
.cancellation(Some(std::time::Duration::from_millis(100)))
|
.cancellation(Some(std::time::Duration::from_millis(100)))
|
||||||
.run()
|
.run()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
assert!(result.is_err());
|
assert!(result.is_err());
|
||||||
let error = result.unwrap_err();
|
let error = result.unwrap_err();
|
||||||
|
|
||||||
// Check error display
|
// Check error display
|
||||||
let display = format!("{}", error);
|
let display = format!("{}", error);
|
||||||
assert!(display.contains("component initialization failed"));
|
assert!(display.contains("component initialization failed"));
|
||||||
|
|||||||
@@ -138,6 +138,30 @@ async fn test_can_shutdown_gracefully() -> anyhow::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[traced_test]
|
||||||
|
async fn test_component_panics_shutdowns_cleanly() -> anyhow::Result<()> {
|
||||||
|
let res = Mad::builder()
|
||||||
|
.add_fn({
|
||||||
|
move |_cancel| async move {
|
||||||
|
panic!("my inner panic");
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.add_fn(|cancel| async move {
|
||||||
|
cancel.cancelled().await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.run()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let err_content = res.unwrap_err().to_string();
|
||||||
|
assert!(err_content.contains("component panicked"));
|
||||||
|
assert!(err_content.contains("my inner panic"));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_can_easily_transform_error() -> anyhow::Result<()> {
|
fn test_can_easily_transform_error() -> anyhow::Result<()> {
|
||||||
fn fallible() -> anyhow::Result<()> {
|
fn fallible() -> anyhow::Result<()> {
|
||||||
|
|||||||
Reference in New Issue
Block a user