Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't precreate shard groups entirely in past #3931

Merged
merged 2 commits into from
Sep 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#3901](https://github.com/influxdb/influxdb/pull/3901): Unblock relaxed write consistency level Thanks @takayuki!
- [#3950](https://github.com/influxdb/influxdb/pull/3950): Limit bz1 quickcheck tests to 10 iterations on CI
- [#3977](https://github.com/influxdb/influxdb/pull/3977): Silence wal logging during testing
- [#3931](https://github.com/influxdb/influxdb/pull/3931): Don't precreate shard groups entirely in the past

## v0.9.3 [2015-08-26]

Expand Down
50 changes: 23 additions & 27 deletions meta/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1389,38 +1389,34 @@ func (s *Store) UserCount() (count int, err error) {
return
}

// PrecreateShardGroups creates shard groups whose endtime is before the cutoff time passed in. This
// avoid the need for these shards to be created when data for the corresponding time range arrives.
// Shard creation involves Raft consensus, and precreation avoids taking the hit at write-time.
func (s *Store) PrecreateShardGroups(cutoff time.Time) error {
// PrecreateShardGroups creates shard groups whose endtime is before the 'to' time passed in, but
// is yet to expire before 'from'. This is to avoid the need for these shards to be created when data
// for the corresponding time range arrives. Shard creation involves Raft consensus, and precreation
// avoids taking the hit at write-time.
func (s *Store) PrecreateShardGroups(from, to time.Time) error {
s.read(func(data *Data) error {
for _, di := range data.Databases {
for _, rp := range di.RetentionPolicies {
for _, g := range rp.ShardGroups {
// Check to see if it is not deleted and going to end before our interval
if !g.Deleted() && g.EndTime.Before(cutoff) {
nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond)

// Check if successive shard group exists.
if sgi, err := s.ShardGroupByTimestamp(di.Name, rp.Name, nextShardGroupTime); err != nil {
s.Logger.Printf("failed to check if successive shard group for group exists %d: %s",
g.ID, err.Error())
continue
} else if sgi != nil && !sgi.Deleted() {
continue
}

// It doesn't. Create it.
if newGroup, err := s.CreateShardGroupIfNotExists(di.Name, rp.Name, nextShardGroupTime); err != nil {
s.Logger.Printf("failed to create successive shard group for group %d: %s",
g.ID, err.Error())
} else {
s.Logger.Printf("new shard group %d successfully created for database %s, retention policy %s",
newGroup.ID, di.Name, rp.Name)
}
if len(rp.ShardGroups) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment here would be useful. When would len(rp.ShardGroups) == 0? Is this basically checking to make sure that the retention policy has been written to at least once before pre-creating shard groups?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly. Or all shard groups have been expired out.

// No data was ever written to this group, or all groups have been deleted.
continue
}
g := rp.ShardGroups[len(rp.ShardGroups)-1] // Get the last group in time.
if !g.Deleted() && g.EndTime.Before(to) && g.EndTime.After(from) {
// Group is not deleted, will end before the future time, but is still yet to expire.
// This last check is important, so the system doesn't create shards groups wholly
// in the past.

// Create successive shard group.
nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond)
if newGroup, err := s.CreateShardGroupIfNotExists(di.Name, rp.Name, nextShardGroupTime); err != nil {
s.Logger.Printf("failed to create successive shard group for group %d: %s",
g.ID, err.Error())
} else {
s.Logger.Printf("new shard group %d successfully created for database %s, retention policy %s",
newGroup.ID, di.Name, rp.Name)
}
}

}
}
return nil
Expand Down
37 changes: 32 additions & 5 deletions meta/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,30 +489,57 @@ func TestStore_PrecreateShardGroup(t *testing.T) {
s := MustOpenStore()
defer s.Close()

// Create node, database, policy, & group.
// Create node, database, policy, & groups.
if _, err := s.CreateNode("host0"); err != nil {
t.Fatal(err)
} else if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
} else if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp1", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp2", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
} else if err := s.PrecreateShardGroups(time.Date(2001, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
} else if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2001, time.January, 1, 1, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
} else if _, err := s.CreateShardGroup("db0", "rp1", time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
}

if err := s.PrecreateShardGroups(time.Date(2001, time.January, 1, 0, 0, 0, 0, time.UTC), time.Date(2001, time.January, 1, 3, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
}

// rp0 should undergo precreation.
groups, err := s.ShardGroups("db0", "rp0")
if err != nil {
t.Fatal(err)
}
if len(groups) != 2 {
t.Fatalf("shard group precreation failed to create new shard group")
t.Fatalf("shard group precreation failed to create new shard group for rp0")
}
if groups[1].StartTime != time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC) {
if groups[1].StartTime != time.Date(2001, time.January, 1, 2, 0, 0, 0, time.UTC) {
t.Fatalf("precreated shard group has wrong start time, exp %s, got %s",
time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC), groups[1].StartTime)
}

// rp1 should not undergo precreation since it is completely in the past.
groups, err = s.ShardGroups("db0", "rp1")
if err != nil {
t.Fatal(err)
}
if len(groups) != 1 {
t.Fatalf("shard group precreation created new shard group for rp1")
}

// rp2 should not undergo precreation since it has no shards.
groups, err = s.ShardGroups("db0", "rp2")
if err != nil {
t.Fatal(err)
}
if len(groups) != 0 {
t.Fatalf("shard group precreation created new shard group for rp2")
}
}

// Ensure the store can create a new continuous query.
Expand Down
8 changes: 4 additions & 4 deletions services/precreator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Service struct {

MetaStore interface {
IsLeader() bool
PrecreateShardGroups(cutoff time.Time) error
PrecreateShardGroups(now, cutoff time.Time) error
}
}

Expand Down Expand Up @@ -91,9 +91,9 @@ func (s *Service) runPrecreation() {
}

// precreate performs actual resource precreation.
func (s *Service) precreate(t time.Time) error {
cutoff := t.Add(s.advancePeriod).UTC()
if err := s.MetaStore.PrecreateShardGroups(cutoff); err != nil {
func (s *Service) precreate(now time.Time) error {
cutoff := now.Add(s.advancePeriod).UTC()
if err := s.MetaStore.PrecreateShardGroups(now, cutoff); err != nil {
return err
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions services/precreator/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func Test_ShardPrecreation(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
ms := metaStore{
PrecreateShardGroupsFn: func(u time.Time) error {
PrecreateShardGroupsFn: func(v, u time.Time) error {
wg.Done()
if u != now.Add(advancePeriod) {
t.Fatalf("precreation called with wrong time, got %s, exp %s", u, now)
Expand Down Expand Up @@ -47,13 +47,13 @@ func Test_ShardPrecreation(t *testing.T) {

// PointsWriter represents a mock impl of PointsWriter.
type metaStore struct {
PrecreateShardGroupsFn func(cutoff time.Time) error
PrecreateShardGroupsFn func(now, cutoff time.Time) error
}

func (m metaStore) IsLeader() bool {
return true
}

func (m metaStore) PrecreateShardGroups(timestamp time.Time) error {
return m.PrecreateShardGroupsFn(timestamp)
func (m metaStore) PrecreateShardGroups(now, cutoff time.Time) error {
return m.PrecreateShardGroupsFn(now, cutoff)
}