Skip to content

Commit

Permalink
Add test for pipeline manager and event error handling.
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Wahl <mwahl@wayfair.com>
  • Loading branch information
Matthias Wahl authored and Licenser committed Dec 3, 2020
1 parent 05a2274 commit 0b4f20a
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/offramp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use std::borrow::{Borrow, Cow};
use std::fmt;
use tremor_common::time::nanotime;

#[derive(Debug)]
pub enum Msg {
Event {
event: Event,
Expand Down
112 changes: 109 additions & 3 deletions src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ async fn pipeline_task(
}
Err(e) => {
let err_str = if let PipelineErrorKind::Script(script_kind) = e.0 {
let script_error: tremor_script::errors::Error = script_kind.into();
let script_error = tremor_script::errors::Error(script_kind, e.1);
// possibly a hygienic error
pipeline
.source
Expand All @@ -315,13 +315,27 @@ async fn pipeline_task(
} else {
format!(" {:?}", e)
};
error!("Error:{}", err_str);
error!("Error handling event:{}", err_str);
}
}
}
M::F(Msg::Signal(signal)) => {
if let Err(e) = pipeline.enqueue_signal(signal.clone(), &mut eventset) {
error!("error: {:?}", e)
let err_str = if let PipelineErrorKind::Script(script_kind) = e.0 {
let script_error = tremor_script::errors::Error(script_kind, e.1);
// possibly a hygienic error
pipeline
.source
.as_ref()
.and_then(|s| script_error.locate_in_source(s))
.map_or_else(
|| format!(" {:?}", script_error),
|located| format!("\n{}", located),
) // add a newline to have the error nicely formatted in the log
} else {
format!(" {:?}", e)
};
error!("Error handling signal:{}", err_str);
} else {
maybe_send(send_signal(&id, signal, &mut dests).await);
handle_insights(&mut pipeline, &onramps).await;
Expand Down Expand Up @@ -451,3 +465,95 @@ impl Manager {
Ok(Addr::new(tx, cf_tx, mgmt_tx, req.id))
}
}

#[cfg(test)]
mod tests {
use super::*;
use tremor_pipeline::FN_REGISTRY;

use simd_json::prelude::*;
use tremor_script::Value;
use tremor_script::{path::ModulePath, query::Query};

#[async_std::test]
async fn test_pipeline_event_error() -> Result<()> {
let module_path = ModulePath { mounts: vec![] };
let query = r#"
select event.non_existent
from in
into out;
"#;
let aggr_reg: tremor_script::registry::Aggr = tremor_script::aggr_registry();
let q = Query::parse(
&module_path,
"manager_error_test.trickle",
query,
vec![],
&*FN_REGISTRY.lock()?,
&aggr_reg,
)?;
let config = tremor_pipeline::query::Query(q);
let id = TremorURL::parse("/pipeline/manager_error_test/instance")?;
let manager = Manager::new(12);
let (handle, sender) = manager.start();
let (tx, rx) = async_channel::bounded(1);
let create = Create { config, id };
let create_msg = ManagerMsg::Create(tx, create);
sender.send(create_msg).await?;
let addr = rx.recv().await??;
let (offramp_tx, offramp_rx) = async_channel::unbounded();
let offramp_url = TremorURL::parse("/offramp/fake_offramp/instance/in")?;
// connect a channel so we can receive events from the back of the pipeline :)
addr.send_mgmt(MgmtMsg::ConnectOfframp(
"out".into(),
offramp_url,
offramp_tx,
))
.await?;

// sending an event, that triggers an error
addr.send(Msg::Event {
event: Event::default(),
input: "in".into(),
})
.await?;

match offramp_rx.try_recv() {
Err(async_channel::TryRecvError::Empty) => (), // ok
_ => return Err("Expected no msg at out fake offramp!".into()),
};

// send a second event, that gets through
let mut second_event = Event::default();
let mut obj = Value::object_with_capacity(1);
obj.insert("non_existent", Value::from(true))?;
second_event.data = obj.into();
addr.send(Msg::Event {
event: second_event,
input: "in".into(),
})
.await?;

loop {
println!("checking fake offramp for event");
match offramp_rx.recv().await {
Ok(offramp::Msg::Event { event, .. }) => {
println!("{:?}", event);
let (value, _meta) = event.data.suffix().clone().into_parts();
assert_eq!(Value::from(true), value); // check that we received what we sent in, not the faulty event
break;
}
Ok(offramp::Msg::Signal(_)) => {
// ignore
continue;
}
_ => return Err("Expected 1 msg at out fake offramp!".into()),
};
}

// stopping the manager
sender.send(ManagerMsg::Stop).await?;
handle.cancel().await;
Ok(())
}
}

0 comments on commit 0b4f20a

Please sign in to comment.