From b5ad92be70801df0d7a33498eeb0e2e0600704e2 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Wed, 10 Jun 2015 10:30:23 -0700 Subject: [PATCH 1/8] Create shard precreation service Initial commit includes basic configuration control. --- services/shard_precreation/config.go | 32 +++++++++++++++++++++++ services/shard_precreation/config_test.go | 31 ++++++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 services/shard_precreation/config.go create mode 100644 services/shard_precreation/config_test.go diff --git a/services/shard_precreation/config.go b/services/shard_precreation/config.go new file mode 100644 index 00000000000..8213fc197df --- /dev/null +++ b/services/shard_precreation/config.go @@ -0,0 +1,32 @@ +package shard_precreation + +import ( + "time" + + "github.com/influxdb/influxdb/toml" +) + +const ( + // DefaultCheckInterval is the shard precreation check time if none is specified. + DefaultCheckInterval = 10 * time.Minute + + // DefaultAdvancePeriod is the default period ahead of the endtime of a shard group + // that its successor group is created. + DefaultAdvancePeriod = 30 * time.Minute +) + +// Config represents the configuration for shard precreation. +type Config struct { + Enabled bool `toml:"enabled"` + CheckInterval toml.Duration `toml:"check-interval"` + AdvancePeriod toml.Duration `toml:"advance-period"` +} + +// NewConfig returns a new Config with defaults. +func NewConfig() Config { + return Config{ + Enabled: true, + CheckInterval: toml.Duration(DefaultCheckInterval), + AdvancePeriod: toml.Duration(DefaultAdvancePeriod), + } +} diff --git a/services/shard_precreation/config_test.go b/services/shard_precreation/config_test.go new file mode 100644 index 00000000000..1dcbdff944a --- /dev/null +++ b/services/shard_precreation/config_test.go @@ -0,0 +1,31 @@ +package shard_precreation_test + +import ( + "testing" + "time" + + "github.com/BurntSushi/toml" + "github.com/influxdb/influxdb/services/shard_precreation" +) + +func TestConfig_Parse(t *testing.T) { + // Parse configuration. + var c shard_precreation.Config + if _, err := toml.Decode(` +enabled = true +check-interval = "2m" +advance-period = "10m" +`, &c); err != nil { + + t.Fatal(err) + } + + // Validate configuration. + if !c.Enabled { + t.Fatalf("unexpected enabled state: %v", c.Enabled) + } else if time.Duration(c.CheckInterval) != 2*time.Minute { + t.Fatalf("unexpected check interval: %s", c.CheckInterval) + } else if time.Duration(c.AdvancePeriod) != 10*time.Minute { + t.Fatalf("unexpected advance period: %s", c.AdvancePeriod) + } +} From 3b74f753a8e1017bc42f0d3f19c6cabcb72d1216 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Wed, 10 Jun 2015 10:56:53 -0700 Subject: [PATCH 2/8] Add short design notes on shard precreation --- services/shard_precreation/notes.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 services/shard_precreation/notes.md diff --git a/services/shard_precreation/notes.md b/services/shard_precreation/notes.md new file mode 100644 index 00000000000..8830b7310c4 --- /dev/null +++ b/services/shard_precreation/notes.md @@ -0,0 +1,13 @@ +Shard Precreation +============ + +During normal operation when InfluxDB receives time-series data, it writes the data to files known as _shards_. Each shard only contains data for a specific range of time. Therefore, before data can be accepted by the system, the shards must exist and InfluxDB always checks that the required shards exist for every incoming data point. If the required shards do not exist, InfluxDB will create those shards. Because this requires a cluster to reach consensus, the process is not instantaneous and can temporarily impact write-throughput. + +Since almost all time-series data is written sequentially in time, the system has an excellent idea of the timestamps of future data. Shard precreation takes advantage of this fact by creating required shards ahead of time, thereby ensuring the required shards exist by the time new time-series data actually arrives. Write-throughput is therefore not affected when data is first received for a range of time that would normally trigger shard creation. + +Note that the shard-existence check must remain in place in the code, even with shard precreation. This is because while most data is written sequentially in time, this is not always the case. Data may be written with timestamps in the past, or farther in the future than shard precreation handles. + +## Configuration +Shard precreation can be disabled if necessary, though this is not recommended. If it is disabled, then shards will be only be created when explicitly needed. + +The interval between runs of the shard precreation service, as well as the time-in-advance the shards are created, are also configurable. The defaults should work for most deployments. From 481e1860a096ba621a56e3b045913464751cb24b Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Wed, 10 Jun 2015 11:13:17 -0700 Subject: [PATCH 3/8] Remove obsolete shard precreation source --- meta/shard_group_precreator.go | 74 ---------------------------------- 1 file changed, 74 deletions(-) delete mode 100644 meta/shard_group_precreator.go diff --git a/meta/shard_group_precreator.go b/meta/shard_group_precreator.go deleted file mode 100644 index 81470788571..00000000000 --- a/meta/shard_group_precreator.go +++ /dev/null @@ -1,74 +0,0 @@ -package meta - -type ShardGroupPrecreator struct{} - -/* -// StartShardGroupsPreCreate launches shard group pre-create to avoid write bottlenecks. -func (s *Server) StartShardGroupsPreCreate(checkInterval time.Duration) error { - if checkInterval == 0 { - return fmt.Errorf("shard group pre-create check interval must be non-zero") - } - sgpcDone := make(chan struct{}, 0) - s.sgpcDone = sgpcDone - go func() { - for { - select { - case <-sgpcDone: - return - case <-time.After(checkInterval): - s.ShardGroupPreCreate(checkInterval) - } - } - }() - return nil -} - -// ShardGroupPreCreate ensures that future shard groups and shards are created and ready for writing -// is removed from the server. -func (s *Server) ShardGroupPreCreate(checkInterval time.Duration) { - panic("not yet implemented") - - log.Println("shard group pre-create check commencing") - - // For safety, we double the check interval to ensure we have enough time to create all shard groups - // before they are needed, but as close to needed as possible. - // This is a complete punt on optimization - cutoff := time.Now().Add(checkInterval * 2).UTC() - - type group struct { - Database string - Retention string - ID uint64 - Time time.Time - } - - var groups []group - // Only keep the lock while walking the shard groups, so the lock is not held while - // any deletions take place across the cluster. - func() { - s.mu.RLock() - defer s.mu.RUnlock() - - // Check all shard groups. - // See if they have a "future" shard group ready to write to - // If not, create the next shard group, as well as each shard for the shardGroup - for _, db := range s.databases { - for _, rp := range db.policies { - for _, g := range rp.shardGroups { - // Check to see if it is going to end before our interval - if g.EndTime.Before(cutoff) { - log.Printf("pre-creating shard group for %d, retention policy %s, database %s", g.ID, rp.Name, db.name) - groups = append(groups, group{Database: db.name, Retention: rp.Name, ID: g.ID, Time: g.EndTime.Add(1 * time.Nanosecond)}) - } - } - } - } - }() - - for _, g := range groups { - if err := s.CreateShardGroupIfNotExists(g.Database, g.Retention, g.Time); err != nil { - log.Printf("unable to request pre-creation of shard group %d for time %s: %s", g.ID, g.Time, err.Error()) - } - } -} -*/ From dda073c19e3179350758da81517eaa365eed29d7 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Wed, 10 Jun 2015 11:12:25 -0700 Subject: [PATCH 4/8] Add shard precreation service --- services/shard_precreation/service.go | 122 ++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 services/shard_precreation/service.go diff --git a/services/shard_precreation/service.go b/services/shard_precreation/service.go new file mode 100644 index 00000000000..1d7edfb8549 --- /dev/null +++ b/services/shard_precreation/service.go @@ -0,0 +1,122 @@ +package shard_precreation + +import ( + "log" + "os" + "sync" + "time" + + "github.com/influxdb/influxdb/meta" +) + +type Service struct { + checkInterval time.Duration + advancePeriod time.Duration + + Logger *log.Logger + + done chan struct{} + wg sync.WaitGroup + + MetaStore interface { + IsLeader() bool + VisitRetentionPolicies(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo)) + ShardGroups(database, policy string) ([]meta.ShardGroupInfo, error) + CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) + } +} + +// NewService returns an instance of the Graphite service. +func NewService(c Config) (*Service, error) { + s := Service{ + checkInterval: time.Duration(c.CheckInterval), + advancePeriod: time.Duration(c.AdvancePeriod), + Logger: log.New(os.Stderr, "[shard-precreation] ", log.LstdFlags), + } + + return &s, nil +} + +// SetLogger sets the internal logger to the logger passed in. +func (s *Service) SetLogger(l *log.Logger) { + s.Logger = l +} + +// Open starts the shard precreation service. +func (s *Service) Open() error { + if s.done != nil { + return nil + } + + s.done = make(chan struct{}) + + s.wg.Add(1) + go s.runPrecreation() + return nil +} + +// Close stops the shard precreation service. +func (s *Service) Close() error { + if s.done == nil { + return nil + } + + close(s.done) + s.wg.Wait() + s.done = nil + + return nil +} + +// runPrecreation continually checks if shards need precreation. +func (s *Service) runPrecreation() { + defer s.wg.Done() + + for { + select { + case <-time.After(s.checkInterval): + // Only run this on the leader, but always allow the loop to check + // as the leader can change. + if !s.MetaStore.IsLeader() { + continue + } + + if _, err := s.precreate(time.Now().UTC()); err != nil { + s.Logger.Printf("failed to precreate shards: %s", err.Error()) + } + case <-s.done: + s.Logger.Println("shard precreation service terminating") + return + } + } +} + +// precreate performs actual shard precreation. Returns the number of groups that were created. +func (s *Service) precreate(t time.Time) (int, error) { + cutoff := t.Add(s.advancePeriod).UTC() + numCreated := 0 + + s.MetaStore.VisitRetentionPolicies(func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo) { + groups, err := s.MetaStore.ShardGroups(d.Name, r.Name) + if err != nil { + s.Logger.Printf("failed to retrieve shard groups for database %s, policy %s: %s", + d.Name, r.Name, err.Error()) + return + } + for _, g := range groups { + // Check to see if it is going to end before our interval + if g.EndTime.Before(cutoff) { + s.Logger.Printf("pre-creating successive shard group for group %d, database %s, policy %s", + g.ID, d.Name, r.Name) + if newGroup, err := s.MetaStore.CreateShardGroupIfNotExists(d.Name, r.Name, g.EndTime.Add(1*time.Nanosecond)); err != nil { + s.Logger.Printf("failed to create successive shard group for group %d: %s", + g.ID, err.Error()) + } else { + numCreated++ + s.Logger.Printf("new shard group %d successfully created", newGroup.ID) + } + } + } + }) + return numCreated, nil +} From eb966989755f84f8ab2dec43299ff002ceef612b Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Wed, 10 Jun 2015 12:56:22 -0700 Subject: [PATCH 5/8] Unit-test shard precreation --- CHANGELOG.md | 1 + services/shard_precreation/service_test.go | 89 ++++++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 services/shard_precreation/service_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fc854401a7..b5b58906a7f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ - [#2857](https://github.com/influxdb/influxdb/issues/2857): Fix parsing commas in string field values. - [#2833](https://github.com/influxdb/influxdb/pull/2833): Make the default config valid. - [#2859](https://github.com/influxdb/influxdb/pull/2859): Fix panic on aggregate functions. +- [#2878](https://github.com/influxdb/influxdb/pull/2878): Re-enable shard precreation. ### Features - [2858](https://github.com/influxdb/influxdb/pull/2858): Support setting openTSDB write consistency. diff --git a/services/shard_precreation/service_test.go b/services/shard_precreation/service_test.go new file mode 100644 index 00000000000..3d824fa7774 --- /dev/null +++ b/services/shard_precreation/service_test.go @@ -0,0 +1,89 @@ +package shard_precreation + +import ( + "testing" + "time" + + "github.com/influxdb/influxdb/meta" + "github.com/influxdb/influxdb/toml" +) + +func Test_ShardPrecreation(t *testing.T) { + t.Parallel() + + now := time.Now().UTC() + + // A test metastaore which returns 2 shard groups, only 1 of which requires a successor. + ms := metaStore{ + VisitRetentionPoliciesFn: func(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo)) { + f(meta.DatabaseInfo{Name: "mydb"}, meta.RetentionPolicyInfo{Name: "myrp"}) + }, + ShardGroupsFn: func(database, policy string) ([]meta.ShardGroupInfo, error) { + if database != "mydb" || policy != "myrp" { + t.Fatalf("ShardGroups called with incorrect database or policy") + } + + // Make two shard groups, 1 which needs a successor, the other does not. + groups := make([]meta.ShardGroupInfo, 2) + groups[0] = meta.ShardGroupInfo{ + ID: 1, + StartTime: now.Add(-1 * time.Hour), + EndTime: now, + } + groups[1] = meta.ShardGroupInfo{ + ID: 2, + StartTime: now, + EndTime: now.Add(time.Hour), + } + return groups, nil + }, + CreateShardGroupIfNotExistFn: func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { + if database != "mydb" || policy != "myrp" { + t.Fatalf("ShardGroups called with incorrect database or policy") + } + return &meta.ShardGroupInfo{ID: 3}, nil + }, + } + + srv, err := NewService(Config{ + CheckInterval: toml.Duration(time.Minute), + AdvancePeriod: toml.Duration(5 * time.Minute), + }) + if err != nil { + t.Fatalf("failed to create shard precreation service: %s", err.Error()) + } + srv.MetaStore = ms + + n, err := srv.precreate(now) + if err != nil { + t.Fatalf("failed to precreate shards: %s", err.Error()) + } + if n != 1 { + t.Fatalf("incorrect number of shard groups created, exp 1, got %d", n) + } + + return +} + +// PointsWriter represents a mock impl of PointsWriter. +type metaStore struct { + VisitRetentionPoliciesFn func(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo)) + ShardGroupsFn func(database, policy string) ([]meta.ShardGroupInfo, error) + CreateShardGroupIfNotExistFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) +} + +func (m metaStore) IsLeader() bool { + return true +} + +func (m metaStore) VisitRetentionPolicies(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo)) { + m.VisitRetentionPoliciesFn(f) +} + +func (m metaStore) ShardGroups(database, policy string) ([]meta.ShardGroupInfo, error) { + return m.ShardGroupsFn(database, policy) +} + +func (m metaStore) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { + return m.CreateShardGroupIfNotExistFn(database, policy, timestamp) +} From 1ddab4627f9120c21cf3ad1bd21a15b527f2286f Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Wed, 10 Jun 2015 13:05:48 -0700 Subject: [PATCH 6/8] Start the shard-precreation service --- cmd/influxd/run/config.go | 11 +++++++---- cmd/influxd/run/server.go | 16 ++++++++++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/cmd/influxd/run/config.go b/cmd/influxd/run/config.go index 81a67f8dede..06b05e8c1c2 100644 --- a/cmd/influxd/run/config.go +++ b/cmd/influxd/run/config.go @@ -17,16 +17,18 @@ import ( "github.com/influxdb/influxdb/services/monitor" "github.com/influxdb/influxdb/services/opentsdb" "github.com/influxdb/influxdb/services/retention" + "github.com/influxdb/influxdb/services/shard_precreation" "github.com/influxdb/influxdb/services/udp" "github.com/influxdb/influxdb/tsdb" ) // Config represents the configuration format for the influxd binary. type Config struct { - Meta meta.Config `toml:"meta"` - Data tsdb.Config `toml:"data"` - Cluster cluster.Config `toml:"cluster"` - Retention retention.Config `toml:"retention"` + Meta meta.Config `toml:"meta"` + Data tsdb.Config `toml:"data"` + Cluster cluster.Config `toml:"cluster"` + Retention retention.Config `toml:"retention"` + ShardPrecreation shard_precreation.Config `toml:"shard-precreation"` Admin admin.Config `toml:"admin"` HTTPD httpd.Config `toml:"http"` @@ -48,6 +50,7 @@ func NewConfig() *Config { c.Meta = meta.NewConfig() c.Data = tsdb.NewConfig() c.Cluster = cluster.NewConfig() + c.ShardPrecreation = shard_precreation.NewConfig() c.Admin = admin.NewConfig() c.HTTPD = httpd.NewConfig() diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index f8c85a4c374..95d164235ed 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -15,6 +15,7 @@ import ( "github.com/influxdb/influxdb/services/httpd" "github.com/influxdb/influxdb/services/opentsdb" "github.com/influxdb/influxdb/services/retention" + "github.com/influxdb/influxdb/services/shard_precreation" "github.com/influxdb/influxdb/services/udp" "github.com/influxdb/influxdb/tcp" "github.com/influxdb/influxdb/tsdb" @@ -76,6 +77,7 @@ func NewServer(c *Config) (*Server, error) { // Append services. s.appendClusterService(c.Cluster) + s.appendShardPrecreationService(c.ShardPrecreation) s.appendAdminService(c.Admin) s.appendContinuousQueryService(c.ContinuousQuery) s.appendHTTPDService(c.HTTPD) @@ -174,6 +176,20 @@ func (s *Server) appendGraphiteService(c graphite.Config) error { return nil } +func (s *Server) appendShardPrecreationService(c shard_precreation.Config) error { + if !c.Enabled { + return nil + } + srv, err := shard_precreation.NewService(c) + if err != nil { + return err + } + + srv.MetaStore = s.MetaStore + s.Services = append(s.Services, srv) + return nil +} + func (s *Server) appendUDPService(c udp.Config) { if !c.Enabled { return From f679c81136b6ed93ee04b92ce3281e98c3eee7b4 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Wed, 10 Jun 2015 14:53:12 -0700 Subject: [PATCH 7/8] Simply use "precreator" for shard precreation --- cmd/influxd/run/config.go | 14 +++++++------- cmd/influxd/run/server.go | 8 ++++---- .../{shard_precreation => precreator}/config.go | 2 +- .../config_test.go | 6 +++--- .../{shard_precreation => precreator}/notes.md | 0 .../{shard_precreation => precreator}/service.go | 2 +- .../service_test.go | 2 +- 7 files changed, 17 insertions(+), 17 deletions(-) rename services/{shard_precreation => precreator}/config.go (96%) rename services/{shard_precreation => precreator}/config_test.go (83%) rename services/{shard_precreation => precreator}/notes.md (100%) rename services/{shard_precreation => precreator}/service.go (99%) rename services/{shard_precreation => precreator}/service_test.go (99%) diff --git a/cmd/influxd/run/config.go b/cmd/influxd/run/config.go index 06b05e8c1c2..898425b2a65 100644 --- a/cmd/influxd/run/config.go +++ b/cmd/influxd/run/config.go @@ -16,19 +16,19 @@ import ( "github.com/influxdb/influxdb/services/httpd" "github.com/influxdb/influxdb/services/monitor" "github.com/influxdb/influxdb/services/opentsdb" + "github.com/influxdb/influxdb/services/precreator" "github.com/influxdb/influxdb/services/retention" - "github.com/influxdb/influxdb/services/shard_precreation" "github.com/influxdb/influxdb/services/udp" "github.com/influxdb/influxdb/tsdb" ) // Config represents the configuration format for the influxd binary. type Config struct { - Meta meta.Config `toml:"meta"` - Data tsdb.Config `toml:"data"` - Cluster cluster.Config `toml:"cluster"` - Retention retention.Config `toml:"retention"` - ShardPrecreation shard_precreation.Config `toml:"shard-precreation"` + Meta meta.Config `toml:"meta"` + Data tsdb.Config `toml:"data"` + Cluster cluster.Config `toml:"cluster"` + Retention retention.Config `toml:"retention"` + Precreator precreator.Config `toml:"shard-precreation"` Admin admin.Config `toml:"admin"` HTTPD httpd.Config `toml:"http"` @@ -50,7 +50,7 @@ func NewConfig() *Config { c.Meta = meta.NewConfig() c.Data = tsdb.NewConfig() c.Cluster = cluster.NewConfig() - c.ShardPrecreation = shard_precreation.NewConfig() + c.Precreator = precreator.NewConfig() c.Admin = admin.NewConfig() c.HTTPD = httpd.NewConfig() diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 95d164235ed..db4516bb640 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -14,8 +14,8 @@ import ( "github.com/influxdb/influxdb/services/hh" "github.com/influxdb/influxdb/services/httpd" "github.com/influxdb/influxdb/services/opentsdb" + "github.com/influxdb/influxdb/services/precreator" "github.com/influxdb/influxdb/services/retention" - "github.com/influxdb/influxdb/services/shard_precreation" "github.com/influxdb/influxdb/services/udp" "github.com/influxdb/influxdb/tcp" "github.com/influxdb/influxdb/tsdb" @@ -77,7 +77,7 @@ func NewServer(c *Config) (*Server, error) { // Append services. s.appendClusterService(c.Cluster) - s.appendShardPrecreationService(c.ShardPrecreation) + s.appendPrecreatorService(c.Precreator) s.appendAdminService(c.Admin) s.appendContinuousQueryService(c.ContinuousQuery) s.appendHTTPDService(c.HTTPD) @@ -176,11 +176,11 @@ func (s *Server) appendGraphiteService(c graphite.Config) error { return nil } -func (s *Server) appendShardPrecreationService(c shard_precreation.Config) error { +func (s *Server) appendPrecreatorService(c precreator.Config) error { if !c.Enabled { return nil } - srv, err := shard_precreation.NewService(c) + srv, err := precreator.NewService(c) if err != nil { return err } diff --git a/services/shard_precreation/config.go b/services/precreator/config.go similarity index 96% rename from services/shard_precreation/config.go rename to services/precreator/config.go index 8213fc197df..af0f34b16e6 100644 --- a/services/shard_precreation/config.go +++ b/services/precreator/config.go @@ -1,4 +1,4 @@ -package shard_precreation +package precreator import ( "time" diff --git a/services/shard_precreation/config_test.go b/services/precreator/config_test.go similarity index 83% rename from services/shard_precreation/config_test.go rename to services/precreator/config_test.go index 1dcbdff944a..e247672830f 100644 --- a/services/shard_precreation/config_test.go +++ b/services/precreator/config_test.go @@ -1,16 +1,16 @@ -package shard_precreation_test +package precreator_test import ( "testing" "time" "github.com/BurntSushi/toml" - "github.com/influxdb/influxdb/services/shard_precreation" + "github.com/influxdb/influxdb/services/precreator" ) func TestConfig_Parse(t *testing.T) { // Parse configuration. - var c shard_precreation.Config + var c precreator.Config if _, err := toml.Decode(` enabled = true check-interval = "2m" diff --git a/services/shard_precreation/notes.md b/services/precreator/notes.md similarity index 100% rename from services/shard_precreation/notes.md rename to services/precreator/notes.md diff --git a/services/shard_precreation/service.go b/services/precreator/service.go similarity index 99% rename from services/shard_precreation/service.go rename to services/precreator/service.go index 1d7edfb8549..a9f320768c9 100644 --- a/services/shard_precreation/service.go +++ b/services/precreator/service.go @@ -1,4 +1,4 @@ -package shard_precreation +package precreator import ( "log" diff --git a/services/shard_precreation/service_test.go b/services/precreator/service_test.go similarity index 99% rename from services/shard_precreation/service_test.go rename to services/precreator/service_test.go index 3d824fa7774..2a01006861c 100644 --- a/services/shard_precreation/service_test.go +++ b/services/precreator/service_test.go @@ -1,4 +1,4 @@ -package shard_precreation +package precreator import ( "testing" From 5798d996925845ce136b8064e008132341735f9b Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Wed, 10 Jun 2015 15:26:24 -0700 Subject: [PATCH 8/8] Move shard precreation logic to meta/store --- meta/store.go | 28 ++++++++++++++ meta/store_test.go | 32 +++++++++++++++ services/precreator/service.go | 51 +++++++----------------- services/precreator/service_test.go | 60 ++++++++--------------------- 4 files changed, 88 insertions(+), 83 deletions(-) diff --git a/meta/store.go b/meta/store.go index 2115734cc2f..c1e424bed77 100644 --- a/meta/store.go +++ b/meta/store.go @@ -997,6 +997,34 @@ func (s *Store) UserCount() (count int, err error) { return } +// PrecreateShardGroups creates shard groups whose endtime is before the cutoff time passed in. This +// avoid the need for these shards to be created when data for the corresponding time range arrives. +// Shard creation involves Raft consensus, and precreation avoids taking the hit at write-time. +func (s *Store) PrecreateShardGroups(cutoff time.Time) error { + s.read(func(data *Data) error { + for _, di := range data.Databases { + for _, rp := range di.RetentionPolicies { + for _, g := range rp.ShardGroups { + // Check to see if it is going to end before our interval + if g.EndTime.Before(cutoff) { + s.Logger.Printf("pre-creating successive shard group for group %d, database %s, policy %s", + g.ID, di.Name, rp.Name) + if newGroup, err := s.CreateShardGroupIfNotExists(di.Name, rp.Name, g.EndTime.Add(1*time.Nanosecond)); err != nil { + s.Logger.Printf("failed to create successive shard group for group %d: %s", + g.ID, err.Error()) + } else { + s.Logger.Printf("new shard group %d successfully created", newGroup.ID) + } + } + } + + } + } + return nil + }) + return nil +} + // read executes a function with the current metadata. // If an error is returned then the cache is invalidated and retried. // diff --git a/meta/store_test.go b/meta/store_test.go index 4f9248052fa..ce1382a30a3 100644 --- a/meta/store_test.go +++ b/meta/store_test.go @@ -387,6 +387,38 @@ func TestStore_DeleteShardGroup(t *testing.T) { } } +// Ensure the store correctly precreates shard groups. +func TestStore_PrecreateShardGroup(t *testing.T) { + t.Parallel() + s := MustOpenStore() + defer s.Close() + + // Create node, database, policy, & group. + if _, err := s.CreateNode("host0"); err != nil { + t.Fatal(err) + } else if _, err := s.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, Duration: 1 * time.Hour}); err != nil { + t.Fatal(err) + } else if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil { + t.Fatal(err) + } else if err := s.PrecreateShardGroups(time.Date(2001, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil { + t.Fatal(err) + } + + groups, err := s.ShardGroups("db0", "rp0") + if err != nil { + t.Fatal(err) + } + if len(groups) != 2 { + t.Fatalf("shard group precreation failed to create new shard group") + } + if groups[1].StartTime != time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC) { + t.Fatalf("precreated shard group has wrong start time, exp %s, got %s", + time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC), groups[1].StartTime) + } +} + // Ensure the store can create a new continuous query. func TestStore_CreateContinuousQuery(t *testing.T) { t.Parallel() diff --git a/services/precreator/service.go b/services/precreator/service.go index a9f320768c9..eea833072ae 100644 --- a/services/precreator/service.go +++ b/services/precreator/service.go @@ -5,8 +5,6 @@ import ( "os" "sync" "time" - - "github.com/influxdb/influxdb/meta" ) type Service struct { @@ -20,13 +18,11 @@ type Service struct { MetaStore interface { IsLeader() bool - VisitRetentionPolicies(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo)) - ShardGroups(database, policy string) ([]meta.ShardGroupInfo, error) - CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) + PrecreateShardGroups(cutoff time.Time) error } } -// NewService returns an instance of the Graphite service. +// NewService returns an instance of the precreation service. func NewService(c Config) (*Service, error) { s := Service{ checkInterval: time.Duration(c.CheckInterval), @@ -42,7 +38,7 @@ func (s *Service) SetLogger(l *log.Logger) { s.Logger = l } -// Open starts the shard precreation service. +// Open starts the precreation service. func (s *Service) Open() error { if s.done != nil { return nil @@ -55,7 +51,7 @@ func (s *Service) Open() error { return nil } -// Close stops the shard precreation service. +// Close stops the precreation service. func (s *Service) Close() error { if s.done == nil { return nil @@ -68,7 +64,7 @@ func (s *Service) Close() error { return nil } -// runPrecreation continually checks if shards need precreation. +// runPrecreation continually checks if resources need precreation. func (s *Service) runPrecreation() { defer s.wg.Done() @@ -81,42 +77,21 @@ func (s *Service) runPrecreation() { continue } - if _, err := s.precreate(time.Now().UTC()); err != nil { + if err := s.precreate(time.Now().UTC()); err != nil { s.Logger.Printf("failed to precreate shards: %s", err.Error()) } case <-s.done: - s.Logger.Println("shard precreation service terminating") + s.Logger.Println("precreation service terminating") return } } } -// precreate performs actual shard precreation. Returns the number of groups that were created. -func (s *Service) precreate(t time.Time) (int, error) { +// precreate performs actual resource precreation. +func (s *Service) precreate(t time.Time) error { cutoff := t.Add(s.advancePeriod).UTC() - numCreated := 0 - - s.MetaStore.VisitRetentionPolicies(func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo) { - groups, err := s.MetaStore.ShardGroups(d.Name, r.Name) - if err != nil { - s.Logger.Printf("failed to retrieve shard groups for database %s, policy %s: %s", - d.Name, r.Name, err.Error()) - return - } - for _, g := range groups { - // Check to see if it is going to end before our interval - if g.EndTime.Before(cutoff) { - s.Logger.Printf("pre-creating successive shard group for group %d, database %s, policy %s", - g.ID, d.Name, r.Name) - if newGroup, err := s.MetaStore.CreateShardGroupIfNotExists(d.Name, r.Name, g.EndTime.Add(1*time.Nanosecond)); err != nil { - s.Logger.Printf("failed to create successive shard group for group %d: %s", - g.ID, err.Error()) - } else { - numCreated++ - s.Logger.Printf("new shard group %d successfully created", newGroup.ID) - } - } - } - }) - return numCreated, nil + if err := s.MetaStore.PrecreateShardGroups(cutoff); err != nil { + return err + } + return nil } diff --git a/services/precreator/service_test.go b/services/precreator/service_test.go index 2a01006861c..da73a7ea60b 100644 --- a/services/precreator/service_test.go +++ b/services/precreator/service_test.go @@ -1,10 +1,10 @@ package precreator import ( + "sync" "testing" "time" - "github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/toml" ) @@ -12,78 +12,48 @@ func Test_ShardPrecreation(t *testing.T) { t.Parallel() now := time.Now().UTC() + advancePeriod := 5 * time.Minute // A test metastaore which returns 2 shard groups, only 1 of which requires a successor. + var wg sync.WaitGroup + wg.Add(1) ms := metaStore{ - VisitRetentionPoliciesFn: func(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo)) { - f(meta.DatabaseInfo{Name: "mydb"}, meta.RetentionPolicyInfo{Name: "myrp"}) - }, - ShardGroupsFn: func(database, policy string) ([]meta.ShardGroupInfo, error) { - if database != "mydb" || policy != "myrp" { - t.Fatalf("ShardGroups called with incorrect database or policy") - } - - // Make two shard groups, 1 which needs a successor, the other does not. - groups := make([]meta.ShardGroupInfo, 2) - groups[0] = meta.ShardGroupInfo{ - ID: 1, - StartTime: now.Add(-1 * time.Hour), - EndTime: now, - } - groups[1] = meta.ShardGroupInfo{ - ID: 2, - StartTime: now, - EndTime: now.Add(time.Hour), - } - return groups, nil - }, - CreateShardGroupIfNotExistFn: func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { - if database != "mydb" || policy != "myrp" { - t.Fatalf("ShardGroups called with incorrect database or policy") + PrecreateShardGroupsFn: func(u time.Time) error { + wg.Done() + if u != now.Add(advancePeriod) { + t.Fatalf("precreation called with wrong time, got %s, exp %s", u, now) } - return &meta.ShardGroupInfo{ID: 3}, nil + return nil }, } srv, err := NewService(Config{ CheckInterval: toml.Duration(time.Minute), - AdvancePeriod: toml.Duration(5 * time.Minute), + AdvancePeriod: toml.Duration(advancePeriod), }) if err != nil { t.Fatalf("failed to create shard precreation service: %s", err.Error()) } srv.MetaStore = ms - n, err := srv.precreate(now) + err = srv.precreate(now) if err != nil { t.Fatalf("failed to precreate shards: %s", err.Error()) } - if n != 1 { - t.Fatalf("incorrect number of shard groups created, exp 1, got %d", n) - } + wg.Wait() // Ensure metastore test function is called. return } // PointsWriter represents a mock impl of PointsWriter. type metaStore struct { - VisitRetentionPoliciesFn func(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo)) - ShardGroupsFn func(database, policy string) ([]meta.ShardGroupInfo, error) - CreateShardGroupIfNotExistFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) + PrecreateShardGroupsFn func(cutoff time.Time) error } func (m metaStore) IsLeader() bool { return true } -func (m metaStore) VisitRetentionPolicies(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo)) { - m.VisitRetentionPoliciesFn(f) -} - -func (m metaStore) ShardGroups(database, policy string) ([]meta.ShardGroupInfo, error) { - return m.ShardGroupsFn(database, policy) -} - -func (m metaStore) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { - return m.CreateShardGroupIfNotExistFn(database, policy, timestamp) +func (m metaStore) PrecreateShardGroups(timestamp time.Time) error { + return m.PrecreateShardGroupsFn(timestamp) }