This repository has been archived by the owner on Sep 16, 2021. It is now read-only.

Adapt to new select target restrictions #89

Merged · 3 commits · Mar 17, 2021
38 changes: 36 additions & 2 deletions docs/tremor-query/index.md
Original file line number Diff line number Diff line change
@@ -120,6 +120,13 @@ Tremor supports tumbling windows by number of events or by time.
General configuration Parameters:

* `eviction_period`: duration in nanoseconds without events arriving, after which to evict / remove the current window data for a single group.
* `max_groups`: maximum number of groups to maintain simultaneously in memory. Groups added beyond that number will be ignored. By default, tremor allows as many groups as will fit into memory.

Each select statement maintains the groups for the current windows in an in-memory data structure. This contains the group values as well as the aggregate states.
If your grouping values have a very high cardinality, it is possible to end up with runaway memory growth, as by default the group data structures won't be evicted
unless `eviction_period` is set. Old groups will be discarded after `2 x eviction_period` if no event for those groups arrived.
To configure an upper bound on the number of groups that should be maintained simultaneously for a window, set `max_groups`.
This helps avoid unbounded memory growth, especially when using `emit_empty_windows` on time based windows.
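
For example, a window that bounds both the lifetime and the number of its groups might look like the following sketch (the field `event.partition` and all parameter values are illustrative, not prescribed):

```trickle
define tumbling window bounded
with
  interval = core::datetime::with_seconds(15),
  eviction_period = core::datetime::with_seconds(60),
  max_groups = 10000
end;

select { "count": aggr::stats::count() }
from in[bounded]
group by set(event.partition)
into out;
```

With this configuration at most 10000 groups are kept, and a group that sees no events is discarded after two eviction periods.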

##### Windows based on number of events

@@ -146,6 +153,7 @@ independent from event flow with a granularity of `100ms`. It is thus possible t
Configuration Parameters:

- `interval`: Time interval in nanoseconds after which the window closes.
- `emit_empty_windows`: By default, time based windows only emit if events arrived. By configuring `emit_empty_windows` as `true`, this window will emit every `interval`, regardless of whether events arrived or not. Such a window will emit one event per interval for each group for which events have been seen before. If you use this in a `group by` query and the cardinality is likely huge, consider using `max_groups` and `eviction_period` to avoid runaway memory growth.
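
For example, a window that fires every ten seconds even when no events arrived could be sketched as follows (parameter values are illustrative):

```trickle
define tumbling window `10secs_always`
with
  interval = core::datetime::with_seconds(10),
  emit_empty_windows = true
end;

select { "count": aggr::stats::count() }
from in[`10secs_always`]
into out;
```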


Window definition grammar:
@@ -272,12 +280,14 @@ select event from in into out;
```

Select operations can filter ingested data with the specification of a `where` clause. The clause forms a predicate check on the inbound events before any further processing takes place.
That means the `event` available to the `where` clause is the unprocessed inbound event from the input stream (`in` in this case):

```trickle
select event from in where event.is_interesting into out;
```

Select operations can filter data being forwarded to other operators with the specification of a `having` clause. The clause forms a predicate check on outbound synthetic events after any other processing has taken place.
That means the `event` available to the `having` clause is the result of evaluating the `select` target clause (the expression between `select` and `from`).

```trickle
select event from in into out having event.is_interesting;
@@ -291,11 +301,12 @@ with
interval = datetime::with_seconds(15),
end;

-select { "count": aggr::stats::count(event) } from in[fifteen_secs] into out having event.count > 0;
+select { "count": aggr::stats::count() } from in[fifteen_secs] into out having event.count > 0;
```

In the above operation, we emit a synthetic count every fifteen seconds if at least one event has been witnessed during a 15 second window of time.


Select operations can be grouped through defining a `group by` clause.

```trickle
@@ -304,7 +315,7 @@ with
interval = datetime::with_seconds(15),
end;

-select { "count": aggr::stats::count(event) }
+select { "count": aggr::stats::count() }
from in[fifteen_secs]
group by set(event.partition)
into out
@@ -314,3 +325,26 @@ having event.count > 0;
In the above operation, we partition the ingested events into groups defined by a required `event.partition` data field on the inbound event. Each of these groups maintains an independent fifteen second tumbling window, and each window upon closing gates outbound synthetic events by a count for that group.

The current implementation of `select` allows set-based and each-based grouping. These can be composed concatenatively. However `cube` and `rollup` based grouping dimensions are not currently supported.
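
For example, a composite group combining a set-based and an each-based dimension could be sketched like this (the `fifteen_secs` window is the one defined above; the field names are illustrative, and `record::keys` is assumed available as in the walkthrough, which imports it via `use std::record;`):

```trickle
select { "count": aggr::stats::count() }
from in[fifteen_secs]
group by set(event.measurement, each(record::keys(event.fields)))
into out;
```

This produces one group per combination of `event.measurement` and each key of the `event.fields` record.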

In windowed queries, event-related data can only be referenced in one of two cases:

* it is used as an argument to an aggregate function
* it is used as an expression in the `group by` clause

Here is an example of valid and invalid references:

```trickle
define tumbling window my_window
with
size = 12
end;

select {
"last": aggr::win::last(event.other), # ok, inside aggregate function
"foo": event.foo + 1, # ok, event.foo is used in the group by clause
"bad": event.other, # NOT OK
"bad_meta": $my_meta, # NOT OK, same rules apply to event metadata
} from in[my_window]
group by set(event.foo, event.bar)
into out;
```
81 changes: 31 additions & 50 deletions docs/tremor-query/walkthrough.md
@@ -20,46 +20,31 @@ The most basic query possible in trickle is
select event from in into out; # A basic passthrough query pipeline
```

-This would be configured/programmed as follows in the more verbose
-pipeline yaml format:
-
-```yaml
-pipeline:
-  - id: main
-    interface:
-      inputs:
-        - in
-      outputs:
-        - out
-    links:
-      in: [out]
-```

-The `event` keyword selected the event from the standard input stream `in`
+The `event` keyword selects the event from the standard input stream `in`
and passes it through unchanged to the standard output stream `out`.

-In tremor's trickle query langauge queries are compiled and optimised and
+In tremor's trickle query language queries are compiled and optimised and
data is streamed through the query. Data can be passed through, transformed,
filtered, aggregated, branched or combined to form continuous stream processing
algorithms.

Like other event processing languages, trickle _inverts_ the relationship between
-query and data when compared to normal RDBMS SQL langauges. Instead of running a
-dynamic query against static in memory or disk persisted data, we compilse and
+query and data when compared to normal RDBMS SQL languages. Instead of running a
+dynamic query against static in memory or disk persisted data, we compile and
optimise queries and stream near real-time data through each query.

Data can be ingested from the outside world via the 'in' standard stream.
Data can be produced to the outside world via the 'out' standard stream.
-Errors can be processed through a standard 'err' standard stream.
+Errors can be processed through the 'err' standard stream.

-These three primitives are analgous to the `stdin`, `stdout` and `stderr` streams
+These three primitives are analogous to the `stdin`, `stdout` and `stderr` streams
in UNIX-like systems. These can be chained or interconnected by connecting multiple
statements together to form a directed acyclic graph.

We can branch inputs using `where` and `having` clauses as filters to logically
partition streams into independent processing streams.

-In the below example we pratition events by their `seq_num` field. If the number
+In the below example we partition events by their `seq_num` field. If the number
is even, we branch the corresponding events into a stream named `evens`. If the
number is odd, we branch to a stream named `odds`.
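
The full query is collapsed in this diff; a minimal sketch of such a branching query, using the field and stream names from the description above (the original code may differ in detail):

```trickle
create stream evens;
create stream odds;

select { "even": event } from in where event.seq_num % 2 == 0 into evens;
select { "odd": event } from in where event.seq_num % 2 == 1 into odds;

select event from evens into out;
select event from odds into out;
```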

@@ -87,14 +72,14 @@ We can test this with a json event using the `tremor` command line tool
{ "seq_num": 4, "value": 10, "group": "horse" }
```

-Assuming the trickle query is stored in a file called `evenodd.tricle` with the sample event
+Assuming the trickle query is stored in a file called `evenodd.trickle` with the sample event
in a file called `data.json`

```bash
$ tremor run evenodd.trickle -i data.json
```

-The command line tool will repeatedly inject the event `-i` argument document and we would
+The command line tool will inject all events from the file provided by the `-i` argument and we would
expect to see output from the tool as follows:

```bash
@@ -103,15 +88,11 @@ expect to see output from the tool as follows:

## Scripts and Operators

-The query language is _backwards compatible_ with the legacy pipeline format in that the same
-operations are available in the old and new syntax. The new structured query syntax is more
-flexible and powerful, however.
-
-For example, here's the logic for an entire backpressure algorithm that could be introduced as
-a proxy between two systems:
+Here's the logic for an entire backpressure algorithm that could be introduced as
+a proxy between two systems, implemented by using a builtin operator called [`qos::backpressure`](operators.md#qosbackpressure):

```trickle
-define generic::backpressure operator bp
+define qos::backpressure operator bp
with
timeout = 10000,
steps = [ 1, 50, 100, 250, 750 ],
@@ -122,8 +103,8 @@ select event from in into bp;
select event from bp into out;
```

-A slightly more complex example that uses both operators and the tremor scripting langauge
-with the query langauge all together:
+A slightly more complex example that uses both operators and the tremor scripting language
+with the query language all together:

```tremor
define grouper::bucket operator kfc;
@@ -151,12 +132,12 @@ select event from categorize into kfc;
select event from kfc into out;
```

-Operators, in the query langauge as in the pipeline format are defined as `<module>::<name>` in the
+Operators are defined as `<module>::<name>` in the
context of an operator definition clause. Operators, like script definitions can take arguments.

Definitions in tremor are non-executing. They should be considered as templates or specifications.

-In the query langauge, any `define` clause creates specifications, possibly with arguments for
+In the query language, any `define` clause creates specifications, possibly with arguments for
specialization. They are typically incarnated via the `create` clause. Anything that is `create`ed
will form a stream or node in the query graph - these _do_ consume memory and participate in a
pipeline query algorithm.
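
A sketch of the define/create distinction (the `qos::backpressure` parameter value is illustrative, and the exact `create` spelling may differ between versions):

```trickle
# A non-executing specification (template) ...
define qos::backpressure operator bp
with
  timeout = 10000
end;

# ... incarnated as a concrete node in the query graph.
create operator my_bp from bp;

select event from in into my_bp;
select event from my_bp into out;
```

Only `my_bp` consumes memory and participates in the running pipeline; `bp` remains an inert template.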
@@ -221,18 +202,18 @@ select event from b into out;

## Aggregations

-A key feature of the tremor query langauge are aggregations. These are supported with:
+A key feature of the tremor query language are aggregations. These are supported with:

-- Aggregate functions - An aggregate function is a function that runs in the context of a temporal window of events, emitting results intermittently
-- Windows - A window is a range of event, clock or data time. There can be many different types of window
+- Windows - A window is a range of events, clock or data time. There can be many different types of windows.
+- Aggregate functions - An aggregate function is a function that runs in the context of a window of events, emitting results intermittently
- Tilt Frames - A tilt frame is a chain of compatible windows with **decreasing** resolution used to reduce memory pressure and preserve relative accuracy of windowed aggregate functions

An example clock-driven tumbling window:

```
define tumbling window `15secs`
-with
-interval = datetime::with_seconds(15),
+with
+interval = core::datetime::with_seconds(15),
end;

select {
@@ -249,14 +230,14 @@ into out;

To use a window we need to define the window specifications, such as a 15 second clock-based
tumbling window called `15secs` as above. We can then create instances of these windows at runtime by
-applying those windows to streams. This is done in the `from` clause in `select` expressions.
+applying those windows to streams. This is done in the `from` clause of a `select` statement.

Wherever windows are applied, aggregate functions can be used. In the above example, we are calculating
the minimum, maximum, average, standard deviation and variance of a `value` numeric field in data streaming
into the query via the standard input stream.

-The query language is not constrained to to clock-driven window definitions. Windows can also be
-data-drive or fully programmatic.
+The query language is not constrained to clock-driven window definitions. Windows can also be
+data-driven or fully programmatic.

A more complete example:

@@ -279,7 +260,7 @@ into normalize;
```

In the above example we use a single aggregate function called `aggr::stats::hdr` which uses a high dynamic range
-or HDR Histogram to compute quartile estimates and basic statistics against a number of dynamic grouping fields
+or HDR Histogram to compute quantile estimates and basic statistics against a number of dynamic grouping fields
set by the `group` clause. A group clause effectively partitions our operation by the group expressions provided
by the trickle query programmer. In the example, we're using the field names of the nested 'fields' record on inbound
events to compose a component of a group that is also qualified by tags and a measurement name. The field component
@@ -295,7 +276,7 @@ each frame without amplifying error - in short, we get the **effect** of summari

## Aggregation Mechanics

-The mechanics of aggregation in the query langauge are non-trivial.
+The mechanics of aggregation in the query language are non-trivial.

A high level non-normative summary follows.

@@ -311,7 +292,7 @@ In the illustration above events `1` and `2` in the first window `w0` produce a
Events `3` and `4` in the second window `w1` produce a single synthetic or derivate event `b`
As there is no 6th event in the example illustration, we will _never_ get another synthetic output event

-Contrast this with the 10 second or clock-based tumbling window. In tfirst window `w0`s lifetime we capture
+Contrast this with the 10 second or clock-based tumbling window. In the first window `w0`s lifetime we capture
all events in the illustration.

### Tilt Frames
@@ -337,7 +318,7 @@ and memory overhead without using the tilt-frame mechanism.
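
The details of this section are collapsed in the diff, but a tilt frame is expressed by chaining windows of decreasing resolution in the `from` clause. A sketch reusing window definitions in the style seen elsewhere in this PR (the aggregate choice is illustrative):

```trickle
define tumbling window `10secs`
with
  interval = core::datetime::with_seconds(10)
end;

define tumbling window `1min`
with
  interval = core::datetime::with_minutes(1)
end;

# 10 second frames are merged into 1 minute frames without
# re-reading the raw events.
select { "count": aggr::stats::count() }
from in[`10secs`, `1min`]
into out;
```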

## Group Mechanics

-The group clause in the query language partitions streams before windows and tilt frames
+The `group by` clause in the query language partitions streams before windows and tilt frames
are applied. Groups can be set-based, each-based or composites thereof.

### Set based grouping
@@ -385,9 +366,9 @@ set and qualified by country ...

### Limitations

-There are cases however that currently complex to partitionable with a
-single sql expression due to limitations with the grouping clause. For example
-what is we wanted to make availability zones a component of our group partitions?
+There are cases however that are currently complex to partition with a
+single select statement due to limitations with the grouping clause. For example
+what if we wanted to make availability zones a component of our group partitions?

How would we structure such a query?

@@ -15,6 +15,8 @@ offramp:
- id: influx-output
type: rest # an influxdb offramp
codec: influx
+postprocessors:
+- lines
config:
endpoint:
host: influxdb
@@ -7,8 +7,11 @@ binding:
- id: example # The unique name of this binding template
links:
'/onramp/udp-input/{instance}/out': # Connect the input to the pipeline
-- '/pipeline/example/{instance}/in'
+- '/pipeline/system::passthrough/{instance}/in'
+- '/pipeline/example/{instance}/in'
+'/pipeline/system::passthrough/{instance}/out':
+- '/offramp/debug2/{instance}/in'
'/pipeline/example/{instance}/out': # Connect the pipeline to the output
- '/offramp/influx-output/{instance}/in'
'/pipeline/example/{instance}/err': # Connect the pipeline to the output
- '/offramp/system::stdout/{instance}/in'
@@ -3,12 +3,12 @@ use std::record;

define tumbling window `10secs`
with
-interval = 10000
+interval = core::datetime::with_seconds(10)
end;

define tumbling window `1min`
with
-interval = 60000
+interval = core::datetime::with_minutes(1)
end;

create stream normalize;
@@ -33,10 +33,10 @@ select {
from in
group by set(event.measurement, event.tags, each(record::keys(event.fields)))
into aggregate
having type::is_number(event.value);


select
{
"measurement": event.measurement,
"tags": patch event.tags of insert "window" => window end,