Skip to content

Commit

Permalink
Merge pull request #3426 from jhorwit2/jah/continuous-logging
Browse files Browse the repository at this point in the history
Added additional logging to continuous queries
  • Loading branch information
pauldix committed Jul 27, 2015
2 parents b7bbebd + e722b4b commit fb76c34
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 7 deletions.
1 change: 1 addition & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ reporting-disabled = false
###

[continuous_queries]
log-enabled = true
enabled = true
recompute-previous-n = 2
recompute-no-older-than = "10m"
Expand Down
4 changes: 4 additions & 0 deletions services/continuous_querier/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ const (

// Config represents a configuration for the continuous query service.
type Config struct {
// Enables logging in CQ service to display when CQ's are processed and how many points are wrote.
LogEnabled bool `toml:"log-enabled"`

// If this flag is set to false, both the brokers and data nodes should ignore any CQ processing.
Enabled bool `toml:"enabled"`

Expand Down Expand Up @@ -52,6 +55,7 @@ type Config struct {
// NewConfig returns a new instance of Config with defaults.
func NewConfig() Config {
return Config{
LogEnabled: true,
Enabled: true,
RecomputePreviousN: DefaultRecomputePreviousN,
RecomputeNoOlderThan: toml.Duration(DefaultRecomputeNoOlderThan),
Expand Down
27 changes: 20 additions & 7 deletions services/continuous_querier/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ type Service struct {
Config *Config
RunInterval time.Duration
// RunCh can be used by clients to signal service to run CQs.
RunCh chan struct{}
Logger *log.Logger
RunCh chan struct{}
Logger *log.Logger
loggingEnabled bool
// lastRuns maps CQ name to last time it was run.
lastRuns map[string]time.Time
stop chan struct{}
Expand All @@ -62,12 +63,15 @@ type Service struct {
// NewService returns a new instance of Service.
func NewService(c Config) *Service {
s := &Service{
Config: &c,
RunInterval: time.Second,
RunCh: make(chan struct{}),
Logger: log.New(os.Stderr, "[continuous_querier] ", log.LstdFlags),
lastRuns: map[string]time.Time{},
Config: &c,
RunInterval: time.Second,
RunCh: make(chan struct{}),
loggingEnabled: c.LogEnabled,
Logger: log.New(os.Stderr, "[continuous_querier] ", log.LstdFlags),
lastRuns: map[string]time.Time{},
}

s.Logger.Println("starting continuous query service")
return s
}

Expand Down Expand Up @@ -150,6 +154,7 @@ func (s *Service) backgroundLoop() {
for {
select {
case <-s.stop:
s.Logger.Println("continuous query service terminating")
return
case <-s.RunCh:
if s.MetaStore.IsLeader() {
Expand Down Expand Up @@ -234,6 +239,10 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
s.Logger.Printf("error setting time range: %s\n", err)
}

if s.loggingEnabled {
s.Logger.Printf("executing continuous query %s", cq.Info.Name)
}

// Do the actual processing of the query & writing of results.
if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
s.Logger.Printf("error: %s. running: %s\n", err, cq.q.String())
Expand Down Expand Up @@ -319,6 +328,10 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
s.Logger.Println(err)
return err
}

if s.loggingEnabled {
s.Logger.Printf("wrote %d point(s) to %s.%s.%s", len(points), cq.intoDB(), cq.intoRP(), cq.Info.Name)
}
}
}

Expand Down

0 comments on commit fb76c34

Please sign in to comment.