Skip to content

Commit

Permalink
Monitor retention policy is configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
otoolep committed Sep 5, 2015
1 parent cfd0acd commit 214cfea
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 19 deletions.
3 changes: 3 additions & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ reporting-disabled = false
[monitor]
store-enabled = true # Whether to record statistics internally.
store-database = "_internal" # The destination database for recorded statistics
store-retention-policy = "monitor" # The destination retention policy
store-retention-duration = "168h" # How long to keep the data
store-replication-factor = 1 # How many copies of the data to keep
store-interval = "1m" # The interval at which to record statistics

###
Expand Down
27 changes: 21 additions & 6 deletions monitor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,37 @@ const (
// DefaultStoreDatabase is the name of the database where gathered information is written
DefaultStoreDatabase = "_internal"

// DefaultStoreRetentionPolicy is the name of the retention policy for monitor data.
DefaultStoreRetentionPolicy = "monitor"

// DefaultStoreRetentionPolicyDuration is the duration the data is retained.
DefaultStoreRetentionPolicyDuration = 168 * time.Hour

// DefaultStoreReplicationFactor is the default replication factor for the data.
DefaultStoreReplicationFactor = 1

// DefaultStoreInterval is the period between storing gathered information.
DefaultStoreInterval = time.Minute
)

// Config represents the configuration for the monitor service.
// Each field is read from the TOML key named in its struct tag.
type Config struct {
StoreEnabled bool `toml:"store-enabled"`
StoreDatabase string `toml:"store-database"`
StoreInterval toml.Duration `toml:"store-interval"`
// StoreEnabled controls whether statistics are recorded internally.
StoreEnabled bool `toml:"store-enabled"`
// StoreDatabase is the destination database for recorded statistics.
StoreDatabase string `toml:"store-database"`
// StoreRetentionPolicy is the destination retention policy for recorded statistics.
StoreRetentionPolicy string `toml:"store-retention-policy"`
// StoreRetentionDuration is how long the recorded data is kept.
StoreRetentionDuration toml.Duration `toml:"store-retention-duration"`
// StoreReplicationFactor is how many copies of the recorded data are kept.
StoreReplicationFactor int `toml:"store-replication-factor"`
// StoreInterval is the interval at which statistics are recorded.
StoreInterval toml.Duration `toml:"store-interval"`
}

// NewConfig returns an instance of Config with defaults: storing is enabled
// and every other field is set from the corresponding Default* constant, with
// durations wrapped as toml.Duration.
func NewConfig() Config {
return Config{
StoreEnabled: true,
StoreDatabase: DefaultStoreDatabase,
StoreInterval: toml.Duration(DefaultStoreInterval),
StoreEnabled: true,
StoreDatabase: DefaultStoreDatabase,
StoreRetentionPolicy: DefaultStoreRetentionPolicy,
StoreRetentionDuration: toml.Duration(DefaultStoreRetentionPolicyDuration),
StoreReplicationFactor: DefaultStoreReplicationFactor,
StoreInterval: toml.Duration(DefaultStoreInterval),
}
}
9 changes: 9 additions & 0 deletions monitor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ func TestConfig_Parse(t *testing.T) {
if _, err := toml.Decode(`
store-enabled=true
store-database="the_db"
store-retention-policy="the_rp"
store-retention-duration="1h"
store-replication-factor=1234
store-interval="10m"
`, &c); err != nil {
t.Fatal(err)
Expand All @@ -24,6 +27,12 @@ store-interval="10m"
t.Fatalf("unexpected store-enabled: %v", c.StoreEnabled)
} else if c.StoreDatabase != "the_db" {
t.Fatalf("unexpected store-database: %s", c.StoreDatabase)
} else if c.StoreRetentionPolicy != "the_rp" {
t.Fatalf("unexpected store-retention-policy: %s", c.StoreRetentionPolicy)
} else if time.Duration(c.StoreRetentionDuration) != 1*time.Hour {
t.Fatalf("unexpected store-retention-duration: %s", c.StoreRetentionDuration)
} else if c.StoreReplicationFactor != 1234 {
t.Fatalf("unexpected store-replication-factor: %d", c.StoreReplicationFactor)
} else if time.Duration(c.StoreInterval) != 10*time.Minute {
t.Fatalf("unexpected store-interval: %s", c.StoreInterval)
}
Expand Down
43 changes: 30 additions & 13 deletions monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,20 @@ type Monitor struct {

diagRegistrations map[string]DiagsClient

storeEnabled bool
storeDatabase string
storeAddress string
storeInterval time.Duration
storeEnabled bool
storeDatabase string
storeRetentionPolicy string
storeRetentionDuration time.Duration
storeReplicationFactor int
storeAddress string
storeInterval time.Duration

MetaStore interface {
ClusterID() (uint64, error)
NodeID() uint64
WaitForLeader(d time.Duration) error
CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error)
CreateRetentionPolicyIfNotExists(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
}

PointsWriter interface {
Expand All @@ -85,12 +89,15 @@ type Monitor struct {
// New returns a new instance of the monitor system.
// The store settings are copied from c, converting its toml.Duration values
// to time.Duration. The done channel and the diagnostics registration map are
// allocated here, and the logger writes to os.Stderr with a "[monitor] " prefix.
func New(c Config) *Monitor {
return &Monitor{
done: make(chan struct{}),
diagRegistrations: make(map[string]DiagsClient),
storeEnabled: c.StoreEnabled,
storeDatabase: c.StoreDatabase,
storeInterval: time.Duration(c.StoreInterval),
Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags),
done: make(chan struct{}),
diagRegistrations: make(map[string]DiagsClient),
storeEnabled: c.StoreEnabled,
storeDatabase: c.StoreDatabase,
storeRetentionPolicy: c.StoreRetentionPolicy,
storeRetentionDuration: time.Duration(c.StoreRetentionDuration),
storeReplicationFactor: c.StoreReplicationFactor,
storeInterval: time.Duration(c.StoreInterval),
Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags),
}
}

Expand Down Expand Up @@ -255,8 +262,8 @@ func (m *Monitor) Diagnostics() (map[string]*Diagnostic, error) {
// storeStatistics writes the statistics to an InfluxDB system.
func (m *Monitor) storeStatistics() {
defer m.wg.Done()
m.Logger.Printf("storing statistics in database '%s', interval %s",
m.storeDatabase, m.storeInterval)
m.Logger.Printf("Storing statistics in database '%s' retention policy '%s', at interval %s",
m.storeDatabase, m.storeRetentionPolicy, m.storeInterval)

if err := m.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil {
m.Logger.Printf("failed to detect a cluster leader, terminating storage: %s", err.Error())
Expand All @@ -269,6 +276,16 @@ func (m *Monitor) storeStatistics() {
return
}

rpi := meta.NewRetentionPolicyInfo(m.storeRetentionPolicy)
rpi.Duration = m.storeRetentionDuration
rpi.ReplicaN = m.storeReplicationFactor

if _, err := m.MetaStore.CreateRetentionPolicyIfNotExists(m.storeDatabase, rpi); err != nil {
m.Logger.Printf("failed to create retention policy '%s', terminating storage: %s",
m.storeRetentionPolicy, err.Error())
return
}

tick := time.NewTicker(m.storeInterval)
defer tick.Stop()
for {
Expand All @@ -287,7 +304,7 @@ func (m *Monitor) storeStatistics() {

err = m.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: m.storeDatabase,
RetentionPolicy: "",
RetentionPolicy: m.storeRetentionPolicy,
ConsistencyLevel: cluster.ConsistencyLevelOne,
Points: points,
})
Expand Down
3 changes: 3 additions & 0 deletions monitor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func (m *mockMetastore) WaitForLeader(d time.Duration) error { return nil }
// CreateDatabaseIfNotExists is a test stub: it ignores name and always
// returns a nil database info and a nil error.
func (m *mockMetastore) CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error) {
return nil, nil
}
// CreateRetentionPolicyIfNotExists is a test stub: it ignores database and
// rpi and always returns a nil retention policy info and a nil error.
func (m *mockMetastore) CreateRetentionPolicyIfNotExists(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) {
return nil, nil
}

func openMonitor(t *testing.T) *Monitor {
monitor := New(NewConfig())
Expand Down

0 comments on commit 214cfea

Please sign in to comment.