diff --git a/CHANGELOG.md b/CHANGELOG.md index c3225c526b4..735a2aec5c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ With this release InfluxDB is moving to Go 1.5. - [#3901](https://github.com/influxdb/influxdb/pull/3901): Unblock relaxed write consistency level Thanks @takayuki! - [#3950](https://github.com/influxdb/influxdb/pull/3950): Limit bz1 quickcheck tests to 10 iterations on CI - [#3977](https://github.com/influxdb/influxdb/pull/3977): Silence wal logging during testing +- [#3931](https://github.com/influxdb/influxdb/pull/3931): Don't precreate shard groups entirely in the past ## v0.9.3 [2015-08-26] diff --git a/meta/store.go b/meta/store.go index 23bac17f218..705849d9d17 100644 --- a/meta/store.go +++ b/meta/store.go @@ -1389,38 +1389,34 @@ func (s *Store) UserCount() (count int, err error) { return } -// PrecreateShardGroups creates shard groups whose endtime is before the cutoff time passed in. This -// avoid the need for these shards to be created when data for the corresponding time range arrives. -// Shard creation involves Raft consensus, and precreation avoids taking the hit at write-time. -func (s *Store) PrecreateShardGroups(cutoff time.Time) error { +// PrecreateShardGroups creates shard groups whose endtime is before the 'to' time passed in, but +// is yet to expire before 'from'. This is to avoid the need for these shards to be created when data +// for the corresponding time range arrives. Shard creation involves Raft consensus, and precreation +// avoids taking the hit at write-time. +func (s *Store) PrecreateShardGroups(from, to time.Time) error { s.read(func(data *Data) error { for _, di := range data.Databases { for _, rp := range di.RetentionPolicies { - for _, g := range rp.ShardGroups { - // Check to see if it is not deleted and going to end before our interval - if !g.Deleted() && g.EndTime.Before(cutoff) { - nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond) - - // Check if successive shard group exists. - if sgi, err := s.ShardGroupByTimestamp(di.Name, rp.Name, nextShardGroupTime); err != nil { - s.Logger.Printf("failed to check if successive shard group for group exists %d: %s", - g.ID, err.Error()) - continue - } else if sgi != nil && !sgi.Deleted() { - continue - } - - // It doesn't. Create it. - if newGroup, err := s.CreateShardGroupIfNotExists(di.Name, rp.Name, nextShardGroupTime); err != nil { - s.Logger.Printf("failed to create successive shard group for group %d: %s", - g.ID, err.Error()) - } else { - s.Logger.Printf("new shard group %d successfully created for database %s, retention policy %s", - newGroup.ID, di.Name, rp.Name) - } + if len(rp.ShardGroups) == 0 { + // No data was ever written to this group, or all groups have been deleted. + continue + } + g := rp.ShardGroups[len(rp.ShardGroups)-1] // Get the last group in time. + if !g.Deleted() && g.EndTime.Before(to) && g.EndTime.After(from) { + // Group is not deleted, will end before the future time, but is still yet to expire. + // This last check is important, so the system doesn't create shards groups wholly + // in the past. + + // Create successive shard group. + nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond) + if newGroup, err := s.CreateShardGroupIfNotExists(di.Name, rp.Name, nextShardGroupTime); err != nil { + s.Logger.Printf("failed to create successive shard group for group %d: %s", + g.ID, err.Error()) + } else { + s.Logger.Printf("new shard group %d successfully created for database %s, retention policy %s", + newGroup.ID, di.Name, rp.Name) } } - } } return nil diff --git a/meta/store_test.go b/meta/store_test.go index ef79306b756..622e037c89d 100644 --- a/meta/store_test.go +++ b/meta/store_test.go @@ -489,30 +489,57 @@ func TestStore_PrecreateShardGroup(t *testing.T) { s := MustOpenStore() defer s.Close() - // Create node, database, policy, & group. + // Create node, database, policy, & groups. if _, err := s.CreateNode("host0"); err != nil { t.Fatal(err) } else if _, err := s.CreateDatabase("db0"); err != nil { t.Fatal(err) } else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil { t.Fatal(err) - } else if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil { + } else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp1", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil { + t.Fatal(err) + } else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp2", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil { t.Fatal(err) - } else if err := s.PrecreateShardGroups(time.Date(2001, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil { + } else if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2001, time.January, 1, 1, 0, 0, 0, time.UTC)); err != nil { + t.Fatal(err) + } else if _, err := s.CreateShardGroup("db0", "rp1", time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC)); err != nil { t.Fatal(err) } + if err := s.PrecreateShardGroups(time.Date(2001, time.January, 1, 0, 0, 0, 0, time.UTC), time.Date(2001, time.January, 1, 3, 0, 0, 0, time.UTC)); err != nil { + t.Fatal(err) + } + + // rp0 should undergo precreation. groups, err := s.ShardGroups("db0", "rp0") if err != nil { t.Fatal(err) } if len(groups) != 2 { - t.Fatalf("shard group precreation failed to create new shard group") + t.Fatalf("shard group precreation failed to create new shard group for rp0") } - if groups[1].StartTime != time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC) { + if groups[1].StartTime != time.Date(2001, time.January, 1, 2, 0, 0, 0, time.UTC) { t.Fatalf("precreated shard group has wrong start time, exp %s, got %s", time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC), groups[1].StartTime) } + + // rp1 should not undergo precreation since it is completely in the past. + groups, err = s.ShardGroups("db0", "rp1") + if err != nil { + t.Fatal(err) + } + if len(groups) != 1 { + t.Fatalf("shard group precreation created new shard group for rp1") + } + + // rp2 should not undergo precreation since it has no shards. + groups, err = s.ShardGroups("db0", "rp2") + if err != nil { + t.Fatal(err) + } + if len(groups) != 0 { + t.Fatalf("shard group precreation created new shard group for rp2") + } } // Ensure the store can create a new continuous query. diff --git a/services/precreator/service.go b/services/precreator/service.go index 180b5ec9a93..9643a3ea2e0 100644 --- a/services/precreator/service.go +++ b/services/precreator/service.go @@ -18,7 +18,7 @@ type Service struct { MetaStore interface { IsLeader() bool - PrecreateShardGroups(cutoff time.Time) error + PrecreateShardGroups(now, cutoff time.Time) error } } @@ -91,9 +91,9 @@ func (s *Service) runPrecreation() { } // precreate performs actual resource precreation. -func (s *Service) precreate(t time.Time) error { - cutoff := t.Add(s.advancePeriod).UTC() - if err := s.MetaStore.PrecreateShardGroups(cutoff); err != nil { +func (s *Service) precreate(now time.Time) error { + cutoff := now.Add(s.advancePeriod).UTC() + if err := s.MetaStore.PrecreateShardGroups(now, cutoff); err != nil { return err } return nil diff --git a/services/precreator/service_test.go b/services/precreator/service_test.go index da73a7ea60b..d8667ba4cd4 100644 --- a/services/precreator/service_test.go +++ b/services/precreator/service_test.go @@ -18,7 +18,7 @@ func Test_ShardPrecreation(t *testing.T) { var wg sync.WaitGroup wg.Add(1) ms := metaStore{ - PrecreateShardGroupsFn: func(u time.Time) error { + PrecreateShardGroupsFn: func(v, u time.Time) error { wg.Done() if u != now.Add(advancePeriod) { t.Fatalf("precreation called with wrong time, got %s, exp %s", u, now) @@ -47,13 +47,13 @@ func Test_ShardPrecreation(t *testing.T) { // PointsWriter represents a mock impl of PointsWriter. type metaStore struct { - PrecreateShardGroupsFn func(cutoff time.Time) error + PrecreateShardGroupsFn func(now, cutoff time.Time) error } func (m metaStore) IsLeader() bool { return true } -func (m metaStore) PrecreateShardGroups(timestamp time.Time) error { - return m.PrecreateShardGroupsFn(timestamp) +func (m metaStore) PrecreateShardGroups(now, cutoff time.Time) error { + return m.PrecreateShardGroupsFn(now, cutoff) }