Skip to content

Commit

Permalink
compute continuous queries back to 'recompute-no-older-than', bucketi…
Browse files Browse the repository at this point in the history
…ng with GROUP BY clause
  • Loading branch information
kruckenb committed Jul 18, 2015
1 parent f404a8a commit 86f1e47
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 36 deletions.
1 change: 0 additions & 1 deletion etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ reporting-disabled = false

[continuous_queries]
enabled = true
recompute-previous-n = 2
recompute-no-older-than = "10m"
compute-runs-per-interval = 10
compute-no-more-than = "2m"
Expand Down
9 changes: 0 additions & 9 deletions services/continuous_querier/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
)

const (
DefaultRecomputePreviousN = 2

DefaultRecomputeNoOlderThan = 10 * time.Minute

DefaultComputeRunsPerInterval = 10
Expand All @@ -21,12 +19,6 @@ type Config struct {
// If this flag is set to false, both the brokers and data nodes should ignore any CQ processing.
Enabled bool `toml:"enabled"`

// when continuous queries are run we'll automatically recompute previous intervals
// in case lagged data came in. Set to zero if you never have lagged data. We do
// it this way because invalidating previously computed intervals would be insanely hard
// and expensive.
RecomputePreviousN int `toml:"recompute-previous-n"`

// The RecomputePreviousN setting provides guidance for how far back to recompute, the RecomputeNoOlderThan
// setting sets a ceiling on how far back in time it will go. For example, if you have 2 PreviousN
// and have this set to 10m, then we'd only compute the previous two intervals for any
Expand All @@ -53,7 +45,6 @@ type Config struct {
func NewConfig() Config {
return Config{
Enabled: true,
RecomputePreviousN: DefaultRecomputePreviousN,
RecomputeNoOlderThan: toml.Duration(DefaultRecomputeNoOlderThan),
ComputeRunsPerInterval: DefaultComputeRunsPerInterval,
ComputeNoMoreThan: toml.Duration(DefaultComputeNoMoreThan),
Expand Down
5 changes: 1 addition & 4 deletions services/continuous_querier/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ func TestConfig_Parse(t *testing.T) {
// Parse configuration.
var c continuous_querier.Config
if _, err := toml.Decode(`
recompute-previous-n = 1
recompute-no-older-than = "10s"
compute-runs-per-interval = 2
compute-no-more-than = "20s"
Expand All @@ -22,9 +21,7 @@ enabled = true
}

// Validate configuration.
if c.RecomputePreviousN != 1 {
t.Fatalf("unexpected recompute previous n: %d", c.RecomputePreviousN)
} else if time.Duration(c.RecomputeNoOlderThan) != 10*time.Second {
if time.Duration(c.RecomputeNoOlderThan) != 10*time.Second {
t.Fatalf("unexpected recompute no older than: %v", c.RecomputeNoOlderThan)
} else if c.ComputeRunsPerInterval != 2 {
t.Fatalf("unexpected compute runs per interval: %d", c.ComputeRunsPerInterval)
Expand Down
23 changes: 1 addition & 22 deletions services/continuous_querier/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
startTime = startTime.Add(-interval)
}

if err := cq.q.SetTimeRange(startTime, startTime.Add(interval)); err != nil {
if err := cq.q.SetTimeRange(now.Add(-time.Duration(s.Config.RecomputeNoOlderThan)), startTime.Add(interval)); err != nil {
s.Logger.Printf("error setting time range: %s\n", err)
}

Expand All @@ -240,27 +240,6 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
return err
}

recomputeNoOlderThan := time.Duration(s.Config.RecomputeNoOlderThan)

for i := 0; i < s.Config.RecomputePreviousN; i++ {
// if we're already more time past the previous window than we're going to look back, stop
if now.Sub(startTime) > recomputeNoOlderThan {
return nil
}
newStartTime := startTime.Add(-interval)

if err := cq.q.SetTimeRange(newStartTime, startTime); err != nil {
s.Logger.Printf("error setting time range: %s\n", err)
return err
}

if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
s.Logger.Printf("error during recompute previous: %s. running: %s\n", err, cq.q.String())
return err
}

startTime = newStartTime
}
return nil
}

Expand Down

0 comments on commit 86f1e47

Please sign in to comment.