Skip to content

Commit

Permalink
Fix #387. Aggregation by time interval should respect the natural bou…
Browse files Browse the repository at this point in the history
…ndaries of the argument.
  • Loading branch information
toddboom committed Oct 9, 2014
1 parent e5c9fb7 commit a286f5f
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 29 deletions.
2 changes: 1 addition & 1 deletion cluster/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (self *ShardData) ShouldAggregateLocally(querySpec *parser.QuerySpec) bool
}
return true
}
return self.shardDuration%*groupByInterval == 0
return (self.shardDuration%*groupByInterval == 0) && !querySpec.GroupByIrregularInterval
}

type Shards []*ShardData
Expand Down
8 changes: 4 additions & 4 deletions cluster/shard_space.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ func (s *ShardSpace) Validate(clusterConfig *ClusterConfiguration, checkForDb bo
s.ReplicationFactor = DEFAULT_REPLICATION_FACTOR
}
if s.ShardDuration != "" {
if _, err := common.ParseTimeDuration(s.ShardDuration); err != nil {
if _, _, err := common.ParseTimeDuration(s.ShardDuration); err != nil {
return err
}
}
if s.RetentionPolicy != "" && s.RetentionPolicy != "inf" {
if _, err := common.ParseTimeDuration(s.RetentionPolicy); err != nil {
if _, _, err := common.ParseTimeDuration(s.RetentionPolicy); err != nil {
return err
}
}
Expand Down Expand Up @@ -121,13 +121,13 @@ func (s *ShardSpace) ParsedRetentionPeriod() time.Duration {
} else if s.RetentionPolicy == "inf" {
return time.Duration(0)
}
d, _ := common.ParseTimeDuration(s.RetentionPolicy)
d, _, _ := common.ParseTimeDuration(s.RetentionPolicy)
return time.Duration(d)
}

func (s *ShardSpace) ParsedShardDuration() time.Duration {
if s.ShardDuration != "" {
d, _ := common.ParseTimeDuration(s.ShardDuration)
d, _, _ := common.ParseTimeDuration(s.ShardDuration)
return time.Duration(d)
}
return DEFAULT_SHARD_DURATION
Expand Down
20 changes: 14 additions & 6 deletions common/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (

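// ParseTimeDuration returns the parsed duration in nanoseconds and whether the
// suffix marks an irregular (calendar) interval. It supports the 'u', 's', 'm',
// 'h', 'd', 'w'/'W', 'M' and 'y'/'Y' suffixes; weeks, months (30 days) and
// years (365 days) are reported as irregular.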
func ParseTimeDuration(value string) (int64, error) {
func ParseTimeDuration(value string) (int64, bool, error) {
var constant time.Duration

prefixSize := 1
irregularInterval := false

switch value[len(value)-1] {
case 'u':
Expand All @@ -27,10 +29,15 @@ func ParseTimeDuration(value string) (int64, error) {
constant = time.Hour
case 'd':
constant = 24 * time.Hour
case 'w':
case 'w', 'W':
constant = 7 * 24 * time.Hour
case 'y':
irregularInterval = true
case 'M':
constant = 30 * 24 * time.Hour
irregularInterval = true
case 'y', 'Y':
constant = 365 * 24 * time.Hour
irregularInterval = true
default:
prefixSize = 0
}
Expand All @@ -48,19 +55,20 @@ func ParseTimeDuration(value string) (int64, error) {

_, err := fmt.Sscan(timeString, &t)
if err != nil {
return 0, err
return 0, false, err
}

if prefixSize > 0 {
c := big.Rat{}
c.SetFrac64(int64(constant), 1)
t.Mul(&t, &c)
}

if t.IsInt() {
return t.Num().Int64(), nil
return t.Num().Int64(), irregularInterval, nil
}
f, _ := t.Float64()
return int64(f), nil
return int64(f), irregularInterval, nil
}

func GetFileSize(path string) (int64, error) {
Expand Down
4 changes: 2 additions & 2 deletions coordinator/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (s *RaftServer) CreateContinuousQuery(db string, query string) error {
return fmt.Errorf("Continuous queries with :series_name interpolation must use a regular expression in the from clause that prevents recursion")
}

duration, err := selectQuery.GetGroupByClause().GetGroupByTime()
duration, _, err := selectQuery.GetGroupByClause().GetGroupByTime()
if err != nil {
return fmt.Errorf("Couldn't get group by time for continuous query: %s", err)
}
Expand Down Expand Up @@ -497,7 +497,7 @@ func (s *RaftServer) checkContinuousQueries() {
continue
}

duration, err := query.GetGroupByClause().GetGroupByTime()
duration, _, err := query.GetGroupByClause().GetGroupByTime()
if err != nil {
log.Error("Couldn't get group by time for continuous query:", err)
continue
Expand Down
41 changes: 33 additions & 8 deletions engine/aggregator_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ type AggregatorEngine struct {
next Processor

// variables for aggregate queries
aggregators []Aggregator
elems []*parser.Value // group by columns other than time()
duration *time.Duration // the time by duration if any
seriesStates map[string]*SeriesState
aggregators []Aggregator
elems []*parser.Value // group by columns other than time()
duration *time.Duration // the time by duration if any
irregularInterval bool // group by time is week, month, or year
seriesStates map[string]*SeriesState
}

func (self *AggregatorEngine) Name() string {
Expand All @@ -57,9 +58,33 @@ func (self *AggregatorEngine) getTimestampFromPoint(point *protocol.Point) int64
}

// getTimestampBucket maps a point timestamp, given in microseconds since the
// Unix epoch, to the start of the group-by-time bucket that contains it. The
// result is also expressed in microseconds since the Unix epoch.
//
// When the group by interval is flagged as irregular and is exactly one week
// (168h), one month (720h) or one year (8760h), the bucket starts on the
// natural calendar boundary in UTC: midnight of the preceding Sunday, of the
// first day of the month, or of January 1st respectively. Every other
// interval is truncated to a multiple of the duration.
func (self *AggregatorEngine) getTimestampBucket(timestampMicroseconds uint64) int64 {
	seconds := int64(timestampMicroseconds / 1000000)
	nanoseconds := int64(timestampMicroseconds%1000000) * 1000

	// time.Unix returns a time in the local zone. Convert to UTC before
	// reading calendar fields, because the boundaries below are built in UTC;
	// otherwise points near midnight land in a bucket that depends on the
	// server's time zone.
	timestamp := time.Unix(seconds, nanoseconds).UTC()

	if self.irregularInterval {
		year, month, day := timestamp.Date()

		switch *self.duration {
		case 168 * time.Hour:
			// Exactly one week. Weekday() counts from Sunday == 0, so
			// subtracting it from the day of the month yields the preceding
			// Sunday; time.Date normalizes a day below 1 into the previous
			// month.
			sunday := day - int(timestamp.Weekday())
			return time.Date(year, month, sunday, 0, 0, 0, 0, time.UTC).Unix() * 1000000
		case 720 * time.Hour:
			// Exactly one month (30 days): snap to the first of the month.
			return time.Date(year, month, 1, 0, 0, 0, 0, time.UTC).Unix() * 1000000
		case 8760 * time.Hour:
			// Exactly one year (365 days): snap to January 1st.
			return time.Date(year, time.January, 1, 0, 0, 0, 0, time.UTC).Unix() * 1000000
		}
	}

	// Non-special interval: round down to a multiple of the duration.
	// Truncate works on absolute time, so the UTC conversion above does not
	// affect it.
	return int64(timestamp.Truncate(*self.duration).UnixNano() / 1000)
}

func (self *AggregatorEngine) Yield(s *protocol.Series) (bool, error) {
Expand Down Expand Up @@ -311,7 +336,7 @@ func NewAggregatorEngine(query *parser.SelectQuery, next Processor) (*Aggregator
}

var err error
ae.duration, err = query.GetGroupByClause().GetGroupByTime()
ae.duration, ae.irregularInterval, err = query.GetGroupByClause().GetGroupByTime()
if err != nil {
return nil, err
}
Expand Down
32 changes: 32 additions & 0 deletions integration/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1454,6 +1454,38 @@ func (self *DataTestSuite) TestGroupByDay(c *C) {
c.Assert(maps[1]["count"], Equals, 1.0)
}

// TestLogicalGroupByBoundaries writes nine points spread over three
// timestamps and checks that grouping by time(1w) produces two buckets, with
// counts of 2 and 7.
func (self *DataTestSuite) TestLogicalGroupByBoundaries(c *C) {
// 1412092800s = Tuesday, 2014-09-30 12:00:00 -0400
// 1412352000s = Friday, 2014-10-03 12:00:00 -0400
// 1412611200s = Monday, 2014-10-06 12:00:00 -0400
// With weeks starting on Sunday, the Tuesday and Friday points (7 in total)
// share the week of 2014-09-28, while the Monday points (2) fall in the week
// of 2014-10-05.
data := `
[{
"name": "test_group_by_week",
"columns": ["value", "time"],
"points": [
[1, 1412092800],
[2, 1412092800],
[3, 1412092800],
[4, 1412352000],
[5, 1412352000],
[6, 1412352000],
[7, 1412352000],
[8, 1412611200],
[9, 1412611200]
]
}]`

// Timestamps in the payload are in seconds, hence the "s" precision argument.
self.client.WriteJsonData(data, c, "s")
collection := self.client.RunQuery("select count(value) from test_group_by_week group by time(1w)", c)
c.Assert(collection, HasLen, 1)
maps := ToMap(collection[0])
c.Assert(maps, HasLen, 2)
c.Assert(maps[0]["count"], Equals, 2.0)
c.Assert(maps[1]["count"], Equals, 7.0)
}

func (self *DataTestSuite) TestLimitQueryOnSingleShard(c *C) {
data := `[{"points": [[4], [10], [5]], "name": "test_limit_query_single_shard", "columns": ["value"]}]`
self.client.WriteJsonData(data, c)
Expand Down
12 changes: 6 additions & 6 deletions parser/group_by.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,27 @@ type GroupByClause struct {
Elems []*Value
}

// GetGroupByTime scans the group by elements for a call to time() (the name is
// matched case-insensitively) and returns the parsed interval together with
// the irregular-interval flag reported by common.ParseTimeDuration. It returns
// a nil duration and no error when the clause contains no time() call.
func (self GroupByClause) GetGroupByTime() (*time.Duration, error) {
func (self GroupByClause) GetGroupByTime() (*time.Duration, bool, error) {
for _, groupBy := range self.Elems {
if groupBy.IsFunctionCall() && strings.ToLower(groupBy.Name) == "time" {
// time() must be called with exactly one argument.
if len(groupBy.Elems) != 1 {
return nil, common.NewQueryError(common.WrongNumberOfArguments, "time function only accepts one argument")
return nil, false, common.NewQueryError(common.WrongNumberOfArguments, "time function only accepts one argument")
}

// A non-duration argument is only logged here; parsing is still attempted
// below and decides whether the argument is rejected.
if groupBy.Elems[0].Type != ValueDuration {
log.Debug("Get a time function without a duration argument %v", groupBy.Elems[0].Type)
}
arg := groupBy.Elems[0].Name
durationInt, err := common.ParseTimeDuration(arg)
durationInt, irregularInterval, err := common.ParseTimeDuration(arg)
if err != nil {
return nil, common.NewQueryError(common.InvalidArgument, fmt.Sprintf("invalid argument %s to the time function", arg))
return nil, false, common.NewQueryError(common.InvalidArgument, fmt.Sprintf("invalid argument %s to the time function", arg))
}
duration := time.Duration(durationInt)
return &duration, nil
return &duration, irregularInterval, nil
}
}
return nil, nil
return nil, false, nil
}

func (self *GroupByClause) GetString() string {
Expand Down
4 changes: 3 additions & 1 deletion parser/query_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ func parseTime(value *Value) (int64, error) {
return t.UnixNano(), err
}

return common.ParseTimeDuration(value.Name)
duration, _, err := common.ParseTimeDuration(value.Name)

return duration, err
}

leftValue, err := parseTime(value.Elems[0])
Expand Down
3 changes: 2 additions & 1 deletion parser/query_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type QuerySpec struct {
endTime time.Time
seriesValuesAndColumns map[*Value][]string
RunAgainstAllServersInShard bool
GroupByIrregularInterval bool
groupByInterval *time.Duration
groupByColumnCount int
}
Expand Down Expand Up @@ -127,7 +128,7 @@ func (self *QuerySpec) GetGroupByInterval() *time.Duration {
return nil
}
if self.groupByInterval == nil {
self.groupByInterval, _ = self.query.SelectQuery.GetGroupByClause().GetGroupByTime()
self.groupByInterval, self.GroupByIrregularInterval, _ = self.query.SelectQuery.GetGroupByClause().GetGroupByTime()
}
return self.groupByInterval
}
Expand Down

0 comments on commit a286f5f

Please sign in to comment.