Skip to content

Commit

Permalink
feat(functions): converting limit to use arrow arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
affo committed Jan 8, 2019
1 parent d62cb1b commit 5d48531
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 24 deletions.
18 changes: 13 additions & 5 deletions docs/SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -1831,17 +1831,25 @@ LogarithmicBins has the following properties:

#### Limit

Limit caps the number of records in output tables to a fixed size n.
Limit caps the number of records in output tables to a fixed size `n`.
One output table is produced for each input table.
The output table will contain the first n records from the input table.
If the input table has less than n records all records will be output.
Each output table will contain the first `n` records after the first `offset` records of the input table.
If the input table has less than `offset + n` records, all records except the first `offset` ones will be output.

Limit has the following properties:

* `n` int
The maximum number of records to output.
The maximum number of records per table to output.
* `offset` int
The number of records to skip per table before limiting to `n`. Defaults to 0.

Example: `from(bucket: "telegraf/autogen") |> limit(n: 10)`
Example:

```
from(bucket: "telegraf/autogen")
|> range(start: -1h)
|> limit(n: 10, offset: 1)
```

#### Map

Expand Down
6 changes: 6 additions & 0 deletions functions/tests/testdata/limit.flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
limit_fn = (table=<-) =>
table
|> range(start: 2018-05-22T19:00:00Z, stop: 2018-05-22T20:00:00Z)
|> limit(n: 1)

testingTest(name: "limit", load: testLoadData, infile: "limit.in.csv", outfile: "limit.out.csv", test: limit_fn)
28 changes: 28 additions & 0 deletions functions/tests/testdata/limit.in.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
#group,false,false,false,false,false,false,true,true,true
#default,_result,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,host
,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,82.9833984375,used_percent,swap,host.local
,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,82.598876953125,used_percent,swap,host.local
,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,82.598876953125,used_percent,swap,host.local
,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,82.598876953125,used_percent,swap,host.local
,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,82.598876953125,used_percent,swap,host.local
,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,82.6416015625,used_percent,swap,host.local
,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1.83,load1,system,host.local
,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1.7,load1,system,host.local
,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,1.74,load1,system,host.local
,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,1.63,load1,system,host.local
,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,1.91,load1,system,host.local
,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,1.84,load1,system,host.local
,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1.98,load15,system,host.local
,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1.97,load15,system,host.local
,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,1.97,load15,system,host.local
,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,1.96,load15,system,host.local
,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,1.98,load15,system,host.local
,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,1.97,load15,system,host.local
,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1.95,load5,system,host.local
,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1.92,load5,system,host.local
,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,1.92,load5,system,host.local
,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,1.89,load5,system,host.local
,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,1.94,load5,system,host.local
,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,1.93,load5,system,host.local
8 changes: 8 additions & 0 deletions functions/tests/testdata/limit.out.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
#group,false,false,false,false,false,false,true,true,true
#default,_result,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,host
,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,82.9833984375,used_percent,swap,host.local
,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1.83,load1,system,host.local
,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1.98,load15,system,host.local
,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1.95,load5,system,host.local
6 changes: 6 additions & 0 deletions functions/tests/testdata/limit_offset.flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
limit_fn = (table=<-) =>
table
|> range(start: 2018-05-22T19:00:00Z, stop: 2018-05-22T20:00:00Z)
|> limit(n: 2, offset: 1)

testingTest(name: "limit_offset", load: testLoadData, infile: "limit.in.csv", outfile: "limit_offset.out.csv", test: limit_fn)
11 changes: 11 additions & 0 deletions functions/tests/testdata/limit_offset.out.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
#group,false,false,false,false,false,false,true,true,true
#default,_result,,,,,,,,
,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,82.598876953125,used_percent,swap,host.local
,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,82.598876953125,used_percent,swap,host.local
,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1.7,load1,system,host.local
,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,1.74,load1,system,host.local
,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1.97,load15,system,host.local
,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,1.97,load15,system,host.local
,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1.92,load5,system,host.local
,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,1.92,load5,system,host.local
39 changes: 20 additions & 19 deletions functions/transformations/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package transformations
import (
"fmt"

"github.com/apache/arrow/go/arrow/array"
"github.com/influxdata/flux"
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/semantic"
Expand All @@ -13,7 +15,6 @@ import (
const LimitKind = "limit"

// LimitOpSpec limits the number of rows returned per table.
// Currently offset is not supported.
type LimitOpSpec struct {
N int64 `json:"n"`
Offset int64 `json:"offset"`
Expand Down Expand Up @@ -134,7 +135,7 @@ func (t *limitTransformation) Process(id execute.DatasetID, tbl flux.Table) erro
n := t.n
offset := t.offset
var finishedErr error
err := tbl.Do(func(cr flux.ColReader) error {
err := tbl.DoArrow(func(cr flux.ArrowColReader) error {
if n <= 0 {
// Returning an error terminates iteration
finishedErr = errors.New("finished")
Expand All @@ -155,11 +156,11 @@ func (t *limitTransformation) Process(id execute.DatasetID, tbl flux.Table) erro
}
n -= count
lcr := sliceColReader{
ColReader: cr,
start: start,
stop: stop,
ArrowColReader: cr,
start: start,
stop: stop,
}
err := execute.AppendCols(lcr, builder)
err := execute.AppendColsArrow(lcr, builder)
if err != nil {
return err
}
Expand All @@ -173,36 +174,36 @@ func (t *limitTransformation) Process(id execute.DatasetID, tbl flux.Table) erro
}

type sliceColReader struct {
flux.ColReader
flux.ArrowColReader
start, stop int
}

func (cr sliceColReader) Len() int {
return cr.stop
}

func (cr sliceColReader) Bools(j int) []bool {
return cr.ColReader.Bools(j)[cr.start:cr.stop]
func (cr sliceColReader) Bools(j int) *array.Boolean {
return arrow.BoolSlice(cr.ArrowColReader.Bools(j), cr.start, cr.stop)
}

func (cr sliceColReader) Ints(j int) []int64 {
return cr.ColReader.Ints(j)[cr.start:cr.stop]
func (cr sliceColReader) Ints(j int) *array.Int64 {
return arrow.IntSlice(cr.ArrowColReader.Ints(j), cr.start, cr.stop)
}

func (cr sliceColReader) UInts(j int) []uint64 {
return cr.ColReader.UInts(j)[cr.start:cr.stop]
func (cr sliceColReader) UInts(j int) *array.Uint64 {
return arrow.UintSlice(cr.ArrowColReader.UInts(j), cr.start, cr.stop)
}

func (cr sliceColReader) Floats(j int) []float64 {
return cr.ColReader.Floats(j)[cr.start:cr.stop]
func (cr sliceColReader) Floats(j int) *array.Float64 {
return arrow.FloatSlice(cr.ArrowColReader.Floats(j), cr.start, cr.stop)
}

func (cr sliceColReader) Strings(j int) []string {
return cr.ColReader.Strings(j)[cr.start:cr.stop]
func (cr sliceColReader) Strings(j int) *array.Binary {
return arrow.StringSlice(cr.ArrowColReader.Strings(j), cr.start, cr.stop)
}

func (cr sliceColReader) Times(j int) []execute.Time {
return cr.ColReader.Times(j)[cr.start:cr.stop]
func (cr sliceColReader) Times(j int) *array.Int64 {
return arrow.IntSlice(cr.ArrowColReader.Times(j), cr.start, cr.stop)
}

func (t *limitTransformation) UpdateWatermark(id execute.DatasetID, mark execute.Time) error {
Expand Down

0 comments on commit 5d48531

Please sign in to comment.