diff --git a/api/http/api.go b/api/http/api.go index 65c2d436d61..94099af7fed 100644 --- a/api/http/api.go +++ b/api/http/api.go @@ -1210,8 +1210,8 @@ func (self *HttpServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R return libhttp.StatusBadRequest, err.Error() } for _, query := range q { - if !query.SelectQuery.IsContinuousQuery() { - return libhttp.StatusBadRequest, fmt.Errorf("This query isn't a continuous query. Use 'into'. %s", query.QueryString) + if !query.IsContinuousQuery() { + return libhttp.StatusBadRequest, fmt.Errorf("This query isn't a continuous query. Use 'into'. %s", query.GetQueryString()) } } } @@ -1238,7 +1238,7 @@ func (self *HttpServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R for _, queryString := range databaseConfig.ContinuousQueries { q, _ := parser.ParseQuery(queryString) for _, query := range q { - err := self.coordinator.CreateContinuousQuery(u, database, query.QueryString) + err := self.coordinator.CreateContinuousQuery(u, database, query.GetQueryString()) if err != nil { return libhttp.StatusInternalServerError, err.Error() } diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 8e9bf1febbf..5b175574314 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -91,65 +91,40 @@ func (self *CoordinatorImpl) RunQuery(user common.User, database string, querySt } for _, query := range q { - querySpec := parser.NewQuerySpec(user, database, query) - - if query.DeleteQuery != nil { - if err := self.clusterConfiguration.CreateCheckpoint(); err != nil { - return err - } - - if err := self.runDeleteQuery(querySpec, seriesWriter); err != nil { - return err - } - continue - } - - if query.DropQuery != nil { - if err := self.DeleteContinuousQuery(user, database, uint32(query.DropQuery.Id)); err != nil { - return err - } - continue - } - - if query.IsListQuery() { - if query.IsListSeriesQuery() { - self.runListSeriesQuery(querySpec, seriesWriter) - } else if query.IsListContinuousQueriesQuery() { - queries, err := self.ListContinuousQueries(user, database) - if err != nil { - return err - } - for _, q := range queries { - if err := seriesWriter.Write(q); err != nil { - return err - } - } - } - continue - } - - if query.DropSeriesQuery != nil { - err := self.runDropSeriesQuery(querySpec, seriesWriter) - if err != nil { - return err - } - continue - } - - selectQuery := query.SelectQuery - - if selectQuery.IsContinuousQuery() { - return self.CreateContinuousQuery(user, database, queryString) - } - if err := self.checkPermission(user, querySpec); err != nil { + err := self.runSingleQuery(user, database, query, seriesWriter) + if err != nil { return err } - return self.runQuery(querySpec, seriesWriter) } seriesWriter.Close() return nil } +func (self *CoordinatorImpl) runSingleQuery(user common.User, db string, q *parser.Query, sw SeriesWriter) error { + querySpec := parser.NewQuerySpec(user, db, q) + + switch qt := q.Type(); qt { + // administrative + case parser.DropContinuousQuery: + return self.runDropContinuousQuery(user, db, uint32(q.DropQuery.Id)) + case parser.ListContinuousQueries: + return self.runListContinuousQueries(user, db, sw) + case parser.Continuous: + return self.runContinuousQuery(user, db, q.GetQueryString()) + case parser.ListSeries: + return self.runListSeriesQuery(querySpec, sw) + // Data queries + case parser.Delete: + return self.runDeleteQuery(querySpec, sw) + case parser.DropSeries: + return self.runDropSeriesQuery(querySpec, sw) + case parser.Select: + return self.runSelectQuery(user, querySpec, sw) + default: + return fmt.Errorf("Can't handle query %s", qt) + } +} + func (self *CoordinatorImpl) checkPermission(user common.User, querySpec *parser.QuerySpec) error { // if this isn't a regex query do the permission check here fromClause := querySpec.SelectQuery().GetFromClause() @@ -165,10 +140,26 @@ func (self *CoordinatorImpl) checkPermission(user common.User, querySpec *parser } // This should only get run for SelectQuery types -func (self *CoordinatorImpl) runQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { +func (self *CoordinatorImpl) runSelectQuery(user common.User, querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { + if err := self.checkPermission(user, querySpec); err != nil { + return err + } return self.runQuerySpec(querySpec, seriesWriter) } +func (self *CoordinatorImpl) runListContinuousQueries(user common.User, db string, sw SeriesWriter) error { + queries, err := self.ListContinuousQueries(user, db) + if err != nil { + return err + } + for _, q := range queries { + if err := sw.Write(q); err != nil { + return err + } + } + return nil +} + func (self *CoordinatorImpl) runListSeriesQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { allSeries := self.clusterConfiguration.MetaStore.GetSeriesForDatabase(querySpec.Database()) matchingSeries := allSeries @@ -198,6 +189,10 @@ func (self *CoordinatorImpl) runListSeriesQuery(querySpec *parser.QuerySpec, ser } func (self *CoordinatorImpl) runDeleteQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error { + if err := self.clusterConfiguration.CreateCheckpoint(); err != nil { + return err + } + user := querySpec.User() db := querySpec.Database() if ok, err := self.permissions.AuthorizeDeleteQuery(user, db); !ok { @@ -753,7 +748,7 @@ func (self *CoordinatorImpl) writeWithoutAssigningId(db string, series []*protoc return shard.Write(request) } -func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string, query string) error { +func (self *CoordinatorImpl) runContinuousQuery(user common.User, db string, query string) error { if ok, err := self.permissions.AuthorizeCreateContinuousQuery(user, db); !ok { return err } @@ -765,7 +760,7 @@ func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string, return nil } -func (self *CoordinatorImpl) DeleteContinuousQuery(user common.User, db string, id uint32) error { +func (self *CoordinatorImpl) runDropContinuousQuery(user common.User, db string, id uint32) error { if ok, err := self.permissions.AuthorizeDeleteContinuousQuery(user, db); !ok { return err } @@ -1038,3 +1033,16 @@ func (self *CoordinatorImpl) ConnectToProtobufServers(localRaftName string) erro func isValidName(name string) bool { return !strings.Contains(name, "%") } + +// TODO: this is for backward compatability only, remove after 0.8 +func (self *CoordinatorImpl) DeleteContinuousQuery(user common.User, db string, id uint32) error { + return self.runDropContinuousQuery(user, db, id) +} + +func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string, query string) error { + return self.RunContinuousQuery(user, db, query) +} + +func (self *CoordinatorImpl) RunContinuousQuery(user common.User, db string, query string) error { + return self.runContinuousQuery(user, db, query) +} diff --git a/parser/frees.c b/parser/frees.c index 71a75da5c60..be992917a2a 100644 --- a/parser/frees.c +++ b/parser/frees.c @@ -134,12 +134,24 @@ free_drop_series_query (drop_series_query *q) } void -close_query (query *q) +close_queries (queries *q) { - if (q->error) { + if (q->error) { free_error(q->error); - } + } + + while (q->size > 0) { + query *query = q->qs[--q->size]; + close_query(query); + free(query); + } + free(q->qs); + q->qs = NULL; +} +void +close_query (query *q) +{ if (q->select_query) { free_select_query(q->select_query); free(q->select_query); diff --git a/parser/parser.go b/parser/parser.go index 1bd377656f4..e16ba8ed542 100644 --- a/parser/parser.go +++ b/parser/parser.go @@ -80,12 +80,12 @@ type DeleteQuery struct { } type Query struct { - QueryString string SelectQuery *SelectQuery DeleteQuery *DeleteQuery ListQuery *ListQuery DropSeriesQuery *DropSeriesQuery DropQuery *DropQuery + qType QueryType } func (self *IntoClause) GetString() string { @@ -98,6 +98,10 @@ func (self *IntoClause) GetString() string { return buffer.String() } +func (self *Query) Type() QueryType { + return self.qType +} + func (self *Query) GetQueryString() string { return self.commonGetQueryString(false) } @@ -107,17 +111,25 @@ func (self *Query) GetQueryStringWithTimeCondition() string { } func (self *Query) commonGetQueryString(withTime bool) string { - if self.SelectQuery != nil { + switch self.qType { + case Select, Continuous: if withTime { return self.SelectQuery.GetQueryStringWithTimeCondition() } return self.SelectQuery.GetQueryString() - } else if self.ListQuery != nil { - return "list series" - } else if self.DeleteQuery != nil { + case Delete: return self.DeleteQuery.GetQueryString(withTime) + case DropContinuousQuery: + return fmt.Sprintf("drop continuous query %d", self.DropQuery.Id) + case ListSeries: + return "list series" + case ListContinuousQueries: + return "list continuous queries" + case DropSeries: + return "drop series " + self.DropSeriesQuery.tableName + default: + panic(fmt.Errorf("Unknown query type %s", self.qType)) } - return self.QueryString } func (self *Query) IsListQuery() bool { @@ -262,8 +274,8 @@ func (self *SelectQuery) GetSinglePointQuerySequenceNumber() (int64, error) { return sequence_number, nil } -func (self *SelectQuery) IsContinuousQuery() bool { - return self.GetIntoClause() != nil +func (self *Query) IsContinuousQuery() bool { + return self.qType == Continuous } func (self *SelectQuery) IsValidContinuousQuery() bool { @@ -597,24 +609,7 @@ func ParseSelectQuery(query string) (*SelectQuery, error) { return selectQuery, nil } -func ParseQuery(query string) ([]*Query, error) { - queryString := C.CString(query) - defer C.free(unsafe.Pointer(queryString)) - q := C.parse_query(queryString) - defer C.close_query(&q) - - if q.error != nil { - str := C.GoString(q.error.err) - return nil, &QueryError{ - firstLine: int(q.error.first_line), - firstColumn: int(q.error.first_column) - 1, - lastLine: int(q.error.last_line), - lastColumn: int(q.error.last_column) - 1, - errorString: str, - queryString: query, - } - } - +func parseSingleQuery(q *C.query) (*Query, error) { if q.list_series_query != nil { var value *Value var err error @@ -626,11 +621,11 @@ func ParseQuery(query string) ([]*Query, error) { return nil, err } } - return []*Query{{QueryString: query, ListQuery: &ListQuery{Type: t, value: value}}}, nil + return &Query{ListQuery: &ListQuery{Type: t, value: value}, qType: ListSeries}, nil } if q.list_continuous_queries_query != 0 { - return []*Query{{QueryString: query, ListQuery: &ListQuery{Type: ContinuousQueries}}}, nil + return &Query{ListQuery: &ListQuery{Type: ContinuousQueries}, qType: ListContinuousQueries}, nil } if q.select_query != nil { @@ -639,26 +634,62 @@ func ParseQuery(query string) ([]*Query, error) { return nil, err } - return []*Query{{QueryString: query, SelectQuery: selectQuery}}, nil + qType := Select + if selectQuery.IntoClause != nil { + qType = Continuous + } + return &Query{SelectQuery: selectQuery, qType: qType}, nil } else if q.delete_query != nil { deleteQuery, err := parseDeleteQuery(q.delete_query) if err != nil { return nil, err } - return []*Query{{QueryString: query, DeleteQuery: deleteQuery}}, nil + return &Query{DeleteQuery: deleteQuery, qType: Delete}, nil } else if q.drop_series_query != nil { - dropSeriesQuery, err := parseDropSeriesQuery(query, q.drop_series_query) + dropSeriesQuery, err := parseDropSeriesQuery(q.drop_series_query) if err != nil { return nil, err } - return []*Query{{QueryString: query, DropSeriesQuery: dropSeriesQuery}}, nil + return &Query{DropSeriesQuery: dropSeriesQuery, qType: DropSeries}, nil } else if q.drop_query != nil { - return []*Query{{QueryString: query, DropQuery: &DropQuery{Id: int(q.drop_query.id)}}}, nil + return &Query{DropQuery: &DropQuery{Id: int(q.drop_query.id)}, qType: DropContinuousQuery}, nil } return nil, fmt.Errorf("Unknown query type encountered") } -func parseDropSeriesQuery(queryStirng string, dropSeriesQuery *C.drop_series_query) (*DropSeriesQuery, error) { +func ParseQuery(queryStr string) ([]*Query, error) { + queryString := C.CString(queryStr) + defer C.free(unsafe.Pointer(queryString)) + q := C.parse_query(queryString) + defer C.close_queries(&q) + + if q.error != nil { + str := C.GoString(q.error.err) + return nil, &QueryError{ + firstLine: int(q.error.first_line), + firstColumn: int(q.error.first_column) - 1, + lastLine: int(q.error.last_line), + lastColumn: int(q.error.last_column) - 1, + errorString: str, + queryString: queryStr, + } + } + + var queries []*C.query + setupSlice((*reflect.SliceHeader)((unsafe.Pointer(&queries))), unsafe.Pointer(q.qs), q.size) + + parsedQueries := make([]*Query, len(queries)) + for i, query := range queries { + query, err := parseSingleQuery(query) + if err != nil { + return nil, err + } + parsedQueries[i] = query + } + return parsedQueries, nil +} + +func parseDropSeriesQuery(dropSeriesQuery *C.drop_series_query) (*DropSeriesQuery, error) { name, err := GetValue(dropSeriesQuery.name) if err != nil { return nil, err diff --git a/parser/parser_test.go b/parser/parser_test.go index afbee67ea61..e6fd3d56b68 100644 --- a/parser/parser_test.go +++ b/parser/parser_test.go @@ -100,8 +100,6 @@ func (self *QueryParserSuite) TestGetQueryString(c *C) { actualQuery, err := ParseQuery(queryString) c.Assert(err, IsNil) c.Assert(actualQuery, HasLen, 1) - expectedQuery[0].QueryString = "" - actualQuery[0].QueryString = "" if expectedQuery[0].DeleteQuery != nil { expectedQuery[0].DeleteQuery.startTimeSpecified = false actualQuery[0].DeleteQuery.startTimeSpecified = false @@ -869,10 +867,7 @@ func (self *QueryParserSuite) TestIsSinglePointQuery(c *C) { } func (self *QueryParserSuite) TestParseContinuousQueryCreation(c *C) { - query := "select * from foo into bar;" - q, err := ParseSelectQuery(query) - c.Assert(err, IsNil) - c.Assert(q.IsContinuousQuery(), Equals, true) + q := getContinuousQuery("select * from foo into bar;", c) c.Assert(q.IsValidContinuousQuery(), Equals, true) clause := q.GetIntoClause() c.Assert(clause.Target, DeepEquals, &Value{"bar", "", ValueSimpleName, nil, nil, false}) @@ -910,37 +905,33 @@ func (self *QueryParserSuite) TestParseRecursiveContinuousQueries(c *C) { c.Assert(q.IsNonRecursiveContinuousQuery(), Equals, false) } -func (self *QueryParserSuite) TestParseInterpolatedContinuousQueryCreation(c *C) { - query := "select * from foo into bar.[c4];" - q, err := ParseSelectQuery(query) +func getContinuousQuery(q string, c *C) *SelectQuery { + queries, err := ParseQuery(q) c.Assert(err, IsNil) - c.Assert(q.IsContinuousQuery(), Equals, true) + c.Assert(queries, HasLen, 1) + query := queries[0] + c.Assert(query.IsContinuousQuery(), Equals, true) + return query.SelectQuery +} + +func (self *QueryParserSuite) TestParseInterpolatedContinuousQueryCreation(c *C) { + q := getContinuousQuery("select * from foo into bar.[c4];", c) clause := q.GetIntoClause() c.Assert(clause.Target, DeepEquals, &Value{"bar.[c4]", "", ValueIntoName, nil, nil, false}) - query = "select * from foo into [c5].bar.[c4];" - q, err = ParseSelectQuery(query) - c.Assert(err, IsNil) - c.Assert(q.IsContinuousQuery(), Equals, true) + q = getContinuousQuery("select * from foo into [c5].bar.[c4];", c) clause = q.GetIntoClause() c.Assert(clause.Target, DeepEquals, &Value{"[c5].bar.[c4]", "", ValueIntoName, nil, nil, false}) - query = "select average(c4), count(c5) from s3 group by time(1h) into [average].[count];" - q, err = ParseSelectQuery(query) - c.Assert(err, IsNil) - c.Assert(q.IsContinuousQuery(), Equals, true) + q = getContinuousQuery("select average(c4), count(c5) from s3 group by time(1h) into [average].[count];", c) clause = q.GetIntoClause() c.Assert(clause.Target, DeepEquals, &Value{"[average].[count]", "", ValueIntoName, nil, nil, false}) - query = "select * from foo into :series_name.foo;" - q, err = ParseSelectQuery(query) - c.Assert(err, IsNil) - c.Assert(q.IsContinuousQuery(), Equals, true) + q = getContinuousQuery("select * from foo into :series_name.foo;", c) clause = q.GetIntoClause() c.Assert(clause.Target, DeepEquals, &Value{":series_name.foo", "", ValueIntoName, nil, nil, false}) - query = "select * from foo into ]bar" - q, err = ParseSelectQuery(query) + _, err := ParseQuery("select * from foo into ]bar") c.Assert(err, NotNil) } @@ -979,6 +970,7 @@ func (self *QueryParserSuite) TestQueryErrorShouldHaveQueryString(c *C) { query := "select ! from foo;" _, err := ParseSelectQuery(query) e, _ := err.(*QueryError) + c.Assert(e, NotNil) c.Assert(e.queryString, Equals, query) } diff --git a/parser/query.yacc b/parser/query.yacc index ee39156cb39..5502caa23cb 100644 --- a/parser/query.yacc +++ b/parser/query.yacc @@ -70,7 +70,7 @@ value *create_expression_value(char *operator, size_t size, ...) { // declare that we want a reentrant parser %define api.pure -%parse-param {query *q} +%parse-param {queries *q} %parse-param {void *scanner} %lex-param {void *scanner} @@ -127,20 +127,20 @@ value *create_expression_value(char *operator, size_t size, ...) { ALL_QUERIES: QUERY { - *q = *$1; - free($1); + q->qs = realloc(q->qs, (q->size + 1) * sizeof(query)); + q->qs[q->size++] = $1; } | QUERY ';' { - *q = *$1; - free($1); + q->qs = realloc(q->qs, (q->size + 1) * sizeof(query)); + q->qs[q->size++] = $1; } | QUERY ';' ALL_QUERIES { - *q = *$1; - free($1); + q->qs = realloc(q->qs, (q->size + 1) * sizeof(query)); + q->qs[q->size++] = $1; } QUERY: @@ -702,10 +702,10 @@ BOOL_OPERATION: void *yy_scan_string(char *, void *); void yy_delete_buffer(void *, void *); -query +queries parse_query(char *const query_s) { - query q = {NULL, NULL, NULL, NULL, FALSE, FALSE, NULL}; + queries q = {0, NULL}; void *scanner; yylex_init(&scanner); #ifdef DEBUG @@ -719,7 +719,7 @@ parse_query(char *const query_s) return q; } -int yyerror(YYLTYPE *locp, query *q, void *s, char *err) { +int yyerror(YYLTYPE *locp, queries *q, void *s, char *err) { q->error = malloc(sizeof(error)); q->error->err = strdup(err); q->error->first_line = locp->first_line; diff --git a/parser/query_type.go b/parser/query_type.go new file mode 100644 index 00000000000..cb904efe5df --- /dev/null +++ b/parser/query_type.go @@ -0,0 +1,36 @@ +package parser + +import "fmt" + +type QueryType int + +const ( + Select QueryType = iota + Delete + DropContinuousQuery + ListSeries + ListContinuousQueries + DropSeries + Continuous +) + +func (qt QueryType) String() string { + switch qt { + case Select: + return "select" + case Delete: + return "delete" + case DropContinuousQuery: + return "drop continuous query" + case ListSeries: + return "list series" + case ListContinuousQueries: + return "list continuous queries" + case DropSeries: + return "drop series" + case Continuous: + return "continuous" + default: + return fmt.Sprintf("Unknown(%d)", qt) + } +} diff --git a/parser/query_types.h b/parser/query_types.h index fc5b7ea5a0c..9bfe1a1ce72 100644 --- a/parser/query_types.h +++ b/parser/query_types.h @@ -121,9 +121,15 @@ typedef struct { drop_query *drop_query; list_series_query *list_series_query; char list_continuous_queries_query; - error *error; } query; +// queries is an array of query +typedef struct { + size_t size; + query **qs; + error *error; +} queries; + // some funcs for freeing our types void free_array(array *array); void free_value_array(value_array *array); @@ -132,5 +138,6 @@ void free_condition(condition *condition); void free_error (error *error); // this is the api that is used in GO -query parse_query(char *const query_s); +queries parse_query(char *const query_s); void close_query (query *q); +void close_queries (queries *queries); diff --git a/parser/test_memory_leaks.sh b/parser/test_memory_leaks.sh index 4ac11ce52e0..58d6a2989b7 100755 --- a/parser/test_memory_leaks.sh +++ b/parser/test_memory_leaks.sh @@ -5,77 +5,77 @@ cat > test_memory.c <now()-1d;"); - close_query(&q); + queries q = parse_query("select count(*) from users.events group_by user_email,time(1h) where time>now()-1d;"); + close_queries(&q); q = parse_query("explain select users.events group_by user_email,time(1h) where time>now()-1d;"); - close_query(&q); + close_queries(&q); q = parse_query("select * from foo where time < -1s"); - close_query(&q); + close_queries(&q); // test partial regex q = parse_query("list series /"); - close_query(&q); + close_queries(&q); // test freeing list series query q = parse_query("list series /foo/ bar"); - close_query(&q); + close_queries(&q); // test freeing on error q = parse_query("select count(*) from users.events group_by user_email,time(1h) where time >> now()-1d;"); - close_query(&q); + close_queries(&q); // test freeing alias q = parse_query("select count(bar) as the_count from users.events group_by user_email,time(1h);"); - close_query(&q); + close_queries(&q); // test freeing where conditions q = parse_query("select value from t where c == 5 and b == 6;"); - close_query(&q); + close_queries(&q); // test freeing where conditions q = parse_query("select -1 * value from t where c == 5 and b == 6;"); - close_query(&q); + close_queries(&q); // test freeing simple query q = parse_query("select value from t where c == '5';"); - close_query(&q); + close_queries(&q); // test freeing on error q = parse_query("select value from t where c = '5';"); - close_query(&q); + close_queries(&q); q = parse_query("select value from cpu.idle where value > 90 and (time > now() - 1d or value > 80) and time < now() - 1w;"); - close_query(&q); + close_queries(&q); q = parse_query("select value from cpu.idle where value > 90 and (time > now() - 1d or value > 80) and time < now() - 1w last 10;"); - close_query(&q); + close_queries(&q); q = parse_query("select email from users.events where email =~ /gmail\\\\.com/i and time>now()-2d;"); - close_query(&q); + close_queries(&q); q = parse_query("select email from users.events as events where email === /gmail\\\\.com/i and time>now()-2d;"); - close_query(&q); + close_queries(&q); q = parse_query("select email from users.events where email in ('jvshahid@gmail.com')"); - close_query(&q); + close_queries(&q); q = parse_query("drop series foobar"); - close_query(&q); + close_queries(&q); q = parse_query("select * from foobar limit"); - close_query(&q); + close_queries(&q); // test continuous queries q = parse_query("select * from foo into bar;"); - close_query(&q); + close_queries(&q); q = parse_query("list continuous queries;"); - close_query(&q); + close_queries(&q); q = parse_query("drop continuous query 5;"); - close_query(&q); + close_queries(&q); return 0; }