diff --git a/meta/store.go b/meta/store.go index 2115734cc2f..c1e424bed77 100644 --- a/meta/store.go +++ b/meta/store.go @@ -997,6 +997,34 @@ func (s *Store) UserCount() (count int, err error) { return } +// PrecreateShardGroups creates shard groups whose endtime is before the cutoff time passed in. This +// avoids the need for these shards to be created when data for the corresponding time range arrives. +// Shard creation involves Raft consensus, and precreation avoids taking the hit at write-time. +func (s *Store) PrecreateShardGroups(cutoff time.Time) error { + s.read(func(data *Data) error { + for _, di := range data.Databases { + for _, rp := range di.RetentionPolicies { + for _, g := range rp.ShardGroups { + // Check to see if it is going to end before our interval + if g.EndTime.Before(cutoff) { + s.Logger.Printf("pre-creating successive shard group for group %d, database %s, policy %s", + g.ID, di.Name, rp.Name) + if newGroup, err := s.CreateShardGroupIfNotExists(di.Name, rp.Name, g.EndTime.Add(1*time.Nanosecond)); err != nil { + s.Logger.Printf("failed to create successive shard group for group %d: %s", + g.ID, err.Error()) + } else { + s.Logger.Printf("new shard group %d successfully created", newGroup.ID) + } + } + } + + } + } + return nil + }) + return nil +} + // read executes a function with the current metadata. // If an error is returned then the cache is invalidated and retried. // diff --git a/meta/store_test.go b/meta/store_test.go index 4f9248052fa..ce1382a30a3 100644 --- a/meta/store_test.go +++ b/meta/store_test.go @@ -387,6 +387,38 @@ func TestStore_DeleteShardGroup(t *testing.T) { } } +// Ensure the store correctly precreates shard groups. +func TestStore_PrecreateShardGroup(t *testing.T) { + t.Parallel() + s := MustOpenStore() + defer s.Close() + + // Create node, database, policy, & group. 
+ if _, err := s.CreateNode("host0"); err != nil { + t.Fatal(err) + } else if _, err := s.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, Duration: 1 * time.Hour}); err != nil { + t.Fatal(err) + } else if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil { + t.Fatal(err) + } else if err := s.PrecreateShardGroups(time.Date(2001, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil { + t.Fatal(err) + } + + groups, err := s.ShardGroups("db0", "rp0") + if err != nil { + t.Fatal(err) + } + if len(groups) != 2 { + t.Fatalf("shard group precreation failed to create new shard group") + } + if groups[1].StartTime != time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC) { + t.Fatalf("precreated shard group has wrong start time, exp %s, got %s", + time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC), groups[1].StartTime) + } +} + // Ensure the store can create a new continuous query. func TestStore_CreateContinuousQuery(t *testing.T) { t.Parallel() diff --git a/services/precreator/service.go b/services/precreator/service.go index a9f320768c9..eea833072ae 100644 --- a/services/precreator/service.go +++ b/services/precreator/service.go @@ -5,8 +5,6 @@ import ( "os" "sync" "time" - - "github.com/influxdb/influxdb/meta" ) type Service struct { @@ -20,13 +18,11 @@ type Service struct { MetaStore interface { IsLeader() bool - VisitRetentionPolicies(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo)) - ShardGroups(database, policy string) ([]meta.ShardGroupInfo, error) - CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) + PrecreateShardGroups(cutoff time.Time) error } } -// NewService returns an instance of the Graphite service. +// NewService returns an instance of the precreation service. 
func NewService(c Config) (*Service, error) { s := Service{ checkInterval: time.Duration(c.CheckInterval), @@ -42,7 +38,7 @@ func (s *Service) SetLogger(l *log.Logger) { s.Logger = l } -// Open starts the shard precreation service. +// Open starts the precreation service. func (s *Service) Open() error { if s.done != nil { return nil @@ -55,7 +51,7 @@ func (s *Service) Open() error { return nil } -// Close stops the shard precreation service. +// Close stops the precreation service. func (s *Service) Close() error { if s.done == nil { return nil @@ -68,7 +64,7 @@ func (s *Service) Close() error { return nil } -// runPrecreation continually checks if shards need precreation. +// runPrecreation continually checks if resources need precreation. func (s *Service) runPrecreation() { defer s.wg.Done() @@ -81,42 +77,21 @@ func (s *Service) runPrecreation() { continue } - if _, err := s.precreate(time.Now().UTC()); err != nil { + if err := s.precreate(time.Now().UTC()); err != nil { s.Logger.Printf("failed to precreate shards: %s", err.Error()) } case <-s.done: - s.Logger.Println("shard precreation service terminating") + s.Logger.Println("precreation service terminating") return } } } -// precreate performs actual shard precreation. Returns the number of groups that were created. -func (s *Service) precreate(t time.Time) (int, error) { +// precreate performs actual resource precreation. 
+func (s *Service) precreate(t time.Time) error { cutoff := t.Add(s.advancePeriod).UTC() - numCreated := 0 - - s.MetaStore.VisitRetentionPolicies(func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo) { - groups, err := s.MetaStore.ShardGroups(d.Name, r.Name) - if err != nil { - s.Logger.Printf("failed to retrieve shard groups for database %s, policy %s: %s", - d.Name, r.Name, err.Error()) - return - } - for _, g := range groups { - // Check to see if it is going to end before our interval - if g.EndTime.Before(cutoff) { - s.Logger.Printf("pre-creating successive shard group for group %d, database %s, policy %s", - g.ID, d.Name, r.Name) - if newGroup, err := s.MetaStore.CreateShardGroupIfNotExists(d.Name, r.Name, g.EndTime.Add(1*time.Nanosecond)); err != nil { - s.Logger.Printf("failed to create successive shard group for group %d: %s", - g.ID, err.Error()) - } else { - numCreated++ - s.Logger.Printf("new shard group %d successfully created", newGroup.ID) - } - } - } - }) - return numCreated, nil + if err := s.MetaStore.PrecreateShardGroups(cutoff); err != nil { + return err + } + return nil } diff --git a/services/precreator/service_test.go b/services/precreator/service_test.go index 2a01006861c..da73a7ea60b 100644 --- a/services/precreator/service_test.go +++ b/services/precreator/service_test.go @@ -1,10 +1,10 @@ package precreator import ( + "sync" "testing" "time" - "github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/toml" ) @@ -12,78 +12,48 @@ func Test_ShardPrecreation(t *testing.T) { t.Parallel() now := time.Now().UTC() + advancePeriod := 5 * time.Minute // A test metastaore which returns 2 shard groups, only 1 of which requires a successor. 
+ var wg sync.WaitGroup + wg.Add(1) ms := metaStore{ - VisitRetentionPoliciesFn: func(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo)) { - f(meta.DatabaseInfo{Name: "mydb"}, meta.RetentionPolicyInfo{Name: "myrp"}) - }, - ShardGroupsFn: func(database, policy string) ([]meta.ShardGroupInfo, error) { - if database != "mydb" || policy != "myrp" { - t.Fatalf("ShardGroups called with incorrect database or policy") - } - - // Make two shard groups, 1 which needs a successor, the other does not. - groups := make([]meta.ShardGroupInfo, 2) - groups[0] = meta.ShardGroupInfo{ - ID: 1, - StartTime: now.Add(-1 * time.Hour), - EndTime: now, - } - groups[1] = meta.ShardGroupInfo{ - ID: 2, - StartTime: now, - EndTime: now.Add(time.Hour), - } - return groups, nil - }, - CreateShardGroupIfNotExistFn: func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { - if database != "mydb" || policy != "myrp" { - t.Fatalf("ShardGroups called with incorrect database or policy") + PrecreateShardGroupsFn: func(u time.Time) error { + wg.Done() + if u != now.Add(advancePeriod) { + t.Fatalf("precreation called with wrong time, got %s, exp %s", u, now) } - return &meta.ShardGroupInfo{ID: 3}, nil + return nil }, } srv, err := NewService(Config{ CheckInterval: toml.Duration(time.Minute), - AdvancePeriod: toml.Duration(5 * time.Minute), + AdvancePeriod: toml.Duration(advancePeriod), }) if err != nil { t.Fatalf("failed to create shard precreation service: %s", err.Error()) } srv.MetaStore = ms - n, err := srv.precreate(now) + err = srv.precreate(now) if err != nil { t.Fatalf("failed to precreate shards: %s", err.Error()) } - if n != 1 { - t.Fatalf("incorrect number of shard groups created, exp 1, got %d", n) - } + wg.Wait() // Ensure metastore test function is called. return } // PointsWriter represents a mock impl of PointsWriter. 
type metaStore struct { - VisitRetentionPoliciesFn func(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo)) - ShardGroupsFn func(database, policy string) ([]meta.ShardGroupInfo, error) - CreateShardGroupIfNotExistFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) + PrecreateShardGroupsFn func(cutoff time.Time) error } func (m metaStore) IsLeader() bool { return true } -func (m metaStore) VisitRetentionPolicies(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo)) { - m.VisitRetentionPoliciesFn(f) -} - -func (m metaStore) ShardGroups(database, policy string) ([]meta.ShardGroupInfo, error) { - return m.ShardGroupsFn(database, policy) -} - -func (m metaStore) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { - return m.CreateShardGroupIfNotExistFn(database, policy, timestamp) +func (m metaStore) PrecreateShardGroups(timestamp time.Time) error { + return m.PrecreateShardGroupsFn(timestamp) }