From 9ea840ec917ca39c47ca26776a2f0a4f6bb7000d Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 17 Apr 2015 16:03:32 -0600 Subject: [PATCH] Handle distributed queries when shards != data nodes Fixes #2272 There was previously a explict panic put in the query engine to prevent queries where the number of shards was not equal to the number of data nodes in the cluster. This was waiting for the distributed queries branch to land but was not removed when that landed. There may be a more efficient way to do fix this but this fix simply queries all the shards and merges their outputs. Previously, the code assumed that only one shard would be hit. Querying multiple shards ended up producing duplicate values during the map phase so the map output needed to be merged as opposed to appended to avoid the dups. --- cmd/influxd/server_integration_test.go | 16 +++++ influxql/engine.go | 47 +++++++++++- influxql/engine_test.go | 93 ++++++++++++++++++++++++ tx.go | 98 ++++++++++++-------------- 4 files changed, 202 insertions(+), 52 deletions(-) create mode 100644 influxql/engine_test.go diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 1446aafabf8..90bd0a02fd4 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -1443,6 +1443,22 @@ func Test3NodeClusterPartiallyReplicated(t *testing.T) { runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes)-1) } +// ensure that all queries work if there are more nodes in a cluster than the replication factor +func Test5NodeClusterPartiallyReplicated(t *testing.T) { + testName := "5-node server integration partial replication" + if testing.Short() { + t.Skip(fmt.Sprintf("skipping '%s'", testName)) + } + dir := tempfile() + defer os.RemoveAll(dir) + + nodes := createCombinedNodeCluster(t, testName, dir, 5, nil) + defer nodes.Close() + + runTestsData(t, testName, nodes, "mydb", "myrp", 2) + runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", 2) +} + func TestClientLibrary(t *testing.T) { testName := "single server integration via client library" if testing.Short() { diff --git a/influxql/engine.go b/influxql/engine.go index c5e52035d96..7b89632d897 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -204,6 +204,50 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) { out <- row } +// mergeOutputs merges two sorted slices of rawQueryMapOutput such that duplicate +// timestamp entries, if they exists, are remove and the final output is sorted by time +func (m *MapReduceJob) mergeOutputs(first, second []*rawQueryMapOutput) []*rawQueryMapOutput { + var i, j int + v := []*rawQueryMapOutput{} + for { + + // indexes are past both slice maxes + if i >= len(first) && j >= len(second) { + break + } + + // first slice is done, append the rest of second + if i >= len(first) { + v = append(v, second[j:]...) + break + } + + // second slice is done, append the rest of first + if j >= len(second) { + v = append(v, first[i:]...) + break + } + + f := first[i] + s := second[j] + + // append the next smallest value to keep output sorted by time + if f.Timestamp < s.Timestamp { + v = append(v, f) + i += 1 + } else if f.Timestamp > s.Timestamp { + v = append(v, s) + j += 1 + // timestamps are the same so there is a dup, pick exiting and continue + } else { + v = append(v, f) + i += 1 + j += 1 + } + } + return v +} + // processRawQuery will handle running the mappers and then reducing their output // for queries that pull back raw data values without computing any kind of aggregates. func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { @@ -313,7 +357,8 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) { valuesSent += len(values) } - valuesToReturn = append(valuesToReturn, values...) + // merge the existing values with the new ones + valuesToReturn = m.mergeOutputs(valuesToReturn, values) // hit the chunk size? Send out what has been accumulated, but keep // processing. diff --git a/influxql/engine_test.go b/influxql/engine_test.go new file mode 100644 index 00000000000..ae61592642d --- /dev/null +++ b/influxql/engine_test.go @@ -0,0 +1,93 @@ +package influxql + +import ( + "testing" + "time" +) + +func TestMergeOutputs(t *testing.T) { + job := MapReduceJob{} + + test := []struct { + name string + first []*rawQueryMapOutput + second []*rawQueryMapOutput + expected []*rawQueryMapOutput + }{ + { + name: "empty slices", + first: []*rawQueryMapOutput{}, + second: []*rawQueryMapOutput{}, + expected: []*rawQueryMapOutput{}, + }, + { + name: "first empty", + first: []*rawQueryMapOutput{}, + second: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}}, + expected: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}}, + }, + { + name: "second empty", + first: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}}, + second: []*rawQueryMapOutput{}, + expected: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}}, + }, + { + name: "first before", + first: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}}, + second: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}}, + expected: []*rawQueryMapOutput{ + &rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}, + &rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}, + }, + }, + { + name: "second before", + first: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}}, + second: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}}, + expected: []*rawQueryMapOutput{ + &rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}, + &rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}, + }, + }, + { + name: "dups removed", + first: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}}, + second: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}}, + expected: []*rawQueryMapOutput{ + &rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}, + }, + }, + { + name: "sorted dups removed", + first: []*rawQueryMapOutput{ + &rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}, + &rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}, + }, + second: []*rawQueryMapOutput{ + &rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}, + &rawQueryMapOutput{time.Unix(2, 0).UnixNano(), 0}, + }, + expected: []*rawQueryMapOutput{ + &rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}, + &rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}, + &rawQueryMapOutput{time.Unix(2, 0).UnixNano(), 0}, + }, + }, + } + + for _, c := range test { + got := job.mergeOutputs(c.first, c.second) + + if len(got) != len(c.expected) { + t.Errorf("test %s: result length mismatch: got %v, exp %v", c.name, len(got), len(c.expected)) + } + + for j := 0; j < len(c.expected); j++ { + if exp := c.expected[j]; exp.Timestamp != got[j].Timestamp { + t.Errorf("test %s: timestamp mismatch: got %v, exp %v", c.name, got[j].Timestamp, exp.Timestamp) + } + } + + } +} diff --git a/tx.go b/tx.go index e0fb85d331c..6660e794863 100644 --- a/tx.go +++ b/tx.go @@ -139,60 +139,56 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri // create mappers for each shard we need to hit for _, sg := range shardGroups { - if len(sg.Shards) != 1 { // we'll only have more than 1 shard in a group when RF < # servers in cluster - // TODO: implement distributed queries. - panic("distributed queries not implemented yet and there are too many shards in this group") - } - - shard := sg.Shards[0] - - var mapper influxql.Mapper - - // create either a remote or local mapper for this shard - if shard.store == nil { - nodes := tx.server.DataNodesByID(shard.DataNodeIDs) - if len(nodes) == 0 { - return nil, ErrShardNotFound + for _, shard := range sg.Shards { + var mapper influxql.Mapper + + // create either a remote or local mapper for this shard + if shard.store == nil { + nodes := tx.server.DataNodesByID(shard.DataNodeIDs) + if len(nodes) == 0 { + return nil, ErrShardNotFound + } + + balancer := NewDataNodeBalancer(nodes) + + mapper = &RemoteMapper{ + dataNodes: balancer, + Database: mm.Database, + MeasurementName: m.Name, + TMin: tmin.UnixNano(), + TMax: tmax.UnixNano(), + SeriesIDs: t.SeriesIDs, + ShardID: shard.ID, + WhereFields: whereFields, + SelectFields: selectFields, + SelectTags: selectTags, + Limit: stmt.Limit, + Offset: stmt.Offset, + Interval: interval, + } + mapper.(*RemoteMapper).SetFilters(t.Filters) + } else { + mapper = &LocalMapper{ + seriesIDs: t.SeriesIDs, + db: shard.store, + job: job, + decoder: NewFieldCodec(m), + filters: t.Filters, + whereFields: whereFields, + selectFields: selectFields, + selectTags: selectTags, + tmax: tmax.UnixNano(), + interval: interval, + // multiple mappers may need to be merged together to get the results + // for a raw query. So each mapper will have to read at least the + // limit plus the offset in data points to ensure we've hit our mark + limit: uint64(stmt.Limit) + uint64(stmt.Offset), + } } - balancer := NewDataNodeBalancer(nodes) - - mapper = &RemoteMapper{ - dataNodes: balancer, - Database: mm.Database, - MeasurementName: m.Name, - TMin: tmin.UnixNano(), - TMax: tmax.UnixNano(), - SeriesIDs: t.SeriesIDs, - ShardID: shard.ID, - WhereFields: whereFields, - SelectFields: selectFields, - SelectTags: selectTags, - Limit: stmt.Limit, - Offset: stmt.Offset, - Interval: interval, - } - mapper.(*RemoteMapper).SetFilters(t.Filters) - } else { - mapper = &LocalMapper{ - seriesIDs: t.SeriesIDs, - db: shard.store, - job: job, - decoder: NewFieldCodec(m), - filters: t.Filters, - whereFields: whereFields, - selectFields: selectFields, - selectTags: selectTags, - tmax: tmax.UnixNano(), - interval: interval, - // multiple mappers may need to be merged together to get the results - // for a raw query. So each mapper will have to read at least the - // limit plus the offset in data points to ensure we've hit our mark - limit: uint64(stmt.Limit) + uint64(stmt.Offset), - } - } + mappers = append(mappers, mapper) - mappers = append(mappers, mapper) + } } job.Mappers = mappers