Handle distributed queries when shards != data nodes
Fixes #2272

There was previously an explicit panic in the query engine to prevent
queries where the number of shards was not equal to the number of data nodes
in the cluster.  It was a placeholder awaiting the distributed queries branch,
but it was not removed when that branch landed.

There may be a more efficient way to fix this, but this change simply queries
all the shards and merges their outputs.  Previously, the code assumed that
only one shard would be hit.  Querying multiple shards produced duplicate
values during the map phase, so the map output now has to be merged rather
than appended to avoid the duplicates (see the sketch below).
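
A minimal sketch of that merge step, assuming a simplified output type with just a timestamp and a value; the names output and merge here are illustrative only and not the committed API (the actual implementation is mergeOutputs in influxql/engine.go below):

package main

import "fmt"

// output is a stand-in for one point produced by a shard's map phase.
type output struct {
	Timestamp int64
	Value     interface{}
}

// merge walks both time-sorted slices with two cursors, always taking the
// smaller timestamp; when the timestamps are equal it keeps only one copy,
// so a point returned by more than one shard appears once in the result.
func merge(first, second []*output) []*output {
	merged := make([]*output, 0, len(first)+len(second))
	i, j := 0, 0
	for i < len(first) && j < len(second) {
		switch {
		case first[i].Timestamp < second[j].Timestamp:
			merged = append(merged, first[i])
			i++
		case first[i].Timestamp > second[j].Timestamp:
			merged = append(merged, second[j])
			j++
		default: // duplicate timestamp: keep a single copy
			merged = append(merged, first[i])
			i++
			j++
		}
	}
	// one of the slices is exhausted; append whatever remains of the other
	merged = append(merged, first[i:]...)
	merged = append(merged, second[j:]...)
	return merged
}

func main() {
	a := []*output{{1, "cpu=10"}, {2, "cpu=20"}}
	b := []*output{{2, "cpu=20"}, {3, "cpu=30"}}
	for _, o := range merge(a, b) {
		fmt.Println(o.Timestamp, o.Value) // prints timestamps 1, 2, 3 with no duplicate for 2
	}
}

Because each shard's map output is already sorted by time, a single two-cursor pass is enough to keep the combined result ordered and to drop points that more than one shard reported.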
jwilder committed Apr 17, 2015
1 parent e890468 commit 9ea840e
Showing 4 changed files with 202 additions and 52 deletions.
16 changes: 16 additions & 0 deletions cmd/influxd/server_integration_test.go
@@ -1443,6 +1443,22 @@ func Test3NodeClusterPartiallyReplicated(t *testing.T) {
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes)-1)
}

// ensure that all queries work if there are more nodes in a cluster than the replication factor
func Test5NodeClusterPartiallyReplicated(t *testing.T) {
testName := "5-node server integration partial replication"
if testing.Short() {
t.Skip(fmt.Sprintf("skipping '%s'", testName))
}
dir := tempfile()
defer os.RemoveAll(dir)

nodes := createCombinedNodeCluster(t, testName, dir, 5, nil)
defer nodes.Close()

runTestsData(t, testName, nodes, "mydb", "myrp", 2)
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", 2)
}

func TestClientLibrary(t *testing.T) {
testName := "single server integration via client library"
if testing.Short() {
47 changes: 46 additions & 1 deletion influxql/engine.go
@@ -204,6 +204,50 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) {
out <- row
}

// mergeOutputs merges two sorted slices of rawQueryMapOutput such that duplicate
// timestamp entries, if they exist, are removed and the final output is sorted by time
func (m *MapReduceJob) mergeOutputs(first, second []*rawQueryMapOutput) []*rawQueryMapOutput {
var i, j int
v := []*rawQueryMapOutput{}
for {

// indexes are past both slice maxes
if i >= len(first) && j >= len(second) {
break
}

// first slice is done, append the rest of second
if i >= len(first) {
v = append(v, second[j:]...)
break
}

// second slice is done, append the rest of first
if j >= len(second) {
v = append(v, first[i:]...)
break
}

f := first[i]
s := second[j]

// append the next smallest value to keep output sorted by time
if f.Timestamp < s.Timestamp {
v = append(v, f)
i += 1
} else if f.Timestamp > s.Timestamp {
v = append(v, s)
j += 1
// timestamps are the same so there is a dup, pick the existing one and continue
} else {
v = append(v, f)
i += 1
j += 1
}
}
return v
}

// processRawQuery will handle running the mappers and then reducing their output
// for queries that pull back raw data values without computing any kind of aggregates.
func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
@@ -313,7 +357,8 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
valuesSent += len(values)
}

valuesToReturn = append(valuesToReturn, values...)
// merge the existing values with the new ones
valuesToReturn = m.mergeOutputs(valuesToReturn, values)

// hit the chunk size? Send out what has been accumulated, but keep
// processing.
93 changes: 93 additions & 0 deletions influxql/engine_test.go
@@ -0,0 +1,93 @@
package influxql

import (
"testing"
"time"
)

func TestMergeOutputs(t *testing.T) {
job := MapReduceJob{}

test := []struct {
name string
first []*rawQueryMapOutput
second []*rawQueryMapOutput
expected []*rawQueryMapOutput
}{
{
name: "empty slices",
first: []*rawQueryMapOutput{},
second: []*rawQueryMapOutput{},
expected: []*rawQueryMapOutput{},
},
{
name: "first empty",
first: []*rawQueryMapOutput{},
second: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}},
expected: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}},
},
{
name: "second empty",
first: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}},
second: []*rawQueryMapOutput{},
expected: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}},
},
{
name: "first before",
first: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}},
second: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}},
expected: []*rawQueryMapOutput{
&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0},
&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0},
},
},
{
name: "second before",
first: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}},
second: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}},
expected: []*rawQueryMapOutput{
&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0},
&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0},
},
},
{
name: "dups removed",
first: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}},
second: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}},
expected: []*rawQueryMapOutput{
&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0},
},
},
{
name: "sorted dups removed",
first: []*rawQueryMapOutput{
&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0},
&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0},
},
second: []*rawQueryMapOutput{
&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0},
&rawQueryMapOutput{time.Unix(2, 0).UnixNano(), 0},
},
expected: []*rawQueryMapOutput{
&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0},
&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0},
&rawQueryMapOutput{time.Unix(2, 0).UnixNano(), 0},
},
},
}

for _, c := range test {
got := job.mergeOutputs(c.first, c.second)

if len(got) != len(c.expected) {
t.Errorf("test %s: result length mismatch: got %v, exp %v", c.name, len(got), len(c.expected))
}

for j := 0; j < len(c.expected); j++ {
if exp := c.expected[j]; exp.Timestamp != got[j].Timestamp {
t.Errorf("test %s: timestamp mismatch: got %v, exp %v", c.name, got[j].Timestamp, exp.Timestamp)
}
}

}
}
98 changes: 47 additions & 51 deletions tx.go
@@ -139,60 +139,56 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri

// create mappers for each shard we need to hit
for _, sg := range shardGroups {
if len(sg.Shards) != 1 { // we'll only have more than 1 shard in a group when RF < # servers in cluster
// TODO: implement distributed queries.
panic("distributed queries not implemented yet and there are too many shards in this group")
}

shard := sg.Shards[0]

var mapper influxql.Mapper

// create either a remote or local mapper for this shard
if shard.store == nil {
nodes := tx.server.DataNodesByID(shard.DataNodeIDs)
if len(nodes) == 0 {
return nil, ErrShardNotFound
for _, shard := range sg.Shards {
var mapper influxql.Mapper

// create either a remote or local mapper for this shard
if shard.store == nil {
nodes := tx.server.DataNodesByID(shard.DataNodeIDs)
if len(nodes) == 0 {
return nil, ErrShardNotFound
}

balancer := NewDataNodeBalancer(nodes)

mapper = &RemoteMapper{
dataNodes: balancer,
Database: mm.Database,
MeasurementName: m.Name,
TMin: tmin.UnixNano(),
TMax: tmax.UnixNano(),
SeriesIDs: t.SeriesIDs,
ShardID: shard.ID,
WhereFields: whereFields,
SelectFields: selectFields,
SelectTags: selectTags,
Limit: stmt.Limit,
Offset: stmt.Offset,
Interval: interval,
}
mapper.(*RemoteMapper).SetFilters(t.Filters)
} else {
mapper = &LocalMapper{
seriesIDs: t.SeriesIDs,
db: shard.store,
job: job,
decoder: NewFieldCodec(m),
filters: t.Filters,
whereFields: whereFields,
selectFields: selectFields,
selectTags: selectTags,
tmax: tmax.UnixNano(),
interval: interval,
// multiple mappers may need to be merged together to get the results
// for a raw query. So each mapper will have to read at least the
// limit plus the offset in data points to ensure we've hit our mark
limit: uint64(stmt.Limit) + uint64(stmt.Offset),
}
}

balancer := NewDataNodeBalancer(nodes)

mapper = &RemoteMapper{
dataNodes: balancer,
Database: mm.Database,
MeasurementName: m.Name,
TMin: tmin.UnixNano(),
TMax: tmax.UnixNano(),
SeriesIDs: t.SeriesIDs,
ShardID: shard.ID,
WhereFields: whereFields,
SelectFields: selectFields,
SelectTags: selectTags,
Limit: stmt.Limit,
Offset: stmt.Offset,
Interval: interval,
}
mapper.(*RemoteMapper).SetFilters(t.Filters)
} else {
mapper = &LocalMapper{
seriesIDs: t.SeriesIDs,
db: shard.store,
job: job,
decoder: NewFieldCodec(m),
filters: t.Filters,
whereFields: whereFields,
selectFields: selectFields,
selectTags: selectTags,
tmax: tmax.UnixNano(),
interval: interval,
// multiple mappers may need to be merged together to get the results
// for a raw query. So each mapper will have to read at least the
// limit plus the offset in data points to ensure we've hit our mark
limit: uint64(stmt.Limit) + uint64(stmt.Offset),
}
}
mappers = append(mappers, mapper)

mappers = append(mappers, mapper)
}
}

job.Mappers = mappers
