Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Truncate the shard group end time if it exceeds MaxNanoTime #6693

Merged
merged 1 commit into from
May 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
- [#6702](https://github.com/influxdata/influxdb/issues/6702): Fix SELECT statement required privileges.
- [#6701](https://github.com/influxdata/influxdb/issues/6701): Filter out sources that do not match the shard database/retention policy.
- [#6683](https://github.com/influxdata/influxdb/issues/6683): Fix compaction planning re-compacting large TSM files
- [#6693](https://github.com/influxdata/influxdb/pull/6693): Truncate the shard group end time if it exceeds MaxNanoTime.

## v0.13.0 [2016-05-12]

Expand Down
44 changes: 44 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6149,6 +6149,50 @@ func TestServer_Query_DuplicateMeasurements(t *testing.T) {
}
}

func TestServer_Query_LargeTimestamp(t *testing.T) {
t.Parallel()
s := OpenDefaultServer(NewConfig())
defer s.Close()

writes := []string{
fmt.Sprintf(`cpu value=100 %d`, models.MaxNanoTime.UnixNano()),
}

test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: strings.Join(writes, "\n")},
}
test.addQueries([]*Query{
&Query{
name: `select value at max nano time`,
params: url.Values{"db": []string{"db0"}},
command: fmt.Sprintf(`SELECT value FROM cpu WHERE time <= %d`, models.MaxNanoTime.UnixNano()),
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["` + models.MaxNanoTime.Format(time.RFC3339Nano) + `",100]]}]}]}`,
},
}...)

if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}

// Open a new server with the same configuration file.
// This is to ensure the meta data was marshaled correctly.
s2 := OpenServer(s.Config)
defer s2.Close()

for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}

// This test reproduced a data race with closing the
// Subscriber points channel while writes were in-flight in the PointsWriter.
func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions services/meta/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
internal "github.com/influxdata/influxdb/services/meta/internal"
)

Expand Down Expand Up @@ -371,6 +372,9 @@ func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time)
sgi.ID = data.MaxShardGroupID
sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()
sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()
if sgi.EndTime.After(models.MaxNanoTime) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be if !sgi.EndTime.Before(models.MaxNanoTime) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. time.Time doesn't overflow so this works. It's the int64 that overflows.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, you're right. I was getting this confused with checking the point's time.

sgi.EndTime = models.MaxNanoTime
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In here also need to do sgi.StartTime = sgi.EndTime.Add(-models.MaxNanoTime.UnixNano()) or similar.

}

data.MaxShardID++
sgi.Shards = []ShardInfo{
Expand Down