Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Output more helpful error messages for pipeline runtime errors #629

Merged
merged 11 commits into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,19 @@
}
},
"args": [
//server run -f temp/sample.yaml temp/sample.trickle -l temp/logger.yaml
//"server",
"run",
"temp/sample.trickle",
//"tests/scripts/heredoc_interpolation_quotes/script.tremor"
//"-f",
//"temp/sample.yaml",
//"temp/sample.trickle",
//"-l",
//"temp/logger.yaml",
//"tremor-cli/tests/cli/data/script_with_error.tremor",
//"-i",
//"tremor-cli/tests/cli/data/input.json",
"temp/end.tremor",
//"tests/script_errors/lexer_invalid_float/script.tremor"
//"-f",
//"../tremor-www-docs/docs/workshop/examples/35_reverse_proxy_load_balancing/etc/tremor/config/config.yaml",
// "../tremor-www-docs/docs/workshop/examples/35_reverse_proxy_load_balancing/etc/tremor/config/request_handling.trickle",
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@

## Unreleased (in branch main)

### Fixes

* Allow to express minimal value of i64 as int literal [#629](https://github.com/tremor-rs/tremor-runtime/pull/629)
* Fix scientific float literals (e.g. `1.0e-5`) [#629](https://github.com/tremor-rs/tremor-runtime/pull/629)
* Output errors to stderr on `tremor run` invocation. [#629](https://github.com/tremor-rs/tremor-runtime/pull/629)
* Output more helpful errors for runtime errors (like accessing non-existing fields) [#629](https://github.com/tremor-rs/tremor-runtime/pull/629)
* Fix gelf preprocessor to accept valid (unchunked) gelf messages [#628](https://github.com/tremor-rs/tremor-runtime/pull/628)
* Fix memory access issue for large objects


## 0.9.2

### New Features
Expand Down
54 changes: 53 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/offramp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use std::borrow::{Borrow, Cow};
use std::fmt;
use tremor_common::time::nanotime;

#[derive(Debug)]
pub enum Msg {
Event {
event: Event,
Expand Down Expand Up @@ -212,7 +213,7 @@ impl Manager {
)
.await
{
error!("Failed to create onramp {}: {}", id, e);
error!("Failed to create offramp {}: {}", id, e);
return Err(e);
}
// merge channels and prioritize contraflow/insight events
Expand Down
128 changes: 126 additions & 2 deletions src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::borrow::Cow;
use std::fmt;
use std::time::Duration;
use tremor_common::time::nanotime;
use tremor_pipeline::errors::ErrorKind as PipelineErrorKind;
use tremor_pipeline::{CBAction, Event, ExecutableGraph, SignalKind};

const TICK_MS: u64 = 100;
Expand Down Expand Up @@ -263,6 +264,7 @@ fn maybe_send(r: Result<()>) {
}
}

#[allow(clippy::too_many_lines)]
async fn pipeline_task(
id: TremorURL,
mut pipeline: ExecutableGraph,
Expand Down Expand Up @@ -298,12 +300,42 @@ async fn pipeline_task(
handle_insights(&mut pipeline, &onramps).await;
maybe_send(send_events(&mut eventset, &mut dests).await);
}
Err(e) => error!("error: {:?}", e),
Err(e) => {
let err_str = if let PipelineErrorKind::Script(script_kind) = e.0 {
let script_error = tremor_script::errors::Error(script_kind, e.1);
// possibly a hygienic error
pipeline
.source
.as_ref()
.and_then(|s| script_error.locate_in_source(s))
.map_or_else(
|| format!(" {:?}", script_error),
|located| format!("\n{}", located),
) // add a newline to have the error nicely formatted in the log
} else {
format!(" {:?}", e)
};
error!("Error handling event:{}", err_str);
}
}
}
M::F(Msg::Signal(signal)) => {
if let Err(e) = pipeline.enqueue_signal(signal.clone(), &mut eventset) {
error!("error: {:?}", e)
let err_str = if let PipelineErrorKind::Script(script_kind) = e.0 {
let script_error = tremor_script::errors::Error(script_kind, e.1);
// possibly a hygienic error
pipeline
.source
.as_ref()
.and_then(|s| script_error.locate_in_source(s))
.map_or_else(
|| format!(" {:?}", script_error),
|located| format!("\n{}", located),
) // add a newline to have the error nicely formatted in the log
} else {
format!(" {:?}", e)
};
error!("Error handling signal:{}", err_str);
} else {
maybe_send(send_signal(&id, signal, &mut dests).await);
handle_insights(&mut pipeline, &onramps).await;
Expand Down Expand Up @@ -433,3 +465,95 @@ impl Manager {
Ok(Addr::new(tx, cf_tx, mgmt_tx, req.id))
}
}

#[cfg(test)]
mod tests {
use super::*;
use tremor_pipeline::FN_REGISTRY;

use simd_json::prelude::*;
use tremor_script::Value;
use tremor_script::{path::ModulePath, query::Query};

#[async_std::test]
async fn test_pipeline_event_error() -> Result<()> {
let module_path = ModulePath { mounts: vec![] };
let query = r#"
select event.non_existent
from in
into out;
"#;
let aggr_reg: tremor_script::registry::Aggr = tremor_script::aggr_registry();
let q = Query::parse(
&module_path,
"manager_error_test.trickle",
query,
vec![],
&*FN_REGISTRY.lock()?,
&aggr_reg,
)?;
let config = tremor_pipeline::query::Query(q);
let id = TremorURL::parse("/pipeline/manager_error_test/instance")?;
let manager = Manager::new(12);
let (handle, sender) = manager.start();
let (tx, rx) = async_channel::bounded(1);
let create = Create { config, id };
let create_msg = ManagerMsg::Create(tx, create);
sender.send(create_msg).await?;
let addr = rx.recv().await??;
let (offramp_tx, offramp_rx) = async_channel::unbounded();
let offramp_url = TremorURL::parse("/offramp/fake_offramp/instance/in")?;
// connect a channel so we can receive events from the back of the pipeline :)
addr.send_mgmt(MgmtMsg::ConnectOfframp(
"out".into(),
offramp_url,
offramp_tx,
))
.await?;

// sending an event, that triggers an error
addr.send(Msg::Event {
event: Event::default(),
input: "in".into(),
})
.await?;

match offramp_rx.try_recv() {
Err(async_channel::TryRecvError::Empty) => (), // ok
_ => return Err("Expected no msg at out fake offramp!".into()),
};

// send a second event, that gets through
let mut second_event = Event::default();
let mut obj = Value::object_with_capacity(1);
obj.insert("non_existent", Value::from(true))?;
second_event.data = obj.into();
addr.send(Msg::Event {
event: second_event,
input: "in".into(),
})
.await?;

loop {
println!("checking fake offramp for event");
match offramp_rx.recv().await {
Ok(offramp::Msg::Event { event, .. }) => {
println!("{:?}", event);
let (value, _meta) = event.data.suffix().clone().into_parts();
assert_eq!(Value::from(true), value); // check that we received what we sent in, not the faulty event
break;
}
Ok(offramp::Msg::Signal(_)) => {
// ignore
continue;
}
_ => return Err("Expected 1 msg at out fake offramp!".into()),
};
}

// stopping the manager
sender.send(ManagerMsg::Stop).await?;
handle.cancel().await;
Ok(())
}
}
5 changes: 5 additions & 0 deletions tests/script_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ test_cases!(
pp_cyclic,
pp_nest_cyclic,
//INSERT
lexer_invalid_hex2,
lexer_invalid_int_invalid_char,
lexer_invalid_hex,
lexer_invalid_float,
lexer_invalid_int,
string_interpolation_eof,
string_interpolation_invalid_utf8,
string_interpolation_escape,
Expand Down
3 changes: 3 additions & 0 deletions tests/script_errors/lexer_invalid_float/error.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Error:
1 | 12.0_5
| ^ An unexpected character '_' was found
1 change: 1 addition & 0 deletions tests/script_errors/lexer_invalid_float/script.tremor
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
12.0_5
3 changes: 3 additions & 0 deletions tests/script_errors/lexer_invalid_hex/error.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Error:
1 | let invalid = 0x;
| ^^^ An invalid hexadecimal
1 change: 1 addition & 0 deletions tests/script_errors/lexer_invalid_hex/script.tremor
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
let invalid = 0x;
3 changes: 3 additions & 0 deletions tests/script_errors/lexer_invalid_hex2/error.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Error:
1 | -0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ An invalid hexadecimal
1 change: 1 addition & 0 deletions tests/script_errors/lexer_invalid_hex2/script.tremor
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
3 changes: 3 additions & 0 deletions tests/script_errors/lexer_invalid_int/error.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Error:
1 | -161390617380431786853494948250188242145606612051826469551916209783790476376052574664352834580008614464743948248296718335
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ An invalid integer literal
1 change: 1 addition & 0 deletions tests/script_errors/lexer_invalid_int/script.tremor
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-161390617380431786853494948250188242145606612051826469551916209783790476376052574664352834580008614464743948248296718335
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Error:
1 | 12345_
| ^ An unexpected character '_' was found
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
12345_
13 changes: 10 additions & 3 deletions tremor-cli/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use crate::errors::{Error, Result};
use std::io::{BufRead, BufReader, Read, Write};
use tremor_common::file;

use std::collections::HashMap;

use std::path::Path;
use std::process::{Command, Stdio};
use std::sync::mpsc::TryRecvError;
Expand Down Expand Up @@ -83,14 +85,19 @@ pub(crate) struct TargetProcess {
}

impl TargetProcess {
pub fn new_with_stderr(cmd: &str, args: &[String]) -> Result<Self> {
TargetProcess::new(cmd, args)
pub fn new_with_stderr(
cmd: &str,
args: &[String],
env: &HashMap<String, String>,
) -> Result<Self> {
TargetProcess::new(cmd, args, env)
}

/// Spawn target process and pipe IO
fn new(cmd: &str, args: &[String]) -> Result<Self> {
fn new(cmd: &str, args: &[String], env: &HashMap<String, String>) -> Result<Self> {
let mut target_cmd = Command::new(cmd)
.args(args)
.envs(env)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
Expand Down
Loading