Skip to content

Commit

Permalink
fix #2337: panic if tag key isn't double quoted
Browse files Browse the repository at this point in the history
  • Loading branch information
dgnorton committed Apr 18, 2015
1 parent e890468 commit 2ad737d
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- [#2324](https://github.com/influxdb/influxdb/issues/2324): Race in Broker.Close()/Broker.RunContinousQueryProcessing()
- [#2325](https://github.com/influxdb/influxdb/pull/2325): Cluster open fixes
- [#2326](https://github.com/influxdb/influxdb/pull/2326): Fix parse error in CREATE CONTINUOUS QUERY
- [#2338](https://github.com/influxdb/influxdb/pull/2338): Fix panic if tag key isn't double quoted when it should have been

## v0.9.0-rc25 [2015-04-15]

Expand Down
6 changes: 6 additions & 0 deletions cmd/influxd/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,12 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
query: `select foo from "%DB%"."%RP%".where_events where tennant = 'paul' AND time > 1s AND (foo = 'bar' OR foo = 'baz')`,
expected: `{"results":[{"series":[{"name":"where_events","columns":["time","foo"],"values":[["2009-11-10T23:00:02Z","bar"],["2009-11-10T23:00:03Z","baz"]]}]}]}`,
},
{
name: "where on tag that should be double quoted but isn't",
queryDb: "%DB%",
query: `show series where data-center = 'foo'`,
expectPattern: `invalid expression: .*`,
},

// LIMIT and OFFSET tests

Expand Down
73 changes: 49 additions & 24 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,15 +281,19 @@ func (m *Measurement) seriesByTags(tags map[string]string) *Series {

// filters walks the where clause of a select statement and returns a map with all series ids
// matching the where clause and any filter expression that should be applied to each
func (m *Measurement) filters(stmt *influxql.SelectStatement) map[uint64]influxql.Expr {
func (m *Measurement) filters(stmt *influxql.SelectStatement) (map[uint64]influxql.Expr, error) {
seriesIdsToExpr := make(map[uint64]influxql.Expr)
if stmt.Condition == nil || stmt.OnlyTimeDimensions() {
for _, id := range m.seriesIDs {
seriesIdsToExpr[id] = nil
}
return seriesIdsToExpr
return seriesIdsToExpr, nil
}
ids, _, _, err := m.walkWhereForSeriesIds(stmt.Condition, seriesIdsToExpr)

if err != nil {
return nil, err
}
ids, _, _ := m.walkWhereForSeriesIds(stmt.Condition, seriesIdsToExpr)

// ensure every id is in the map
for _, id := range ids {
Expand All @@ -298,7 +302,7 @@ func (m *Measurement) filters(stmt *influxql.SelectStatement) map[uint64]influxq
}
}

return seriesIdsToExpr
return seriesIdsToExpr, nil
}

// tagSets returns the unique tag sets that exist for the given tag keys. This is used to determine
Expand All @@ -308,9 +312,12 @@ func (m *Measurement) filters(stmt *influxql.SelectStatement) map[uint64]influxq
// {"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc...
// This will also populate the TagSet objects with the series IDs that match each tagset and any
// influx filter expression that goes with the series
func (m *Measurement) tagSets(stmt *influxql.SelectStatement, dimensions []string) []*influxql.TagSet {
func (m *Measurement) tagSets(stmt *influxql.SelectStatement, dimensions []string) ([]*influxql.TagSet, error) {
// get the unique set of series ids and the filters that should be applied to each
filters := m.filters(stmt)
filters, err := m.filters(stmt)
if err != nil {
return nil, err
}

// build the tag sets
var tagStrings []string
Expand Down Expand Up @@ -348,32 +355,35 @@ func (m *Measurement) tagSets(stmt *influxql.SelectStatement, dimensions []strin
a = append(a, tagSets[s])
}

return a
return a, nil
}

// idsForExpr will return a collection of series ids, a bool indicating if the result should be
// used (it'll be false if it's a time expr) and a field expression if the passed in expression is against a field.
func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, bool, influxql.Expr) {
func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, bool, influxql.Expr, error) {
name, ok := n.LHS.(*influxql.VarRef)
value := n.RHS
if !ok {
name, _ = n.RHS.(*influxql.VarRef)
name, ok = n.RHS.(*influxql.VarRef)
if !ok {
return nil, false, nil, fmt.Errorf("invalid expression: %s", n.String())
}
value = n.LHS
}

// ignore time literals
if _, ok := value.(*influxql.TimeLiteral); ok || name.Val == "time" {
return nil, false, nil
return nil, false, nil, nil
}

// if it's a field we can't collapse it so we have to look at all series ids for this
if m.FieldByName(name.Val) != nil {
return m.seriesIDs, true, n
return m.seriesIDs, true, n, nil
}

tagVals, ok := m.seriesByTagKeyValue[name.Val]
if !ok {
return nil, true, nil
return nil, true, nil, nil
}

// if we're looking for series with specific tag values
Expand All @@ -386,7 +396,7 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, bool, influ
} else if n.Op == influxql.NEQ {
ids = m.seriesIDs.reject(tagVals[str.Val])
}
return ids, true, nil
return ids, true, nil, nil
}

// if we're looking for series with tag values that match a regex
Expand All @@ -408,32 +418,44 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, bool, influ
ids = ids.reject(tagVals[k])
}
}
return ids, true, nil
return ids, true, nil, nil
}

return nil, true, nil
return nil, true, nil, nil
}

// walkWhereForSeriesIds will recursively walk the where clause and return a collection of series ids, a boolean indicating if this return
// value should be included in the resulting set, and an expression if the return is a field expression.
// The map that it takes maps each series id to the field expression that should be used to evaluate it when iterating over its cursor.
// Series that have no field expressions won't be in the map
func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr, filters map[uint64]influxql.Expr) (seriesIDs, bool, influxql.Expr) {
func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr, filters map[uint64]influxql.Expr) (seriesIDs, bool, influxql.Expr, error) {
switch n := expr.(type) {
case *influxql.BinaryExpr:
switch n.Op {
case influxql.EQ, influxql.NEQ, influxql.LT, influxql.LTE, influxql.GT, influxql.GTE, influxql.EQREGEX, influxql.NEQREGEX:
// if it's a compare, then it's either a field expression or against a tag. we can return this
ids, shouldInclude, expr := m.idsForExpr(n)
ids, shouldInclude, expr, err := m.idsForExpr(n)
if err != nil {
return nil, false, nil, err
}

for _, id := range ids {
filters[id] = expr
}
return ids, shouldInclude, expr

return ids, shouldInclude, expr, nil
case influxql.AND, influxql.OR:
// if it's an AND or OR we need to union or intersect the results
var ids seriesIDs
l, il, lexpr := m.walkWhereForSeriesIds(n.LHS, filters)
r, ir, rexpr := m.walkWhereForSeriesIds(n.RHS, filters)
l, il, lexpr, err := m.walkWhereForSeriesIds(n.LHS, filters)
if err != nil {
return nil, false, nil, err
}

r, ir, rexpr, err := m.walkWhereForSeriesIds(n.RHS, filters)
if err != nil {
return nil, false, nil, err
}

if il && ir { // we should include both the LHS and RHS of the BinaryExpr in the return
if n.Op == influxql.AND {
Expand All @@ -442,7 +464,7 @@ func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr, filters map[uint
ids = l.union(r)
}
} else if !il && !ir { // we don't need to include either so return nothing
return nil, false, nil
return nil, false, nil, nil
} else if il { // just include the left side
ids = l
} else { // just include the right side
Expand Down Expand Up @@ -492,15 +514,15 @@ func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr, filters map[uint
}

// finally return the ids and say that we should include them
return ids, true, nil
return ids, true, nil, nil
}

return m.idsForExpr(n)
case *influxql.ParenExpr:
// walk down the tree
return m.walkWhereForSeriesIds(n.Expr, filters)
default:
return nil, false, nil
return nil, false, nil, nil
}
}

Expand Down Expand Up @@ -574,7 +596,10 @@ func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (seriesIDs, error

// Get series IDs that match the WHERE clause.
filters := map[uint64]influxql.Expr{}
ids, _, _ := m.walkWhereForSeriesIds(expr, filters)
ids, _, _, err := m.walkWhereForSeriesIds(expr, filters)
if err != nil {
return nil, err
}

return ids, nil
}
Expand Down
15 changes: 12 additions & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2503,7 +2503,10 @@ func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement,
if stmt.Condition != nil {
// Get series IDs that match the WHERE clause.
filters := map[uint64]influxql.Expr{}
ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters)
ids, _, _, err = m.walkWhereForSeriesIds(stmt.Condition, filters)
if err != nil {
return &Result{Err: err}
}

// TODO: check return of walkWhereForSeriesIds for fields
} else {
Expand Down Expand Up @@ -2546,7 +2549,10 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
if stmt.Condition != nil {
// Get series IDs that match the WHERE clause.
filters := map[uint64]influxql.Expr{}
ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters)
ids, _, _, err = m.walkWhereForSeriesIds(stmt.Condition, filters)
if err != nil {
return &Result{Err: err}
}

// If no series matched, then go to the next measurement.
if len(ids) == 0 {
Expand Down Expand Up @@ -2761,7 +2767,10 @@ func (s *Server) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesState
if stmt.Condition != nil {
// Get series IDs that match the WHERE clause.
filters := map[uint64]influxql.Expr{}
ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters)
ids, _, _, err = m.walkWhereForSeriesIds(stmt.Condition, filters)
if err != nil {
return &Result{Err: err}
}

// If no series matched, then go to the next measurement.
if len(ids) == 0 {
Expand Down
5 changes: 4 additions & 1 deletion tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
}

// get the sorted unique tag sets for this query.
tagSets := m.tagSets(stmt, tagKeys)
tagSets, err := m.tagSets(stmt, tagKeys)
if err != nil {
return nil, err
}

//jobs := make([]*influxql.MapReduceJob, 0, len(tagSets))
for _, t := range tagSets {
Expand Down

0 comments on commit 2ad737d

Please sign in to comment.