Skip to content

Commit

Permalink
Move shard precreation logic to meta/store
Browse files Browse the repository at this point in the history
  • Loading branch information
otoolep committed Jun 10, 2015
1 parent f679c81 commit 5798d99
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 83 deletions.
28 changes: 28 additions & 0 deletions meta/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,34 @@ func (s *Store) UserCount() (count int, err error) {
return
}

// PrecreateShardGroups creates shard groups whose endtime is before the cutoff time passed in. This
// avoid the need for these shards to be created when data for the corresponding time range arrives.
// Shard creation involves Raft consensus, and precreation avoids taking the hit at write-time.
func (s *Store) PrecreateShardGroups(cutoff time.Time) error {
s.read(func(data *Data) error {
for _, di := range data.Databases {
for _, rp := range di.RetentionPolicies {
for _, g := range rp.ShardGroups {
// Check to see if it is going to end before our interval
if g.EndTime.Before(cutoff) {
s.Logger.Printf("pre-creating successive shard group for group %d, database %s, policy %s",
g.ID, di.Name, rp.Name)
if newGroup, err := s.CreateShardGroupIfNotExists(di.Name, rp.Name, g.EndTime.Add(1*time.Nanosecond)); err != nil {
s.Logger.Printf("failed to create successive shard group for group %d: %s",
g.ID, err.Error())
} else {
s.Logger.Printf("new shard group %d successfully created", newGroup.ID)
}
}
}

}
}
return nil
})
return nil
}

// read executes a function with the current metadata.
// If an error is returned then the cache is invalidated and retried.
//
Expand Down
32 changes: 32 additions & 0 deletions meta/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,38 @@ func TestStore_DeleteShardGroup(t *testing.T) {
}
}

// Ensure the store correctly precreates shard groups.
func TestStore_PrecreateShardGroup(t *testing.T) {
t.Parallel()
s := MustOpenStore()
defer s.Close()

// Create node, database, policy, & group.
if _, err := s.CreateNode("host0"); err != nil {
t.Fatal(err)
} else if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
} else if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
} else if err := s.PrecreateShardGroups(time.Date(2001, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
}

groups, err := s.ShardGroups("db0", "rp0")
if err != nil {
t.Fatal(err)
}
if len(groups) != 2 {
t.Fatalf("shard group precreation failed to create new shard group")
}
if groups[1].StartTime != time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC) {
t.Fatalf("precreated shard group has wrong start time, exp %s, got %s",
time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC), groups[1].StartTime)
}
}

// Ensure the store can create a new continuous query.
func TestStore_CreateContinuousQuery(t *testing.T) {
t.Parallel()
Expand Down
51 changes: 13 additions & 38 deletions services/precreator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"os"
"sync"
"time"

"github.com/influxdb/influxdb/meta"
)

type Service struct {
Expand All @@ -20,13 +18,11 @@ type Service struct {

MetaStore interface {
IsLeader() bool
VisitRetentionPolicies(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo))
ShardGroups(database, policy string) ([]meta.ShardGroupInfo, error)
CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
PrecreateShardGroups(cutoff time.Time) error
}
}

// NewService returns an instance of the Graphite service.
// NewService returns an instance of the precreation service.
func NewService(c Config) (*Service, error) {
s := Service{
checkInterval: time.Duration(c.CheckInterval),
Expand All @@ -42,7 +38,7 @@ func (s *Service) SetLogger(l *log.Logger) {
s.Logger = l
}

// Open starts the shard precreation service.
// Open starts the precreation service.
func (s *Service) Open() error {
if s.done != nil {
return nil
Expand All @@ -55,7 +51,7 @@ func (s *Service) Open() error {
return nil
}

// Close stops the shard precreation service.
// Close stops the precreation service.
func (s *Service) Close() error {
if s.done == nil {
return nil
Expand All @@ -68,7 +64,7 @@ func (s *Service) Close() error {
return nil
}

// runPrecreation continually checks if shards need precreation.
// runPrecreation continually checks if resources need precreation.
func (s *Service) runPrecreation() {
defer s.wg.Done()

Expand All @@ -81,42 +77,21 @@ func (s *Service) runPrecreation() {
continue
}

if _, err := s.precreate(time.Now().UTC()); err != nil {
if err := s.precreate(time.Now().UTC()); err != nil {
s.Logger.Printf("failed to precreate shards: %s", err.Error())
}
case <-s.done:
s.Logger.Println("shard precreation service terminating")
s.Logger.Println("precreation service terminating")
return
}
}
}

// precreate performs actual shard precreation. Returns the number of groups that were created.
func (s *Service) precreate(t time.Time) (int, error) {
// precreate performs actual resource precreation.
func (s *Service) precreate(t time.Time) error {
cutoff := t.Add(s.advancePeriod).UTC()
numCreated := 0

s.MetaStore.VisitRetentionPolicies(func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo) {
groups, err := s.MetaStore.ShardGroups(d.Name, r.Name)
if err != nil {
s.Logger.Printf("failed to retrieve shard groups for database %s, policy %s: %s",
d.Name, r.Name, err.Error())
return
}
for _, g := range groups {
// Check to see if it is going to end before our interval
if g.EndTime.Before(cutoff) {
s.Logger.Printf("pre-creating successive shard group for group %d, database %s, policy %s",
g.ID, d.Name, r.Name)
if newGroup, err := s.MetaStore.CreateShardGroupIfNotExists(d.Name, r.Name, g.EndTime.Add(1*time.Nanosecond)); err != nil {
s.Logger.Printf("failed to create successive shard group for group %d: %s",
g.ID, err.Error())
} else {
numCreated++
s.Logger.Printf("new shard group %d successfully created", newGroup.ID)
}
}
}
})
return numCreated, nil
if err := s.MetaStore.PrecreateShardGroups(cutoff); err != nil {
return err
}
return nil
}
60 changes: 15 additions & 45 deletions services/precreator/service_test.go
Original file line number Diff line number Diff line change
@@ -1,89 +1,59 @@
package precreator

import (
"sync"
"testing"
"time"

"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/toml"
)

func Test_ShardPrecreation(t *testing.T) {
t.Parallel()

now := time.Now().UTC()
advancePeriod := 5 * time.Minute

// A test metastaore which returns 2 shard groups, only 1 of which requires a successor.
var wg sync.WaitGroup
wg.Add(1)
ms := metaStore{
VisitRetentionPoliciesFn: func(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo)) {
f(meta.DatabaseInfo{Name: "mydb"}, meta.RetentionPolicyInfo{Name: "myrp"})
},
ShardGroupsFn: func(database, policy string) ([]meta.ShardGroupInfo, error) {
if database != "mydb" || policy != "myrp" {
t.Fatalf("ShardGroups called with incorrect database or policy")
}

// Make two shard groups, 1 which needs a successor, the other does not.
groups := make([]meta.ShardGroupInfo, 2)
groups[0] = meta.ShardGroupInfo{
ID: 1,
StartTime: now.Add(-1 * time.Hour),
EndTime: now,
}
groups[1] = meta.ShardGroupInfo{
ID: 2,
StartTime: now,
EndTime: now.Add(time.Hour),
}
return groups, nil
},
CreateShardGroupIfNotExistFn: func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
if database != "mydb" || policy != "myrp" {
t.Fatalf("ShardGroups called with incorrect database or policy")
PrecreateShardGroupsFn: func(u time.Time) error {
wg.Done()
if u != now.Add(advancePeriod) {
t.Fatalf("precreation called with wrong time, got %s, exp %s", u, now)
}
return &meta.ShardGroupInfo{ID: 3}, nil
return nil
},
}

srv, err := NewService(Config{
CheckInterval: toml.Duration(time.Minute),
AdvancePeriod: toml.Duration(5 * time.Minute),
AdvancePeriod: toml.Duration(advancePeriod),
})
if err != nil {
t.Fatalf("failed to create shard precreation service: %s", err.Error())
}
srv.MetaStore = ms

n, err := srv.precreate(now)
err = srv.precreate(now)
if err != nil {
t.Fatalf("failed to precreate shards: %s", err.Error())
}
if n != 1 {
t.Fatalf("incorrect number of shard groups created, exp 1, got %d", n)
}

wg.Wait() // Ensure metastore test function is called.
return
}

// PointsWriter represents a mock impl of PointsWriter.
type metaStore struct {
VisitRetentionPoliciesFn func(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo))
ShardGroupsFn func(database, policy string) ([]meta.ShardGroupInfo, error)
CreateShardGroupIfNotExistFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
PrecreateShardGroupsFn func(cutoff time.Time) error
}

func (m metaStore) IsLeader() bool {
return true
}

func (m metaStore) VisitRetentionPolicies(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo)) {
m.VisitRetentionPoliciesFn(f)
}

func (m metaStore) ShardGroups(database, policy string) ([]meta.ShardGroupInfo, error) {
return m.ShardGroupsFn(database, policy)
}

func (m metaStore) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
return m.CreateShardGroupIfNotExistFn(database, policy, timestamp)
func (m metaStore) PrecreateShardGroups(timestamp time.Time) error {
return m.PrecreateShardGroupsFn(timestamp)
}

0 comments on commit 5798d99

Please sign in to comment.