Skip to content

Commit

Permalink
Merge pull request #2878 from influxdb/shard_precreation
Browse files Browse the repository at this point in the history
Shard precreation service
  • Loading branch information
otoolep committed Jun 10, 2015
2 parents 269a9c6 + 5798d99 commit 01f2d7d
Show file tree
Hide file tree
Showing 11 changed files with 316 additions and 78 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
- [#2857](https://github.com/influxdb/influxdb/issues/2857): Fix parsing commas in string field values.
- [#2833](https://github.com/influxdb/influxdb/pull/2833): Make the default config valid.
- [#2859](https://github.com/influxdb/influxdb/pull/2859): Fix panic on aggregate functions.
- [#2878](https://github.com/influxdb/influxdb/pull/2878): Re-enable shard precreation.

### Features
- [#2858](https://github.com/influxdb/influxdb/pull/2858): Support setting openTSDB write consistency.
Expand Down
11 changes: 7 additions & 4 deletions cmd/influxd/run/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@ import (
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/monitor"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/precreator"
"github.com/influxdb/influxdb/services/retention"
"github.com/influxdb/influxdb/services/udp"
"github.com/influxdb/influxdb/tsdb"
)

// Config represents the configuration format for the influxd binary.
type Config struct {
Meta meta.Config `toml:"meta"`
Data tsdb.Config `toml:"data"`
Cluster cluster.Config `toml:"cluster"`
Retention retention.Config `toml:"retention"`
Meta meta.Config `toml:"meta"`
Data tsdb.Config `toml:"data"`
Cluster cluster.Config `toml:"cluster"`
Retention retention.Config `toml:"retention"`
Precreator precreator.Config `toml:"shard-precreation"`

Admin admin.Config `toml:"admin"`
HTTPD httpd.Config `toml:"http"`
Expand All @@ -48,6 +50,7 @@ func NewConfig() *Config {
c.Meta = meta.NewConfig()
c.Data = tsdb.NewConfig()
c.Cluster = cluster.NewConfig()
c.Precreator = precreator.NewConfig()

c.Admin = admin.NewConfig()
c.HTTPD = httpd.NewConfig()
Expand Down
16 changes: 16 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/influxdb/influxdb/services/hh"
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/precreator"
"github.com/influxdb/influxdb/services/retention"
"github.com/influxdb/influxdb/services/udp"
"github.com/influxdb/influxdb/tcp"
Expand Down Expand Up @@ -76,6 +77,7 @@ func NewServer(c *Config) (*Server, error) {

// Append services.
s.appendClusterService(c.Cluster)
s.appendPrecreatorService(c.Precreator)
s.appendAdminService(c.Admin)
s.appendContinuousQueryService(c.ContinuousQuery)
s.appendHTTPDService(c.HTTPD)
Expand Down Expand Up @@ -174,6 +176,20 @@ func (s *Server) appendGraphiteService(c graphite.Config) error {
return nil
}

// appendPrecreatorService registers the shard precreation service with the
// server when it is enabled in the configuration. A disabled service is not
// an error; the server simply runs without it. Any error from constructing
// the service is returned to the caller.
func (s *Server) appendPrecreatorService(c precreator.Config) error {
	if c.Enabled {
		service, err := precreator.NewService(c)
		if err != nil {
			return err
		}

		// The service works against the server's meta store.
		service.MetaStore = s.MetaStore
		s.Services = append(s.Services, service)
	}
	return nil
}

func (s *Server) appendUDPService(c udp.Config) {
if !c.Enabled {
return
Expand Down
74 changes: 0 additions & 74 deletions meta/shard_group_precreator.go

This file was deleted.

28 changes: 28 additions & 0 deletions meta/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,34 @@ func (s *Store) UserCount() (count int, err error) {
return
}

// PrecreateShardGroups creates a successor shard group for every existing
// shard group whose end time is before the cutoff time passed in. This avoids
// the need for these shard groups to be created when data for the
// corresponding time range arrives. Shard creation involves Raft consensus,
// and precreation avoids taking the hit at write-time.
//
// A failure to create an individual successor group is logged and does not
// stop the remaining groups from being processed. The returned error is the
// one reported by the metadata read itself.
func (s *Store) PrecreateShardGroups(cutoff time.Time) error {
	return s.read(func(data *Data) error {
		for _, di := range data.Databases {
			for _, rp := range di.RetentionPolicies {
				for _, g := range rp.ShardGroups {
					// Only groups that end before the cutoff need a successor.
					if !g.EndTime.Before(cutoff) {
						continue
					}

					s.Logger.Printf("pre-creating successive shard group for group %d, database %s, policy %s",
						g.ID, di.Name, rp.Name)

					// The successor is requested for the first instant after this group ends.
					newGroup, err := s.CreateShardGroupIfNotExists(di.Name, rp.Name, g.EndTime.Add(1*time.Nanosecond))
					if err != nil {
						// Best effort: log and carry on with the remaining groups.
						s.Logger.Printf("failed to create successive shard group for group %d: %s",
							g.ID, err.Error())
						continue
					}
					s.Logger.Printf("new shard group %d successfully created", newGroup.ID)
				}
			}
		}
		return nil
	})
}

// read executes a function with the current metadata.
// If an error is returned then the cache is invalidated and retried.
//
Expand Down
32 changes: 32 additions & 0 deletions meta/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,38 @@ func TestStore_DeleteShardGroup(t *testing.T) {
}
}

// Ensure the store correctly precreates shard groups.
//
// A single shard group starting at 2000-01-01T00:00:00Z is created under a
// retention policy with a one hour duration. Precreating with a cutoff one
// year later must yield a second group, and the test expects that group to
// start at 2000-01-01T01:00:00Z.
func TestStore_PrecreateShardGroup(t *testing.T) {
	t.Parallel()
	s := MustOpenStore()
	defer s.Close()

	// Create node, database, policy, & group. Each step is checked on its own
	// so that a failure points at the exact call that produced it.
	if _, err := s.CreateNode("host0"); err != nil {
		t.Fatal(err)
	}
	if _, err := s.CreateDatabase("db0"); err != nil {
		t.Fatal(err)
	}
	if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, Duration: 1 * time.Hour}); err != nil {
		t.Fatal(err)
	}
	if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
		t.Fatal(err)
	}
	if err := s.PrecreateShardGroups(time.Date(2001, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
		t.Fatal(err)
	}

	groups, err := s.ShardGroups("db0", "rp0")
	if err != nil {
		t.Fatal(err)
	}
	if len(groups) != 2 {
		t.Fatalf("shard group precreation failed to create new shard group")
	}

	// Compare instants with Equal rather than ==, so the check does not depend
	// on how the stored time's location is represented internally.
	exp := time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC)
	if !groups[1].StartTime.Equal(exp) {
		t.Fatalf("precreated shard group has wrong start time, exp %s, got %s",
			exp, groups[1].StartTime)
	}
}

// Ensure the store can create a new continuous query.
func TestStore_CreateContinuousQuery(t *testing.T) {
t.Parallel()
Expand Down
32 changes: 32 additions & 0 deletions services/precreator/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package precreator

import (
"time"

"github.com/influxdb/influxdb/toml"
)

const (
// DefaultCheckInterval is the interval between shard precreation checks
// used when none is specified.
DefaultCheckInterval = 10 * time.Minute

// DefaultAdvancePeriod is the default period ahead of the end time of a
// shard group that its successor group is created.
DefaultAdvancePeriod = 30 * time.Minute
)

// Config represents the configuration for shard precreation.
// Each field is read from the TOML key named in its struct tag.
type Config struct {
// Enabled toggles shard precreation on or off.
Enabled bool `toml:"enabled"`
// CheckInterval is the time between shard precreation checks.
CheckInterval toml.Duration `toml:"check-interval"`
// AdvancePeriod is how far ahead of a shard group's end time its
// successor group is created.
AdvancePeriod toml.Duration `toml:"advance-period"`
}

// NewConfig returns a Config populated with the package defaults: shard
// precreation enabled, checking every DefaultCheckInterval and creating
// successor groups DefaultAdvancePeriod ahead of time.
func NewConfig() Config {
	var c Config
	c.Enabled = true
	c.CheckInterval = toml.Duration(DefaultCheckInterval)
	c.AdvancePeriod = toml.Duration(DefaultAdvancePeriod)
	return c
}
31 changes: 31 additions & 0 deletions services/precreator/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package precreator_test

import (
"testing"
"time"

"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/services/precreator"
)

// TestConfig_Parse ensures a precreator configuration can be decoded from
// TOML and that every field carries the value given in the document.
func TestConfig_Parse(t *testing.T) {
	// TOML document under test.
	const input = `
enabled = true
check-interval = "2m"
advance-period = "10m"
`

	// Parse configuration.
	var c precreator.Config
	if _, err := toml.Decode(input, &c); err != nil {
		t.Fatal(err)
	}

	// Validate configuration, one field at a time.
	if !c.Enabled {
		t.Fatalf("unexpected enabled state: %v", c.Enabled)
	}
	if got := time.Duration(c.CheckInterval); got != 2*time.Minute {
		t.Fatalf("unexpected check interval: %s", c.CheckInterval)
	}
	if got := time.Duration(c.AdvancePeriod); got != 10*time.Minute {
		t.Fatalf("unexpected advance period: %s", c.AdvancePeriod)
	}
}
13 changes: 13 additions & 0 deletions services/precreator/notes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Shard Precreation
============

During normal operation when InfluxDB receives time-series data, it writes the data to files known as _shards_. Each shard only contains data for a specific range of time. Therefore, before data can be accepted by the system, the shards must exist and InfluxDB always checks that the required shards exist for every incoming data point. If the required shards do not exist, InfluxDB will create those shards. Because this requires a cluster to reach consensus, the process is not instantaneous and can temporarily impact write-throughput.

Since almost all time-series data is written sequentially in time, the system has an excellent idea of the timestamps of future data. Shard precreation takes advantage of this fact by creating required shards ahead of time, thereby ensuring the required shards exist by the time new time-series data actually arrives. Write-throughput is therefore not affected when data is first received for a range of time that would normally trigger shard creation.

Note that the shard-existence check must remain in place in the code, even with shard precreation. This is because while most data is written sequentially in time, this is not always the case. Data may be written with timestamps in the past, or farther in the future than shard precreation handles.

## Configuration
Shard precreation can be disabled if necessary, though this is not recommended. If it is disabled, then shards will only be created when explicitly needed.

The interval between runs of the shard precreation service, as well as the time-in-advance the shards are created, are also configurable. The defaults should work for most deployments.
Loading

0 comments on commit 01f2d7d

Please sign in to comment.