Skip to content

Commit

Permalink
Merge 7483454 into a3715d9
Browse files Browse the repository at this point in the history
  • Loading branch information
mfelsche authored Dec 3, 2020
2 parents a3715d9 + 7483454 commit 701ebff
Show file tree
Hide file tree
Showing 39 changed files with 614 additions and 51 deletions.
13 changes: 11 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,19 @@
}
},
"args": [
//server run -f temp/sample.yaml temp/sample.trickle -l temp/logger.yaml
//"server",
"run",
"temp/sample.trickle",
//"tests/scripts/heredoc_interpolation_quotes/script.tremor"
//"-f",
//"temp/sample.yaml",
//"temp/sample.trickle",
//"-l",
//"temp/logger.yaml",
//"tremor-cli/tests/cli/data/script_with_error.tremor",
//"-i",
//"tremor-cli/tests/cli/data/input.json",
"temp/end.tremor",
//"tests/script_errors/lexer_invalid_float/script.tremor"
//"-f",
//"../tremor-www-docs/docs/workshop/examples/35_reverse_proxy_load_balancing/etc/tremor/config/config.yaml",
// "../tremor-www-docs/docs/workshop/examples/35_reverse_proxy_load_balancing/etc/tremor/config/request_handling.trickle",
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@

## Unreleased (in branch main)

### Fixes

* Allow to express minimal value of i64 as int literal [#629](https://github.com/tremor-rs/tremor-runtime/pull/629)
* Fix scientific float literals (e.g. `1.0e-5`) [#629](https://github.com/tremor-rs/tremor-runtime/pull/629)
* Output errors to stderr on `tremor run` invocation. [#629](https://github.com/tremor-rs/tremor-runtime/pull/629)
* Output more helpful errors for runtime errors (like accessing non-existing fields) [#629](https://github.com/tremor-rs/tremor-runtime/pull/629)
* Fix gelf preprocessor to accept valid (unchunked) gelf messages [#628](https://github.com/tremor-rs/tremor-runtime/pull/628)
* Fix memory access issue for large objects


## 0.9.2

### New Features
Expand Down
54 changes: 53 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/offramp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use std::borrow::{Borrow, Cow};
use std::fmt;
use tremor_common::time::nanotime;

#[derive(Debug)]
pub enum Msg {
Event {
event: Event,
Expand Down Expand Up @@ -212,7 +213,7 @@ impl Manager {
)
.await
{
error!("Failed to create onramp {}: {}", id, e);
error!("Failed to create offramp {}: {}", id, e);
return Err(e);
}
// merge channels and prioritize contraflow/insight events
Expand Down
128 changes: 126 additions & 2 deletions src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::borrow::Cow;
use std::fmt;
use std::time::Duration;
use tremor_common::time::nanotime;
use tremor_pipeline::errors::ErrorKind as PipelineErrorKind;
use tremor_pipeline::{CBAction, Event, ExecutableGraph, SignalKind};

const TICK_MS: u64 = 100;
Expand Down Expand Up @@ -263,6 +264,7 @@ fn maybe_send(r: Result<()>) {
}
}

#[allow(clippy::too_many_lines)]
async fn pipeline_task(
id: TremorURL,
mut pipeline: ExecutableGraph,
Expand Down Expand Up @@ -298,12 +300,42 @@ async fn pipeline_task(
handle_insights(&mut pipeline, &onramps).await;
maybe_send(send_events(&mut eventset, &mut dests).await);
}
Err(e) => error!("error: {:?}", e),
Err(e) => {
let err_str = if let PipelineErrorKind::Script(script_kind) = e.0 {
let script_error = tremor_script::errors::Error(script_kind, e.1);
// possibly a hygienic error
pipeline
.source
.as_ref()
.and_then(|s| script_error.locate_in_source(s))
.map_or_else(
|| format!(" {:?}", script_error),
|located| format!("\n{}", located),
) // add a newline to have the error nicely formatted in the log
} else {
format!(" {:?}", e)
};
error!("Error handling event:{}", err_str);
}
}
}
M::F(Msg::Signal(signal)) => {
if let Err(e) = pipeline.enqueue_signal(signal.clone(), &mut eventset) {
error!("error: {:?}", e)
let err_str = if let PipelineErrorKind::Script(script_kind) = e.0 {
let script_error = tremor_script::errors::Error(script_kind, e.1);
// possibly a hygienic error
pipeline
.source
.as_ref()
.and_then(|s| script_error.locate_in_source(s))
.map_or_else(
|| format!(" {:?}", script_error),
|located| format!("\n{}", located),
) // add a newline to have the error nicely formatted in the log
} else {
format!(" {:?}", e)
};
error!("Error handling signal:{}", err_str);
} else {
maybe_send(send_signal(&id, signal, &mut dests).await);
handle_insights(&mut pipeline, &onramps).await;
Expand Down Expand Up @@ -433,3 +465,95 @@ impl Manager {
Ok(Addr::new(tx, cf_tx, mgmt_tx, req.id))
}
}

#[cfg(test)]
mod tests {
use super::*;
use tremor_pipeline::FN_REGISTRY;

use simd_json::prelude::*;
use tremor_script::Value;
use tremor_script::{path::ModulePath, query::Query};

#[async_std::test]
async fn test_pipeline_event_error() -> Result<()> {
let module_path = ModulePath { mounts: vec![] };
let query = r#"
select event.non_existent
from in
into out;
"#;
let aggr_reg: tremor_script::registry::Aggr = tremor_script::aggr_registry();
let q = Query::parse(
&module_path,
"manager_error_test.trickle",
query,
vec![],
&*FN_REGISTRY.lock()?,
&aggr_reg,
)?;
let config = tremor_pipeline::query::Query(q);
let id = TremorURL::parse("/pipeline/manager_error_test/instance")?;
let manager = Manager::new(12);
let (handle, sender) = manager.start();
let (tx, rx) = async_channel::bounded(1);
let create = Create { config, id };
let create_msg = ManagerMsg::Create(tx, create);
sender.send(create_msg).await?;
let addr = rx.recv().await??;
let (offramp_tx, offramp_rx) = async_channel::unbounded();
let offramp_url = TremorURL::parse("/offramp/fake_offramp/instance/in")?;
// connect a channel so we can receive events from the back of the pipeline :)
addr.send_mgmt(MgmtMsg::ConnectOfframp(
"out".into(),
offramp_url,
offramp_tx,
))
.await?;

// sending an event, that triggers an error
addr.send(Msg::Event {
event: Event::default(),
input: "in".into(),
})
.await?;

match offramp_rx.try_recv() {
Err(async_channel::TryRecvError::Empty) => (), // ok
_ => return Err("Expected no msg at out fake offramp!".into()),
};

// send a second event, that gets through
let mut second_event = Event::default();
let mut obj = Value::object_with_capacity(1);
obj.insert("non_existent", Value::from(true))?;
second_event.data = obj.into();
addr.send(Msg::Event {
event: second_event,
input: "in".into(),
})
.await?;

loop {
println!("checking fake offramp for event");
match offramp_rx.recv().await {
Ok(offramp::Msg::Event { event, .. }) => {
println!("{:?}", event);
let (value, _meta) = event.data.suffix().clone().into_parts();
assert_eq!(Value::from(true), value); // check that we received what we sent in, not the faulty event
break;
}
Ok(offramp::Msg::Signal(_)) => {
// ignore
continue;
}
_ => return Err("Expected 1 msg at out fake offramp!".into()),
};
}

// stopping the manager
sender.send(ManagerMsg::Stop).await?;
handle.cancel().await;
Ok(())
}
}
5 changes: 5 additions & 0 deletions tests/script_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ test_cases!(
pp_cyclic,
pp_nest_cyclic,
//INSERT
lexer_invalid_hex2,
lexer_invalid_int_invalid_char,
lexer_invalid_hex,
lexer_invalid_float,
lexer_invalid_int,
string_interpolation_eof,
string_interpolation_invalid_utf8,
string_interpolation_escape,
Expand Down
3 changes: 3 additions & 0 deletions tests/script_errors/lexer_invalid_float/error.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Error:
1 | 12.0_5
| ^ An unexpected character '_' was found
1 change: 1 addition & 0 deletions tests/script_errors/lexer_invalid_float/script.tremor
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
12.0_5
3 changes: 3 additions & 0 deletions tests/script_errors/lexer_invalid_hex/error.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Error:
1 | let invalid = 0x;
| ^^^ An invalid hexadecimal
1 change: 1 addition & 0 deletions tests/script_errors/lexer_invalid_hex/script.tremor
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
let invalid = 0x;
3 changes: 3 additions & 0 deletions tests/script_errors/lexer_invalid_hex2/error.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Error:
1 | -0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ An invalid hexadecimal
1 change: 1 addition & 0 deletions tests/script_errors/lexer_invalid_hex2/script.tremor
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
3 changes: 3 additions & 0 deletions tests/script_errors/lexer_invalid_int/error.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Error:
1 | -161390617380431786853494948250188242145606612051826469551916209783790476376052574664352834580008614464743948248296718335
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ An invalid integer literal
1 change: 1 addition & 0 deletions tests/script_errors/lexer_invalid_int/script.tremor
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-161390617380431786853494948250188242145606612051826469551916209783790476376052574664352834580008614464743948248296718335
3 changes: 3 additions & 0 deletions tests/script_errors/lexer_invalid_int_invalid_char/error.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Error:
1 | 12345_
| ^ An unexpected character '_' was found
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
12345_
13 changes: 10 additions & 3 deletions tremor-cli/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use crate::errors::{Error, Result};
use std::io::{BufRead, BufReader, Read, Write};
use tremor_common::file;

use std::collections::HashMap;

use std::path::Path;
use std::process::{Command, Stdio};
use std::sync::mpsc::TryRecvError;
Expand Down Expand Up @@ -83,14 +85,19 @@ pub(crate) struct TargetProcess {
}

impl TargetProcess {
pub fn new_with_stderr(cmd: &str, args: &[String]) -> Result<Self> {
TargetProcess::new(cmd, args)
pub fn new_with_stderr(
cmd: &str,
args: &[String],
env: &HashMap<String, String>,
) -> Result<Self> {
TargetProcess::new(cmd, args, env)
}

/// Spawn target process and pipe IO
fn new(cmd: &str, args: &[String]) -> Result<Self> {
fn new(cmd: &str, args: &[String], env: &HashMap<String, String>) -> Result<Self> {
let mut target_cmd = Command::new(cmd)
.args(args)
.envs(env)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
Expand Down
Loading

0 comments on commit 701ebff

Please sign in to comment.