Skip to content

Commit

Permalink
Filter out sources that do not match the shard database/retention policy
Browse files Browse the repository at this point in the history
If you use a statement like this:

    SELECT value FROM one..cpu, two..cpu

It will access both the `one` and `two` databases as if you had selected
the `cpu` measurement twice for both of them. Updated the `tsdb.Shard`
create iterator function to filter out any sources that do not apply to
that shard so this duplication doesn't happen.

Fixes #6701.
  • Loading branch information
jsternberg committed May 23, 2016
1 parent aa2f490 commit 5e7e0bd
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
- [#6676](https://github.com/influxdata/influxdb/issues/6676): Ensures client sends correct precision when inserting points.
- [#2048](https://github.com/influxdata/influxdb/issues/2048): Check that retention policies exist before creating CQ
- [#6702](https://github.com/influxdata/influxdb/issues/6702): Fix SELECT statement required privileges.
- [#6701](https://github.com/influxdata/influxdb/issues/6701): Filter out sources that do not match the shard database/retention policy.

## v0.13.0 [2016-05-12]

Expand Down
55 changes: 55 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6094,6 +6094,61 @@ func TestServer_Query_IntoTarget(t *testing.T) {
}
}

// This test ensures that data is not duplicated with measurements
// of the same name.
func TestServer_Query_DuplicateMeasurements(t *testing.T) {
t.Parallel()
s := OpenDefaultServer(NewConfig())
defer s.Close()

// Create a second database.
if err := s.CreateDatabaseAndRetentionPolicy("db1", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaClient.SetDefaultRetentionPolicy("db1", "rp0"); err != nil {
t.Fatal(err)
}

test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: fmt.Sprintf(`cpu value=1 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano())},
}

if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}

test = NewTest("db1", "rp0")
test.writes = Writes{
&Write{data: fmt.Sprintf(`cpu value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano())},
}

if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}

test.addQueries([]*Query{
&Query{
name: "select from both databases",
params: url.Values{"db": []string{"db0"}},
command: `SELECT value FROM db0.rp0.cpu, db1.rp0.cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:00Z",1],["2000-01-01T00:00:10Z",2]]}]}]}`,
},
}...)

for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}

// This test reproduced a data race with closing the
// Subscriber points channel while writes were in-flight in the PointsWriter.
func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,20 @@ func (a Sources) Names() []string {
return names
}

// Filter returns a list of source names filtered by the database/retention policy.
func (a Sources) Filter(database, retentionPolicy string) []Source {
sources := make([]Source, 0, len(a))
for _, s := range a {
switch s := s.(type) {
case *Measurement:
if s.Database == database && s.RetentionPolicy == retentionPolicy {
sources = append(sources, s)
}
}
}
return sources
}

// HasSystemSource returns true if any of the sources are internal, system sources.
func (a Sources) HasSystemSource() bool {
for _, s := range a {
Expand Down
1 change: 1 addition & 0 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ func (s *Shard) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator,
if influxql.Sources(opt.Sources).HasSystemSource() {
return s.createSystemIterator(opt)
}
opt.Sources = influxql.Sources(opt.Sources).Filter(s.database, s.retentionPolicy)
return s.engine.CreateIterator(opt)
}

Expand Down
28 changes: 18 additions & 10 deletions tsdb/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,14 @@ cpu,host=serverB,region=uswest value=25 0
Expr: influxql.MustParseExpr(`value`),
Aux: []influxql.VarRef{{Val: "val2"}},
Dimensions: []string{"host"},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Sources: []influxql.Source{&influxql.Measurement{
Name: "cpu",
Database: "db0",
RetentionPolicy: "rp0",
}},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
})
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -297,10 +301,14 @@ cpu,host=serverB,region=uswest value=25 0
Expr: influxql.MustParseExpr(`value`),
Aux: []influxql.VarRef{{Val: "val2"}},
Dimensions: []string{"host"},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Ascending: false,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Sources: []influxql.Source{&influxql.Measurement{
Name: "cpu",
Database: "db0",
RetentionPolicy: "rp0",
}},
Ascending: false,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
})
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -494,8 +502,8 @@ func NewShard() *Shard {
return &Shard{
Shard: tsdb.NewShard(0,
tsdb.NewDatabaseIndex("db"),
filepath.Join(path, "data"),
filepath.Join(path, "wal"),
filepath.Join(path, "data", "db0", "rp0", "1"),
filepath.Join(path, "wal", "db0", "rp0", "1"),
opt,
),
path: path,
Expand Down
20 changes: 14 additions & 6 deletions tsdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,14 @@ func TestShards_CreateIterator(t *testing.T) {
itr, err := ics.CreateIterator(influxql.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Dimensions: []string{"host"},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Sources: []influxql.Source{&influxql.Measurement{
Name: "cpu",
Database: "db0",
RetentionPolicy: "rp0",
}},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
})
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -329,8 +333,12 @@ func TestStore_BackupRestoreShard(t *testing.T) {

// Read data from
itr, err := s1.Shard(100).CreateIterator(influxql.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Expr: influxql.MustParseExpr(`value`),
Sources: []influxql.Source{&influxql.Measurement{
Name: "cpu",
Database: "db0",
RetentionPolicy: "rp0",
}},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Expand Down

0 comments on commit 5e7e0bd

Please sign in to comment.