Skip to content

Commit

Permalink
Fix parsing of multiple queries
Browse files Browse the repository at this point in the history
Fix #885
  • Loading branch information
jvshahid committed Sep 24, 2014
1 parent a62e8c8 commit 9b9ac93
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 153 deletions.
6 changes: 3 additions & 3 deletions api/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,8 +1210,8 @@ func (self *HttpServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R
return libhttp.StatusBadRequest, err.Error()
}
for _, query := range q {
if !query.SelectQuery.IsContinuousQuery() {
return libhttp.StatusBadRequest, fmt.Errorf("This query isn't a continuous query. Use 'into'. %s", query.QueryString)
if !query.IsContinuousQuery() {
return libhttp.StatusBadRequest, fmt.Errorf("This query isn't a continuous query. Use 'into'. %s", query.GetQueryString())
}
}
}
Expand All @@ -1238,7 +1238,7 @@ func (self *HttpServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R
for _, queryString := range databaseConfig.ContinuousQueries {
q, _ := parser.ParseQuery(queryString)
for _, query := range q {
err := self.coordinator.CreateContinuousQuery(u, database, query.QueryString)
err := self.coordinator.CreateContinuousQuery(u, database, query.GetQueryString())
if err != nil {
return libhttp.StatusInternalServerError, err.Error()
}
Expand Down
118 changes: 63 additions & 55 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,65 +91,40 @@ func (self *CoordinatorImpl) RunQuery(user common.User, database string, querySt
}

for _, query := range q {
querySpec := parser.NewQuerySpec(user, database, query)

if query.DeleteQuery != nil {
if err := self.clusterConfiguration.CreateCheckpoint(); err != nil {
return err
}

if err := self.runDeleteQuery(querySpec, seriesWriter); err != nil {
return err
}
continue
}

if query.DropQuery != nil {
if err := self.DeleteContinuousQuery(user, database, uint32(query.DropQuery.Id)); err != nil {
return err
}
continue
}

if query.IsListQuery() {
if query.IsListSeriesQuery() {
self.runListSeriesQuery(querySpec, seriesWriter)
} else if query.IsListContinuousQueriesQuery() {
queries, err := self.ListContinuousQueries(user, database)
if err != nil {
return err
}
for _, q := range queries {
if err := seriesWriter.Write(q); err != nil {
return err
}
}
}
continue
}

if query.DropSeriesQuery != nil {
err := self.runDropSeriesQuery(querySpec, seriesWriter)
if err != nil {
return err
}
continue
}

selectQuery := query.SelectQuery

if selectQuery.IsContinuousQuery() {
return self.CreateContinuousQuery(user, database, queryString)
}
if err := self.checkPermission(user, querySpec); err != nil {
err := self.runSingleQuery(user, database, query, seriesWriter)
if err != nil {
return err
}
return self.runQuery(querySpec, seriesWriter)
}
seriesWriter.Close()
return nil
}

func (self *CoordinatorImpl) runSingleQuery(user common.User, db string, q *parser.Query, sw SeriesWriter) error {
querySpec := parser.NewQuerySpec(user, db, q)

switch qt := q.Type(); qt {
// administrative
case parser.DropContinuousQuery:
return self.runDropContinuousQuery(user, db, uint32(q.DropQuery.Id))
case parser.ListContinuousQueries:
return self.runListContinuousQueries(user, db, sw)
case parser.Continuous:
return self.runContinuousQuery(user, db, q.GetQueryString())
case parser.ListSeries:
return self.runListSeriesQuery(querySpec, sw)
// Data queries
case parser.Delete:
return self.runDeleteQuery(querySpec, sw)
case parser.DropSeries:
return self.runDropSeriesQuery(querySpec, sw)
case parser.Select:
return self.runSelectQuery(user, querySpec, sw)
default:
return fmt.Errorf("Can't handle query %s", qt)
}
}

func (self *CoordinatorImpl) checkPermission(user common.User, querySpec *parser.QuerySpec) error {
// if this isn't a regex query do the permission check here
fromClause := querySpec.SelectQuery().GetFromClause()
Expand All @@ -165,10 +140,26 @@ func (self *CoordinatorImpl) checkPermission(user common.User, querySpec *parser
}

// This should only get run for SelectQuery types
func (self *CoordinatorImpl) runQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
func (self *CoordinatorImpl) runSelectQuery(user common.User, querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
if err := self.checkPermission(user, querySpec); err != nil {
return err
}
return self.runQuerySpec(querySpec, seriesWriter)
}

func (self *CoordinatorImpl) runListContinuousQueries(user common.User, db string, sw SeriesWriter) error {
queries, err := self.ListContinuousQueries(user, db)
if err != nil {
return err
}
for _, q := range queries {
if err := sw.Write(q); err != nil {
return err
}
}
return nil
}

func (self *CoordinatorImpl) runListSeriesQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
allSeries := self.clusterConfiguration.MetaStore.GetSeriesForDatabase(querySpec.Database())
matchingSeries := allSeries
Expand Down Expand Up @@ -198,6 +189,10 @@ func (self *CoordinatorImpl) runListSeriesQuery(querySpec *parser.QuerySpec, ser
}

func (self *CoordinatorImpl) runDeleteQuery(querySpec *parser.QuerySpec, seriesWriter SeriesWriter) error {
if err := self.clusterConfiguration.CreateCheckpoint(); err != nil {
return err
}

user := querySpec.User()
db := querySpec.Database()
if ok, err := self.permissions.AuthorizeDeleteQuery(user, db); !ok {
Expand Down Expand Up @@ -753,7 +748,7 @@ func (self *CoordinatorImpl) writeWithoutAssigningId(db string, series []*protoc
return shard.Write(request)
}

func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string, query string) error {
func (self *CoordinatorImpl) runContinuousQuery(user common.User, db string, query string) error {
if ok, err := self.permissions.AuthorizeCreateContinuousQuery(user, db); !ok {
return err
}
Expand All @@ -765,7 +760,7 @@ func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string,
return nil
}

func (self *CoordinatorImpl) DeleteContinuousQuery(user common.User, db string, id uint32) error {
func (self *CoordinatorImpl) runDropContinuousQuery(user common.User, db string, id uint32) error {
if ok, err := self.permissions.AuthorizeDeleteContinuousQuery(user, db); !ok {
return err
}
Expand Down Expand Up @@ -1038,3 +1033,16 @@ func (self *CoordinatorImpl) ConnectToProtobufServers(localRaftName string) erro
func isValidName(name string) bool {
return !strings.Contains(name, "%")
}

// TODO: this is for backward compatability only, remove after 0.8
func (self *CoordinatorImpl) DeleteContinuousQuery(user common.User, db string, id uint32) error {
return self.runDropContinuousQuery(user, db, id)
}

func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string, query string) error {
return self.RunContinuousQuery(user, db, query)
}

func (self *CoordinatorImpl) RunContinuousQuery(user common.User, db string, query string) error {
return self.runContinuousQuery(user, db, query)
}
18 changes: 15 additions & 3 deletions parser/frees.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,24 @@ free_drop_series_query (drop_series_query *q)
}

void
close_query (query *q)
close_queries (queries *q)
{
if (q->error) {
if (q->error) {
free_error(q->error);
}
}

while (q->size > 0) {
query *query = q->qs[--q->size];
close_query(query);
free(query);
}
free(q->qs);
q->qs = NULL;
}

void
close_query (query *q)
{
if (q->select_query) {
free_select_query(q->select_query);
free(q->select_query);
Expand Down
99 changes: 65 additions & 34 deletions parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ type DeleteQuery struct {
}

type Query struct {
QueryString string
SelectQuery *SelectQuery
DeleteQuery *DeleteQuery
ListQuery *ListQuery
DropSeriesQuery *DropSeriesQuery
DropQuery *DropQuery
qType QueryType
}

func (self *IntoClause) GetString() string {
Expand All @@ -98,6 +98,10 @@ func (self *IntoClause) GetString() string {
return buffer.String()
}

func (self *Query) Type() QueryType {
return self.qType
}

func (self *Query) GetQueryString() string {
return self.commonGetQueryString(false)
}
Expand All @@ -107,17 +111,25 @@ func (self *Query) GetQueryStringWithTimeCondition() string {
}

func (self *Query) commonGetQueryString(withTime bool) string {
if self.SelectQuery != nil {
switch self.qType {
case Select, Continuous:
if withTime {
return self.SelectQuery.GetQueryStringWithTimeCondition()
}
return self.SelectQuery.GetQueryString()
} else if self.ListQuery != nil {
return "list series"
} else if self.DeleteQuery != nil {
case Delete:
return self.DeleteQuery.GetQueryString(withTime)
case DropContinuousQuery:
return fmt.Sprintf("drop continuous query %d", self.DropQuery.Id)
case ListSeries:
return "list series"
case ListContinuousQueries:
return "list continuous queries"
case DropSeries:
return "drop series " + self.DropSeriesQuery.tableName
default:
panic(fmt.Errorf("Unknown query type %s", self.qType))
}
return self.QueryString
}

func (self *Query) IsListQuery() bool {
Expand Down Expand Up @@ -262,8 +274,8 @@ func (self *SelectQuery) GetSinglePointQuerySequenceNumber() (int64, error) {
return sequence_number, nil
}

func (self *SelectQuery) IsContinuousQuery() bool {
return self.GetIntoClause() != nil
func (self *Query) IsContinuousQuery() bool {
return self.qType == Continuous
}

func (self *SelectQuery) IsValidContinuousQuery() bool {
Expand Down Expand Up @@ -597,24 +609,7 @@ func ParseSelectQuery(query string) (*SelectQuery, error) {
return selectQuery, nil
}

func ParseQuery(query string) ([]*Query, error) {
queryString := C.CString(query)
defer C.free(unsafe.Pointer(queryString))
q := C.parse_query(queryString)
defer C.close_query(&q)

if q.error != nil {
str := C.GoString(q.error.err)
return nil, &QueryError{
firstLine: int(q.error.first_line),
firstColumn: int(q.error.first_column) - 1,
lastLine: int(q.error.last_line),
lastColumn: int(q.error.last_column) - 1,
errorString: str,
queryString: query,
}
}

func parseSingleQuery(q *C.query) (*Query, error) {
if q.list_series_query != nil {
var value *Value
var err error
Expand All @@ -626,11 +621,11 @@ func ParseQuery(query string) ([]*Query, error) {
return nil, err
}
}
return []*Query{{QueryString: query, ListQuery: &ListQuery{Type: t, value: value}}}, nil
return &Query{ListQuery: &ListQuery{Type: t, value: value}, qType: ListSeries}, nil
}

if q.list_continuous_queries_query != 0 {
return []*Query{{QueryString: query, ListQuery: &ListQuery{Type: ContinuousQueries}}}, nil
return &Query{ListQuery: &ListQuery{Type: ContinuousQueries}, qType: ListContinuousQueries}, nil
}

if q.select_query != nil {
Expand All @@ -639,26 +634,62 @@ func ParseQuery(query string) ([]*Query, error) {
return nil, err
}

return []*Query{{QueryString: query, SelectQuery: selectQuery}}, nil
qType := Select
if selectQuery.IntoClause != nil {
qType = Continuous
}
return &Query{SelectQuery: selectQuery, qType: qType}, nil
} else if q.delete_query != nil {
deleteQuery, err := parseDeleteQuery(q.delete_query)
if err != nil {
return nil, err
}
return []*Query{{QueryString: query, DeleteQuery: deleteQuery}}, nil
return &Query{DeleteQuery: deleteQuery, qType: Delete}, nil
} else if q.drop_series_query != nil {
dropSeriesQuery, err := parseDropSeriesQuery(query, q.drop_series_query)
dropSeriesQuery, err := parseDropSeriesQuery(q.drop_series_query)
if err != nil {
return nil, err
}
return []*Query{{QueryString: query, DropSeriesQuery: dropSeriesQuery}}, nil
return &Query{DropSeriesQuery: dropSeriesQuery, qType: DropSeries}, nil
} else if q.drop_query != nil {
return []*Query{{QueryString: query, DropQuery: &DropQuery{Id: int(q.drop_query.id)}}}, nil
return &Query{DropQuery: &DropQuery{Id: int(q.drop_query.id)}, qType: DropContinuousQuery}, nil
}
return nil, fmt.Errorf("Unknown query type encountered")
}

func parseDropSeriesQuery(queryStirng string, dropSeriesQuery *C.drop_series_query) (*DropSeriesQuery, error) {
func ParseQuery(queryStr string) ([]*Query, error) {
queryString := C.CString(queryStr)
defer C.free(unsafe.Pointer(queryString))
q := C.parse_query(queryString)
defer C.close_queries(&q)

if q.error != nil {
str := C.GoString(q.error.err)
return nil, &QueryError{
firstLine: int(q.error.first_line),
firstColumn: int(q.error.first_column) - 1,
lastLine: int(q.error.last_line),
lastColumn: int(q.error.last_column) - 1,
errorString: str,
queryString: queryStr,
}
}

var queries []*C.query
setupSlice((*reflect.SliceHeader)((unsafe.Pointer(&queries))), unsafe.Pointer(q.qs), q.size)

parsedQueries := make([]*Query, len(queries))
for i, query := range queries {
query, err := parseSingleQuery(query)
if err != nil {
return nil, err
}
parsedQueries[i] = query
}
return parsedQueries, nil
}

func parseDropSeriesQuery(dropSeriesQuery *C.drop_series_query) (*DropSeriesQuery, error) {
name, err := GetValue(dropSeriesQuery.name)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 9b9ac93

Please sign in to comment.