Skip to content

Commit

Permalink
Merge pull request #4817 from mengjinglei/fix-MinMax
Browse files Browse the repository at this point in the history
fix issue #4816 some aggregate function work incorrectly in cluster mode
  • Loading branch information
otoolep committed Nov 21, 2015
2 parents 298b149 + 580f71f commit 3ae624e
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- [#4804](https://github.com/influxdb/influxdb/pull/4804): Complete lint for services/admin. Thanks @nii236
- [#4796](https://github.com/influxdb/influxdb/pull/4796): Check point without fields. Thanks @CrazyJvm
- [#4815](https://github.com/influxdb/influxdb/pull/4815): Added `Time` field into aggregate output across the cluster. Thanks @li-ang
- [#4817](https://github.com/influxdb/influxdb/pull/4817): Fix Min,Max,Top,Bottom function when query distributed node. Thanks @mengjinglei

## v0.9.5 [2015-11-20]

Expand Down
15 changes: 15 additions & 0 deletions tsdb/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,21 @@ func InitializeUnmarshaller(c *influxql.Call) (UnmarshalFunc, error) {
err := json.Unmarshal(b, &o)
return &o, err
}, nil
case "min", "max":
return func(b []byte) (interface{}, error) {
if string(b) == "null" {
return nil, nil
}
var o minMaxMapOut
err := json.Unmarshal(b, &o)
return &o, err
}, nil
case "top", "bottom":
return func(b []byte) (interface{}, error) {
var o PositionPoints
err := json.Unmarshal(b, &o)
return o, err
}, nil
case "spread":
return func(b []byte) (interface{}, error) {
var o spreadMapOutput
Expand Down
129 changes: 129 additions & 0 deletions tsdb/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,3 +863,132 @@ func TestReduceTopBottom(t *testing.T) {
}
}
}

func TestInitializeUnmarshallerMaxMin(t *testing.T) {
tests := []struct {
Name string
input []byte
output interface{}
call *influxql.Call
}{
{
Name: "max - one point",
input: []byte(`{"Time":1447729856247384906,"Val":1,"Type":0,"Fields":{"":1},"Tags":{}}`),
output: PositionPoint{
Time: int64(1447729856247384906),
Value: float64(1),
Fields: map[string]interface{}{"": float64(1)},
Tags: map[string]string{},
},
call: &influxql.Call{Name: "max"},
},
{
Name: "max - nil point",
input: []byte(`null`),
output: nil,
call: &influxql.Call{Name: "max"},
},
{
Name: "min - one point",
input: []byte(`{"Time":1447729856247384906,"Val":1,"Type":0,"Fields":{"":1},"Tags":{}}`),
output: PositionPoint{
Time: int64(1447729856247384906),
Value: float64(1),
Fields: map[string]interface{}{"": float64(1)},
Tags: map[string]string{},
},
call: &influxql.Call{Name: "min"},
},
{
Name: "min - nil point",
input: []byte(`null`),
output: nil,
call: &influxql.Call{Name: "min"},
},
}
for _, test := range tests {
unmarshaller, err := InitializeUnmarshaller(test.call)
if err != nil {
t.Errorf("initialize unmarshaller for %v, got error:%v", test.Name, err)
}

// unmarshaller take bytes recieved from remote server as input,
// unmarshal it into an interface the reducer can use
unmarshallOutput, err := unmarshaller(test.input)
if err != nil {
t.Errorf("unmarshaller unmarshal %v fail with error:%v", &test.input, err)
}

//if input is "null" then the unmarshal output is expect to be nil
if string(test.input) == "null" && unmarshallOutput != nil {
t.Errorf("initialize unmarshaller, \nexp\n %v\ngot\n %v", nil, spew.Sdump(unmarshallOutput))
continue
}

// initialize a reducer that can take the output of unmarshaller as input
reducer, err := initializeReduceFunc(test.call)
if err != nil {
t.Errorf("initialize %v reduce function fail with error:%v", test.Name, err)
}

output := reducer([]interface{}{unmarshallOutput})
if !reflect.DeepEqual(output, test.output) {
t.Errorf("Wrong output. \nexp\n %v\ngot\n %v", spew.Sdump(test.output), spew.Sdump(output))
}
}
}

func TestInitializeUnmarshallerTopBottom(t *testing.T) {
tests := []struct {
Name string
input []byte
output interface{}
call *influxql.Call
}{
{
Name: "top - one point",
input: []byte(`[{"Time":1447729856247384906,"Value":1,"Fields":{"":1},"Tags":{}}]`),
output: PositionPoints{
{int64(1447729856247384906), float64(1), map[string]interface{}{"": float64(1)}, map[string]string{}},
},
call: &influxql.Call{
Name: "top",
Args: []influxql.Expr{
&influxql.VarRef{Val: "field1"},
&influxql.NumberLiteral{Val: 1},
},
},
},
{
Name: "bottom - one point",
input: []byte(`[{"Time":1447729856247384906,"Value":1,"Fields":{"":1},"Tags":{}}]`),
output: PositionPoints{
{int64(1447729856247384906), float64(1), map[string]interface{}{"": float64(1)}, map[string]string{}},
},
call: &influxql.Call{
Name: "bottom",
Args: []influxql.Expr{
&influxql.VarRef{Val: "field1"},
&influxql.NumberLiteral{Val: 1},
},
},
},
}
for _, test := range tests {
unmarshaller, err := InitializeUnmarshaller(test.call)
if err != nil {
t.Errorf("initialize unmarshaller for %v, got error:%v", test.Name, err)
}

// unmarshaller take bytes recieved from remote server as input,
// unmarshal it into an interface the reducer can use
output, err := unmarshaller(test.input)
if err != nil {
t.Errorf("unmarshaller unmarshal %v fail with error:%v", &test.input, err)
}

if !reflect.DeepEqual(output, test.output) {
t.Errorf("Wrong output. \nexp\n %v\ngot\n %v", spew.Sdump(test.output), spew.Sdump(output))
}
}
}

0 comments on commit 3ae624e

Please sign in to comment.