diff --git a/CHANGELOG.md b/CHANGELOG.md index 0853d46103a..dfb7f10a7ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - [#4804](https://github.com/influxdb/influxdb/pull/4804): Complete lint for services/admin. Thanks @nii236 - [#4796](https://github.com/influxdb/influxdb/pull/4796): Check point without fields. Thanks @CrazyJvm - [#4815](https://github.com/influxdb/influxdb/pull/4815): Added `Time` field into aggregate output across the cluster. Thanks @li-ang +- [#4817](https://github.com/influxdb/influxdb/pull/4817): Fix Min,Max,Top,Bottom function when query distributed node. Thanks @mengjinglei ## v0.9.5 [2015-11-20] diff --git a/tsdb/functions.go b/tsdb/functions.go index f10b0cb468f..fd5cef4cb6d 100644 --- a/tsdb/functions.go +++ b/tsdb/functions.go @@ -199,6 +199,21 @@ func InitializeUnmarshaller(c *influxql.Call) (UnmarshalFunc, error) { err := json.Unmarshal(b, &o) return &o, err }, nil + case "min", "max": + return func(b []byte) (interface{}, error) { + if string(b) == "null" { + return nil, nil + } + var o minMaxMapOut + err := json.Unmarshal(b, &o) + return &o, err + }, nil + case "top", "bottom": + return func(b []byte) (interface{}, error) { + var o PositionPoints + err := json.Unmarshal(b, &o) + return o, err + }, nil case "spread": return func(b []byte) (interface{}, error) { var o spreadMapOutput diff --git a/tsdb/functions_test.go b/tsdb/functions_test.go index 9bbf1eff0ef..528a95b9ec6 100644 --- a/tsdb/functions_test.go +++ b/tsdb/functions_test.go @@ -863,3 +863,132 @@ func TestReduceTopBottom(t *testing.T) { } } } + +func TestInitializeUnmarshallerMaxMin(t *testing.T) { + tests := []struct { + Name string + input []byte + output interface{} + call *influxql.Call + }{ + { + Name: "max - one point", + input: []byte(`{"Time":1447729856247384906,"Val":1,"Type":0,"Fields":{"":1},"Tags":{}}`), + output: PositionPoint{ + Time: int64(1447729856247384906), + Value: float64(1), + Fields: map[string]interface{}{"": float64(1)}, + Tags: map[string]string{}, + }, + call: &influxql.Call{Name: "max"}, + }, + { + Name: "max - nil point", + input: []byte(`null`), + output: nil, + call: &influxql.Call{Name: "max"}, + }, + { + Name: "min - one point", + input: []byte(`{"Time":1447729856247384906,"Val":1,"Type":0,"Fields":{"":1},"Tags":{}}`), + output: PositionPoint{ + Time: int64(1447729856247384906), + Value: float64(1), + Fields: map[string]interface{}{"": float64(1)}, + Tags: map[string]string{}, + }, + call: &influxql.Call{Name: "min"}, + }, + { + Name: "min - nil point", + input: []byte(`null`), + output: nil, + call: &influxql.Call{Name: "min"}, + }, + } + for _, test := range tests { + unmarshaller, err := InitializeUnmarshaller(test.call) + if err != nil { + t.Errorf("initialize unmarshaller for %v, got error:%v", test.Name, err) + } + + // unmarshaller take bytes recieved from remote server as input, + // unmarshal it into an interface the reducer can use + unmarshallOutput, err := unmarshaller(test.input) + if err != nil { + t.Errorf("unmarshaller unmarshal %v fail with error:%v", &test.input, err) + } + + //if input is "null" then the unmarshal output is expect to be nil + if string(test.input) == "null" && unmarshallOutput != nil { + t.Errorf("initialize unmarshaller, \nexp\n %v\ngot\n %v", nil, spew.Sdump(unmarshallOutput)) + continue + } + + // initialize a reducer that can take the output of unmarshaller as input + reducer, err := initializeReduceFunc(test.call) + if err != nil { + t.Errorf("initialize %v reduce function fail with error:%v", test.Name, err) + } + + output := reducer([]interface{}{unmarshallOutput}) + if !reflect.DeepEqual(output, test.output) { + t.Errorf("Wrong output. \nexp\n %v\ngot\n %v", spew.Sdump(test.output), spew.Sdump(output)) + } + } +} + +func TestInitializeUnmarshallerTopBottom(t *testing.T) { + tests := []struct { + Name string + input []byte + output interface{} + call *influxql.Call + }{ + { + Name: "top - one point", + input: []byte(`[{"Time":1447729856247384906,"Value":1,"Fields":{"":1},"Tags":{}}]`), + output: PositionPoints{ + {int64(1447729856247384906), float64(1), map[string]interface{}{"": float64(1)}, map[string]string{}}, + }, + call: &influxql.Call{ + Name: "top", + Args: []influxql.Expr{ + &influxql.VarRef{Val: "field1"}, + &influxql.NumberLiteral{Val: 1}, + }, + }, + }, + { + Name: "bottom - one point", + input: []byte(`[{"Time":1447729856247384906,"Value":1,"Fields":{"":1},"Tags":{}}]`), + output: PositionPoints{ + {int64(1447729856247384906), float64(1), map[string]interface{}{"": float64(1)}, map[string]string{}}, + }, + call: &influxql.Call{ + Name: "bottom", + Args: []influxql.Expr{ + &influxql.VarRef{Val: "field1"}, + &influxql.NumberLiteral{Val: 1}, + }, + }, + }, + } + for _, test := range tests { + unmarshaller, err := InitializeUnmarshaller(test.call) + if err != nil { + t.Errorf("initialize unmarshaller for %v, got error:%v", test.Name, err) + } + + // unmarshaller take bytes recieved from remote server as input, + // unmarshal it into an interface the reducer can use + output, err := unmarshaller(test.input) + if err != nil { + t.Errorf("unmarshaller unmarshal %v fail with error:%v", &test.input, err) + } + + if !reflect.DeepEqual(output, test.output) { + t.Errorf("Wrong output. \nexp\n %v\ngot\n %v", spew.Sdump(test.output), spew.Sdump(output)) + } + } +}