diff --git a/src/offramp.rs b/src/offramp.rs index fe0f4e6ab2..6eb5dcb422 100644 --- a/src/offramp.rs +++ b/src/offramp.rs @@ -34,6 +34,7 @@ use std::borrow::{Borrow, Cow}; use std::fmt; use tremor_common::time::nanotime; +#[derive(Debug)] pub enum Msg { Event { event: Event, diff --git a/src/pipeline.rs b/src/pipeline.rs index 238f3a5ca0..34bfb7fabd 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -302,7 +302,7 @@ async fn pipeline_task( } Err(e) => { let err_str = if let PipelineErrorKind::Script(script_kind) = e.0 { - let script_error: tremor_script::errors::Error = script_kind.into(); + let script_error = tremor_script::errors::Error(script_kind, e.1); // possibly a hygienic error pipeline .source @@ -315,13 +315,27 @@ async fn pipeline_task( } else { format!(" {:?}", e) }; - error!("Error:{}", err_str); + error!("Error handling event:{}", err_str); } } } M::F(Msg::Signal(signal)) => { if let Err(e) = pipeline.enqueue_signal(signal.clone(), &mut eventset) { - error!("error: {:?}", e) + let err_str = if let PipelineErrorKind::Script(script_kind) = e.0 { + let script_error = tremor_script::errors::Error(script_kind, e.1); + // possibly a hygienic error + pipeline + .source + .as_ref() + .and_then(|s| script_error.locate_in_source(s)) + .map_or_else( + || format!(" {:?}", script_error), + |located| format!("\n{}", located), + ) // add a newline to have the error nicely formatted in the log + } else { + format!(" {:?}", e) + }; + error!("Error handling signal:{}", err_str); } else { maybe_send(send_signal(&id, signal, &mut dests).await); handle_insights(&mut pipeline, &onramps).await; @@ -451,3 +465,95 @@ impl Manager { Ok(Addr::new(tx, cf_tx, mgmt_tx, req.id)) } } + +#[cfg(test)] +mod tests { + use super::*; + use tremor_pipeline::FN_REGISTRY; + + use simd_json::prelude::*; + use tremor_script::Value; + use tremor_script::{path::ModulePath, query::Query}; + + #[async_std::test] + async fn test_pipeline_event_error() -> Result<()> { + let module_path = ModulePath { mounts: vec![] }; + let query = r#" + select event.non_existent + from in + into out; + "#; + let aggr_reg: tremor_script::registry::Aggr = tremor_script::aggr_registry(); + let q = Query::parse( + &module_path, + "manager_error_test.trickle", + query, + vec![], + &*FN_REGISTRY.lock()?, + &aggr_reg, + )?; + let config = tremor_pipeline::query::Query(q); + let id = TremorURL::parse("/pipeline/manager_error_test/instance")?; + let manager = Manager::new(12); + let (handle, sender) = manager.start(); + let (tx, rx) = async_channel::bounded(1); + let create = Create { config, id }; + let create_msg = ManagerMsg::Create(tx, create); + sender.send(create_msg).await?; + let addr = rx.recv().await??; + let (offramp_tx, offramp_rx) = async_channel::unbounded(); + let offramp_url = TremorURL::parse("/offramp/fake_offramp/instance/in")?; + // connect a channel so we can receive events from the back of the pipeline :) + addr.send_mgmt(MgmtMsg::ConnectOfframp( + "out".into(), + offramp_url, + offramp_tx, + )) + .await?; + + // sending an event, that triggers an error + addr.send(Msg::Event { + event: Event::default(), + input: "in".into(), + }) + .await?; + + match offramp_rx.try_recv() { + Err(async_channel::TryRecvError::Empty) => (), // ok + _ => return Err("Expected no msg at out fake offramp!".into()), + }; + + // send a second event, that gets through + let mut second_event = Event::default(); + let mut obj = Value::object_with_capacity(1); + obj.insert("non_existent", Value::from(true))?; + second_event.data = obj.into(); + addr.send(Msg::Event { + event: second_event, + input: "in".into(), + }) + .await?; + + loop { + println!("checking fake offramp for event"); + match offramp_rx.recv().await { + Ok(offramp::Msg::Event { event, .. }) => { + println!("{:?}", event); + let (value, _meta) = event.data.suffix().clone().into_parts(); + assert_eq!(Value::from(true), value); // check that we received what we sent in, not the faulty event + break; + } + Ok(offramp::Msg::Signal(_)) => { + // ignore + continue; + } + _ => return Err("Expected 1 msg at out fake offramp!".into()), + }; + } + + // stopping the manager + sender.send(ManagerMsg::Stop).await?; + handle.cancel().await; + Ok(()) + } +}