From eec2f9d9f29f1142b4314ef0db8de819a78e9b43 Mon Sep 17 00:00:00 2001 From: Lorenzo Affetti Date: Wed, 19 Dec 2018 19:14:28 +0100 Subject: [PATCH] feat(functions): converting limit to use arrow arrays --- docs/SPEC.md | 18 ++++++-- functions/tests/testdata/limit.flux | 51 ++++++++++++++++++++ functions/tests/testdata/limit_offset.flux | 54 ++++++++++++++++++++++ functions/transformations/limit.go | 39 ++++++++-------- 4 files changed, 138 insertions(+), 24 deletions(-) create mode 100644 functions/tests/testdata/limit.flux create mode 100644 functions/tests/testdata/limit_offset.flux diff --git a/docs/SPEC.md b/docs/SPEC.md index 8fa7b7946c..bbd8bf6d1b 100644 --- a/docs/SPEC.md +++ b/docs/SPEC.md @@ -1831,17 +1831,25 @@ LogarithmicBins has the following properties: #### Limit -Limit caps the number of records in output tables to a fixed size n. +Limit caps the number of records in output tables to a fixed size `n`. One output table is produced for each input table. -The output table will contain the first n records from the input table. -If the input table has less than n records all records will be output. +Each output table will contain the first `n` records after the first `offset` records of the input table. +If the input table has less than `offset + n` records, all records except the first `offset` ones will be output. Limit has the following properties: * `n` int - The maximum number of records to output. + The maximum number of records per table to output. +* `offset` int + The number of records to skip per table before limiting to `n`. Defaults to 0. -Example: `from(bucket: "telegraf/autogen") |> limit(n: 10)` +Example: + +``` +from(bucket: "telegraf/autogen") + |> range(start: -1h) + |> limit(n: 10, offset: 1) +``` #### Map diff --git a/functions/tests/testdata/limit.flux b/functions/tests/testdata/limit.flux new file mode 100644 index 0000000000..0c4d25e581 --- /dev/null +++ b/functions/tests/testdata/limit.flux @@ -0,0 +1,51 @@ +inData = " +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string +#group,false,false,false,false,false,false,true,true,true +#default,_result,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,host +,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,82.9833984375,used_percent,swap,host.local +,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,82.598876953125,used_percent,swap,host.local +,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,82.598876953125,used_percent,swap,host.local +,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,82.598876953125,used_percent,swap,host.local +,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,82.598876953125,used_percent,swap,host.local +,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,82.6416015625,used_percent,swap,host.local +,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1.83,load1,system,host.local +,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1.7,load1,system,host.local +,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,1.74,load1,system,host.local +,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,1.63,load1,system,host.local +,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,1.91,load1,system,host.local +,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,1.84,load1,system,host.local +,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1.98,load15,system,host.local +,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1.97,load15,system,host.local +,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,1.97,load15,system,host.local +,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,1.96,load15,system,host.local +,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,1.98,load15,system,host.local +,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,1.97,load15,system,host.local +,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1.95,load5,system,host.local +,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1.92,load5,system,host.local +,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,1.92,load5,system,host.local +,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,1.89,load5,system,host.local +,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,1.94,load5,system,host.local +,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,1.93,load5,system,host.local +" + +outData = " +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string +#group,false,false,false,false,false,false,true,true,true +#default,_result,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,host +,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,82.9833984375,used_percent,swap,host.local +,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1.83,load1,system,host.local +,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1.98,load15,system,host.local +,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1.95,load5,system,host.local +" + +t_limit = (table=<-) => + table + |> range(start: 2018-05-22T19:00:00Z, stop: 2018-05-22T20:00:00Z) + |> limit(n: 1) + +testingTest(name: "limit", + input: testLoadStorage(csv: inData), + want: testLoadMem(csv: outData), + test: t_limit) \ No newline at end of file diff --git a/functions/tests/testdata/limit_offset.flux b/functions/tests/testdata/limit_offset.flux new file mode 100644 index 0000000000..ccaea2fdb5 --- /dev/null +++ b/functions/tests/testdata/limit_offset.flux @@ -0,0 +1,54 @@ +inData = " +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string +#group,false,false,false,false,false,false,true,true,true +#default,_result,,,,,,,, +,result,table,_start,_stop,_time,_value,_field,_measurement,host +,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,82.9833984375,used_percent,swap,host.local +,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,82.598876953125,used_percent,swap,host.local +,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,82.598876953125,used_percent,swap,host.local +,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,82.598876953125,used_percent,swap,host.local +,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,82.598876953125,used_percent,swap,host.local +,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,82.6416015625,used_percent,swap,host.local +,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1.83,load1,system,host.local +,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1.7,load1,system,host.local +,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,1.74,load1,system,host.local +,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,1.63,load1,system,host.local +,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,1.91,load1,system,host.local +,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,1.84,load1,system,host.local +,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1.98,load15,system,host.local +,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1.97,load15,system,host.local +,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,1.97,load15,system,host.local +,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,1.96,load15,system,host.local +,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,1.98,load15,system,host.local +,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,1.97,load15,system,host.local +,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,1.95,load5,system,host.local +,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1.92,load5,system,host.local +,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,1.92,load5,system,host.local +,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,1.89,load5,system,host.local +,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,1.94,load5,system,host.local +,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,1.93,load5,system,host.local +" + +outData = " +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string +#group,false,false,false,false,false,false,true,true,true +#default,_result,,,,,,,, +,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,82.598876953125,used_percent,swap,host.local +,,161,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,82.598876953125,used_percent,swap,host.local +,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1.7,load1,system,host.local +,,162,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,1.74,load1,system,host.local +,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1.97,load15,system,host.local +,,163,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,1.97,load15,system,host.local +,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,1.92,load5,system,host.local +,,164,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,1.92,load5,system,host.local +" + +t_limit = (table=<-) => + table + |> range(start: 2018-05-22T19:00:00Z, stop: 2018-05-22T20:00:00Z) + |> limit(n: 2, offset: 1) + +testingTest(name: "limit_offset", + input: testLoadStorage(csv: inData), + want: testLoadMem(csv: outData), + test: t_limit) \ No newline at end of file diff --git a/functions/transformations/limit.go b/functions/transformations/limit.go index 0dea56738f..026eb4b5c3 100644 --- a/functions/transformations/limit.go +++ b/functions/transformations/limit.go @@ -3,7 +3,9 @@ package transformations import ( "fmt" + "github.com/apache/arrow/go/arrow/array" "github.com/influxdata/flux" + "github.com/influxdata/flux/arrow" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/plan" "github.com/influxdata/flux/semantic" @@ -13,7 +15,6 @@ import ( const LimitKind = "limit" // LimitOpSpec limits the number of rows returned per table. -// Currently offset is not supported. type LimitOpSpec struct { N int64 `json:"n"` Offset int64 `json:"offset"` @@ -134,7 +135,7 @@ func (t *limitTransformation) Process(id execute.DatasetID, tbl flux.Table) erro n := t.n offset := t.offset var finishedErr error - err := tbl.Do(func(cr flux.ColReader) error { + err := tbl.DoArrow(func(cr flux.ArrowColReader) error { if n <= 0 { // Returning an error terminates iteration finishedErr = errors.New("finished") @@ -155,11 +156,11 @@ func (t *limitTransformation) Process(id execute.DatasetID, tbl flux.Table) erro } n -= count lcr := sliceColReader{ - ColReader: cr, - start: start, - stop: stop, + ArrowColReader: cr, + start: start, + stop: stop, } - err := execute.AppendCols(lcr, builder) + err := execute.AppendColsArrow(lcr, builder) if err != nil { return err } @@ -173,7 +174,7 @@ func (t *limitTransformation) Process(id execute.DatasetID, tbl flux.Table) erro } type sliceColReader struct { - flux.ColReader + flux.ArrowColReader start, stop int } @@ -181,28 +182,28 @@ func (cr sliceColReader) Len() int { return cr.stop } -func (cr sliceColReader) Bools(j int) []bool { - return cr.ColReader.Bools(j)[cr.start:cr.stop] +func (cr sliceColReader) Bools(j int) *array.Boolean { + return arrow.BoolSlice(cr.ArrowColReader.Bools(j), cr.start, cr.stop) } -func (cr sliceColReader) Ints(j int) []int64 { - return cr.ColReader.Ints(j)[cr.start:cr.stop] +func (cr sliceColReader) Ints(j int) *array.Int64 { + return arrow.IntSlice(cr.ArrowColReader.Ints(j), cr.start, cr.stop) } -func (cr sliceColReader) UInts(j int) []uint64 { - return cr.ColReader.UInts(j)[cr.start:cr.stop] +func (cr sliceColReader) UInts(j int) *array.Uint64 { + return arrow.UintSlice(cr.ArrowColReader.UInts(j), cr.start, cr.stop) } -func (cr sliceColReader) Floats(j int) []float64 { - return cr.ColReader.Floats(j)[cr.start:cr.stop] +func (cr sliceColReader) Floats(j int) *array.Float64 { + return arrow.FloatSlice(cr.ArrowColReader.Floats(j), cr.start, cr.stop) } -func (cr sliceColReader) Strings(j int) []string { - return cr.ColReader.Strings(j)[cr.start:cr.stop] +func (cr sliceColReader) Strings(j int) *array.Binary { + return arrow.StringSlice(cr.ArrowColReader.Strings(j), cr.start, cr.stop) } -func (cr sliceColReader) Times(j int) []execute.Time { - return cr.ColReader.Times(j)[cr.start:cr.stop] +func (cr sliceColReader) Times(j int) *array.Int64 { + return arrow.IntSlice(cr.ArrowColReader.Times(j), cr.start, cr.stop) } func (t *limitTransformation) UpdateWatermark(id execute.DatasetID, mark execute.Time) error {