Skip to content

Commit

Permalink
Merge pull request #472 from influxdb/fix-323-continuous-queries-shou…
Browse files Browse the repository at this point in the history
…ld-guard-against-data-loops

Fix #323. Continuous queries should guard against data loops.
  • Loading branch information
jvshahid committed Apr 25, 2014
2 parents 0d35128 + f38e0a5 commit c335f71
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/coordinator/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ func (s *RaftServer) CreateContinuousQuery(db string, query string) error {
return fmt.Errorf("Continuous queries with a group by clause must include time(...) as one of the elements")
}

if !selectQuery.IsNonRecursiveContinuousQuery() {
return fmt.Errorf("Continuous queries with :series_name interpolation must use a regular expression in the from clause that prevents recursion")
}

duration, err := selectQuery.GetGroupByClause().GetGroupByTime()
if err != nil {
return fmt.Errorf("Couldn't get group by time for continuous query: %s", err)
Expand Down
33 changes: 33 additions & 0 deletions src/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"reflect"
"regexp"
"strconv"
"strings"
"time"
"unsafe"
)
Expand Down Expand Up @@ -253,6 +254,38 @@ func (self *SelectQuery) IsValidContinuousQuery() bool {
return false
}

func (self *SelectQuery) IsNonRecursiveContinuousQuery() bool {
fromClause := self.GetFromClause()
intoClause := self.GetIntoClause()

for _, from := range fromClause.Names {
regex, ok := from.Name.GetCompiledRegex()

if !ok {
continue
}

regexString := regex.String()
intoTarget := intoClause.Target.Name

if !strings.Contains(intoTarget, ":series_name") {
continue
} else {
if strings.HasPrefix(regexString, "^") && !strings.HasPrefix(intoTarget, ":series_name") {
continue
}

if strings.HasSuffix(regexString, "$") && !strings.HasSuffix(intoTarget, ":series_name") {
continue
}

return false
}
}

return true
}

func (self *SelectQuery) GetIntoClause() *IntoClause {
return self.IntoClause
}
Expand Down
32 changes: 32 additions & 0 deletions src/parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,38 @@ func (self *QueryParserSuite) TestParseContinuousQueryCreation(c *C) {
c.Assert(clause.Target, DeepEquals, &Value{"bar", "", ValueSimpleName, nil, nil, false})
}

func (self *QueryParserSuite) TestParseRecursiveContinuousQueries(c *C) {
query := `select * from /^stats\\..*/ into bar;`
q, err := ParseSelectQuery(query)
c.Assert(err, IsNil)
c.Assert(q.IsNonRecursiveContinuousQuery(), Equals, true)

query = `select * from /^stats\\..*/ group by time(5m) into rollups.:series_name.5m;`
q, err = ParseSelectQuery(query)
c.Assert(err, IsNil)
c.Assert(q.IsNonRecursiveContinuousQuery(), Equals, true)

query = `select * from /.*stats$/ group by time(5m) into :series_name.5m;`
q, err = ParseSelectQuery(query)
c.Assert(err, IsNil)
c.Assert(q.IsNonRecursiveContinuousQuery(), Equals, true)

query = `select * from /.*/ into :series_name.foo;`
q, err = ParseSelectQuery(query)
c.Assert(err, IsNil)
c.Assert(q.IsNonRecursiveContinuousQuery(), Equals, false)

query = `select * from /^stats\..*/ into :series_name.foo;`
q, err = ParseSelectQuery(query)
c.Assert(err, IsNil)
c.Assert(q.IsNonRecursiveContinuousQuery(), Equals, false)

query = `select * from /stats$/ into foo.:series_name;`
q, err = ParseSelectQuery(query)
c.Assert(err, IsNil)
c.Assert(q.IsNonRecursiveContinuousQuery(), Equals, false)
}

func (self *QueryParserSuite) TestParseInterpolatedContinuousQueryCreation(c *C) {
query := "select * from foo into bar.[c4];"
q, err := ParseSelectQuery(query)
Expand Down

0 comments on commit c335f71

Please sign in to comment.