From 889b218a3ea803b11f9ca7f5d2b7068a2cbb648f Mon Sep 17 00:00:00 2001 From: Todd Persen Date: Mon, 23 Dec 2013 17:30:21 -0500 Subject: [PATCH] Add support for continuous queries. --- Makefile.in | 2 +- src/api/http/api.go | 55 ++++++++ src/api/http/api_test.go | 137 ++++++++++++++++++- src/coordinator/cluster_configuration.go | 81 +++++++++++ src/coordinator/command.go | 60 +++++++++ src/coordinator/coordinator.go | 105 ++++++++++++++- src/coordinator/coordinator_test.go | 56 ++++++++ src/coordinator/interface.go | 8 ++ src/coordinator/raft_server.go | 139 +++++++++++++++++++ src/engine/engine.go | 34 ++++- src/integration/server_test.go | 165 +++++++++++++++++++++++ src/parser/frees.c | 9 ++ src/parser/parser.go | 71 +++++++++- src/parser/parser_test.go | 60 +++++++++ src/parser/query.lex | 8 +- src/parser/query.yacc | 51 ++++++- src/parser/query_api.go | 27 ++++ src/parser/query_types.h | 11 ++ src/parser/test_memory_leaks.sh | 10 ++ src/server/server.go | 1 + 20 files changed, 1065 insertions(+), 25 deletions(-) diff --git a/Makefile.in b/Makefile.in index 146ba2ec618..0ee2ae89943 100644 --- a/Makefile.in +++ b/Makefile.in @@ -148,7 +148,7 @@ ifneq ($(only),) GOTEST_OPTS = -gocheck.f $(only) endif ifneq ($(verbose),off) - GOTEST_OPTS += -v -gocheck.v + GOTEST_OPTS += -v -gocheck.v -gocheck.vv endif test: test_dependencies parser diff --git a/src/api/http/api.go b/src/api/http/api.go index e679d9854dd..c57ec751c49 100644 --- a/src/api/http/api.go +++ b/src/api/http/api.go @@ -15,6 +15,7 @@ import ( "path/filepath" "protocol" "regexp" + "strconv" "strings" "time" ) @@ -99,6 +100,11 @@ func (self *HttpServer) Serve(listener net.Listener) { self.registerEndpoint(p, "del", "/db/:db/users/:user", self.deleteDbUser) self.registerEndpoint(p, "post", "/db/:db/users/:user", self.updateDbUser) + // continuous queries management interface + self.registerEndpoint(p, "get", "/db/:db/continuous_queries", self.listDbContinuousQueries) + self.registerEndpoint(p, "post", "/db/:db/continuous_queries", self.createDbContinuousQueries) + self.registerEndpoint(p, "del", "/db/:db/continuous_queries/:id", self.deleteDbContinuousQueries) + // healthcheck self.registerEndpoint(p, "get", "/ping", self.ping) @@ -622,6 +628,10 @@ type User struct { Name string `json:"username"` } +type NewContinuousQuery struct { + Query string `json:"query"` +} + func (self *HttpServer) listClusterAdmins(w libhttp.ResponseWriter, r *libhttp.Request) { self.tryAsClusterAdmin(w, r, func(u common.User) (int, interface{}) { names, err := self.userManager.ListClusterAdmins(u) @@ -906,3 +916,48 @@ func (self *HttpServer) listInterfaces(w libhttp.ResponseWriter, r *libhttp.Requ w.Write(body) } } + +func (self *HttpServer) listDbContinuousQueries(w libhttp.ResponseWriter, r *libhttp.Request) { + db := r.URL.Query().Get(":db") + + self.tryAsDbUserAndClusterAdmin(w, r, func(u common.User) (int, interface{}) { + queries, err := self.coordinator.ListContinuousQueries(u, db) + if err != nil { + return errorToStatusCode(err), err.Error() + } + + return libhttp.StatusOK, queries + }) +} + +func (self *HttpServer) createDbContinuousQueries(w libhttp.ResponseWriter, r *libhttp.Request) { + db := r.URL.Query().Get(":db") + + self.tryAsDbUserAndClusterAdmin(w, r, func(u common.User) (int, interface{}) { + var values interface{} + body, err := ioutil.ReadAll(r.Body) + if err != nil { + return libhttp.StatusInternalServerError, err.Error() + } + json.Unmarshal(body, &values) + query := values.(map[string]interface{})["query"].(string) + fmt.Println(query) + + if err := self.coordinator.CreateContinuousQuery(u, db, query); err != nil { + return errorToStatusCode(err), err.Error() + } + return libhttp.StatusOK, nil + }) +} + +func (self *HttpServer) deleteDbContinuousQueries(w libhttp.ResponseWriter, r *libhttp.Request) { + db := r.URL.Query().Get(":db") + id, _ := strconv.ParseInt(r.URL.Query().Get(":id"), 10, 64) + + self.tryAsDbUserAndClusterAdmin(w, r, func(u common.User) (int, interface{}) { + if err := self.coordinator.DeleteContinuousQuery(u, db, uint32(id)); err != nil { + return errorToStatusCode(err), err.Error() + } + return libhttp.StatusOK, nil + }) +} diff --git a/src/api/http/api_test.go b/src/api/http/api_test.go index 2f1d1b7b9b1..a200476a9fe 100644 --- a/src/api/http/api_test.go +++ b/src/api/http/api_test.go @@ -97,10 +97,11 @@ func (self *MockEngine) RunQuery(_ common.User, _ string, query string, localOnl type MockCoordinator struct { coordinator.Coordinator - series []*protocol.Series - deleteQueries []*parser.DeleteQuery - db string - droppedDb string + series []*protocol.Series + continuousQueries map[string][]*coordinator.ContinuousQuery + deleteQueries []*parser.DeleteQuery + db string + droppedDb string } func (self *MockCoordinator) WriteSeriesData(_ common.User, db string, series *protocol.Series) error { @@ -127,6 +128,42 @@ func (self *MockCoordinator) DropDatabase(_ common.User, db string) error { return nil } +func (self *MockCoordinator) ListContinuousQueries(_ common.User, db string) ([]*protocol.Series, error) { + points := []*protocol.Point{} + + for _, query := range self.continuousQueries[db] { + queryId := int64(query.Id) + queryString := query.Query + points = append(points, &protocol.Point{ + Values: []*protocol.FieldValue{ + &protocol.FieldValue{Int64Value: &queryId}, + &protocol.FieldValue{StringValue: &queryString}, + }, + Timestamp: nil, + SequenceNumber: nil, + }) + } + + seriesName := "continuous queries" + series := []*protocol.Series{&protocol.Series{ + Name: &seriesName, + Fields: []string{"id", "query"}, + Points: points, + }} + return series, nil +} + +func (self *MockCoordinator) CreateContinuousQuery(_ common.User, db string, query string) error { + self.continuousQueries[db] = append(self.continuousQueries[db], &coordinator.ContinuousQuery{2, query}) + return nil +} + +func (self *MockCoordinator) DeleteContinuousQuery(_ common.User, db string, id uint32) error { + length := len(self.continuousQueries[db]) + _, self.continuousQueries[db] = self.continuousQueries[db][length-1], self.continuousQueries[db][:length-1] + return nil +} + func (self *ApiSuite) formatUrl(path string, args ...interface{}) string { path = fmt.Sprintf(path, args...) port := self.listener.Addr().(*net.TCPAddr).Port @@ -134,7 +171,13 @@ func (self *ApiSuite) formatUrl(path string, args ...interface{}) string { } func (self *ApiSuite) SetUpSuite(c *C) { - self.coordinator = &MockCoordinator{} + self.coordinator = &MockCoordinator{ + continuousQueries: map[string][]*coordinator.ContinuousQuery{ + "db1": []*coordinator.ContinuousQuery{ + &coordinator.ContinuousQuery{1, "select * from foo into bar;"}, + }, + }, + } self.manager = &MockUserManager{ clusterAdmins: []string{"root"}, dbUsers: map[string][]string{"db1": []string{"db_user1"}}, @@ -733,3 +776,87 @@ func (self *ApiSuite) TestBasicAuthentication(c *C) { c.Assert(err, IsNil) c.Assert(users, DeepEquals, []*coordinator.Database{&coordinator.Database{"db1", 1}, &coordinator.Database{"db2", 1}}) } + +func (self *ApiSuite) TestContinuousQueryOperations(c *C) { + // verify current continuous query index + url := self.formatUrl("/db/db1/continuous_queries?u=root&p=root") + resp, err := libhttp.Get(url) + c.Assert(err, IsNil) + c.Assert(resp.Header.Get("content-type"), Equals, "application/json") + body, err := ioutil.ReadAll(resp.Body) + c.Assert(err, IsNil) + series := []*protocol.Series{} + err = json.Unmarshal(body, &series) + c.Assert(err, IsNil) + c.Assert(series, HasLen, 1) + c.Assert(series[0].Points, HasLen, 1) + c.Assert(series[0].Points[0].Values, HasLen, 2) + + c.Assert(*series[0].Name, Equals, "continuous queries") + c.Assert(*series[0].Points[0].Values[0].Int64Value, Equals, int64(1)) + c.Assert(*series[0].Points[0].Values[1].StringValue, Equals, "select * from foo into bar;") + + resp.Body.Close() + + // add a new continuous query + data := `{"query": "select * from quu into qux;"}` + url = self.formatUrl("/db/db1/continuous_queries?u=root&p=root") + resp, err = libhttp.Post(url, "application/json", bytes.NewBufferString(data)) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, libhttp.StatusOK) + resp.Body.Close() + + // verify updated continuous query index + url = self.formatUrl("/db/db1/continuous_queries?u=root&p=root") + resp, err = libhttp.Get(url) + c.Assert(err, IsNil) + c.Assert(resp.Header.Get("content-type"), Equals, "application/json") + body, err = ioutil.ReadAll(resp.Body) + c.Assert(err, IsNil) + series = []*protocol.Series{} + err = json.Unmarshal(body, &series) + c.Assert(err, IsNil) + + c.Assert(series, HasLen, 1) + c.Assert(series[0].Points, HasLen, 2) + c.Assert(series[0].Points[0].Values, HasLen, 2) + c.Assert(series[0].Points[1].Values, HasLen, 2) + + c.Assert(*series[0].Name, Equals, "continuous queries") + c.Assert(*series[0].Points[0].Values[0].Int64Value, Equals, int64(1)) + c.Assert(*series[0].Points[0].Values[1].StringValue, Equals, "select * from foo into bar;") + c.Assert(*series[0].Points[1].Values[0].Int64Value, Equals, int64(2)) + c.Assert(*series[0].Points[1].Values[1].StringValue, Equals, "select * from quu into qux;") + + resp.Body.Close() + + // delete the newly-created query + url = self.formatUrl("/db/db1/continuous_queries/2?u=root&p=root") + req, err := libhttp.NewRequest("DELETE", url, nil) + c.Assert(err, IsNil) + resp, err = libhttp.DefaultClient.Do(req) + c.Assert(err, IsNil) + _, err = ioutil.ReadAll(resp.Body) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, libhttp.StatusOK) + resp.Body.Close() + + // verify updated continuous query index + url = self.formatUrl("/db/db1/continuous_queries?u=root&p=root") + resp, err = libhttp.Get(url) + c.Assert(err, IsNil) + c.Assert(resp.Header.Get("content-type"), Equals, "application/json") + body, err = ioutil.ReadAll(resp.Body) + c.Assert(err, IsNil) + series = []*protocol.Series{} + err = json.Unmarshal(body, &series) + c.Assert(err, IsNil) + c.Assert(series, HasLen, 1) + c.Assert(series[0].Points, HasLen, 1) + c.Assert(series[0].Points[0].Values, HasLen, 2) + + c.Assert(*series[0].Name, Equals, "continuous queries") + c.Assert(*series[0].Points[0].Values[0].Int64Value, Equals, int64(1)) + c.Assert(*series[0].Points[0].Values[1].StringValue, Equals, "select * from foo into bar;") + resp.Body.Close() +} diff --git a/src/coordinator/cluster_configuration.go b/src/coordinator/cluster_configuration.go index 9d24d00d30d..1307760ca19 100644 --- a/src/coordinator/cluster_configuration.go +++ b/src/coordinator/cluster_configuration.go @@ -8,8 +8,10 @@ import ( "encoding/gob" "errors" "fmt" + "parser" "sync" "sync/atomic" + "time" ) /* @@ -29,6 +31,10 @@ type ClusterConfiguration struct { dbUsers map[string]map[string]*dbUser servers []*ClusterServer serversLock sync.RWMutex + continuousQueries map[string][]*ContinuousQuery + continuousQueriesLock sync.RWMutex + parsedContinuousQueries map[string]map[uint32]*parser.SelectQuery + continuousQueryTimestamp time.Time hasRunningServers bool localServerId uint32 ClusterVersion uint32 @@ -36,6 +42,11 @@ type ClusterConfiguration struct { addedLocalServerWait chan bool } +type ContinuousQuery struct { + Id uint32 + Query string +} + type Database struct { Name string `json:"name"` ReplicationFactor uint8 `json:"replicationFactor"` @@ -46,6 +57,8 @@ func NewClusterConfiguration(config *configuration.Configuration) *ClusterConfig databaseReplicationFactors: make(map[string]uint8), clusterAdmins: make(map[string]*clusterAdmin), dbUsers: make(map[string]map[string]*dbUser), + continuousQueries: make(map[string][]*ContinuousQuery), + parsedContinuousQueries: make(map[string]map[uint32]*parser.SelectQuery), servers: make([]*ClusterServer, 0), config: config, addedLocalServerWait: make(chan bool, 1), @@ -336,6 +349,74 @@ func (self *ClusterConfiguration) DropDatabase(name string) error { return nil } +func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) error { + self.continuousQueriesLock.Lock() + defer self.continuousQueriesLock.Unlock() + + if self.continuousQueries == nil { + self.continuousQueries = map[string][]*ContinuousQuery{} + } + + if self.parsedContinuousQueries == nil { + self.parsedContinuousQueries = map[string]map[uint32]*parser.SelectQuery{} + } + + maxId := uint32(0) + for _, query := range self.continuousQueries[db] { + if query.Id > maxId { + maxId = query.Id + } + } + + selectQuery, err := parser.ParseSelectQuery(query) + if err != nil { + return fmt.Errorf("Failed to parse continuous query: %s", query) + } + + queryId := maxId + 1 + if self.parsedContinuousQueries[db] == nil { + self.parsedContinuousQueries[db] = map[uint32]*parser.SelectQuery{queryId: selectQuery} + } else { + self.parsedContinuousQueries[db][queryId] = selectQuery + } + self.continuousQueries[db] = append(self.continuousQueries[db], &ContinuousQuery{queryId, query}) + + return nil +} + +func (self *ClusterConfiguration) SetContinuousQueryTimestamp(timestamp time.Time) error { + self.continuousQueriesLock.Lock() + defer self.continuousQueriesLock.Unlock() + + self.continuousQueryTimestamp = timestamp + + return nil +} + +func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) error { + self.continuousQueriesLock.Lock() + defer self.continuousQueriesLock.Unlock() + + for i, query := range self.continuousQueries[db] { + if query.Id == id { + q := self.continuousQueries[db] + q[len(q)-1], q[i], q = nil, q[len(q)-1], q[:len(q)-1] + self.continuousQueries[db] = q + delete(self.parsedContinuousQueries[db], id) + break + } + } + + return nil +} + +func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQuery { + self.continuousQueriesLock.Lock() + defer self.continuousQueriesLock.Unlock() + + return self.continuousQueries[db] +} + func (self *ClusterConfiguration) GetDbUsers(db string) (names []string) { self.usersLock.RLock() defer self.usersLock.RUnlock() diff --git a/src/coordinator/command.go b/src/coordinator/command.go index 25606757a41..e1b28f72000 100644 --- a/src/coordinator/command.go +++ b/src/coordinator/command.go @@ -3,6 +3,7 @@ package coordinator import ( log "code.google.com/p/log4go" "github.com/goraft/raft" + "time" ) var internalRaftCommands map[string]raft.Command @@ -17,11 +18,70 @@ func init() { &SaveDbUserCommand{}, &SaveClusterAdminCommand{}, &ChangeDbUserPassword{}, + &CreateContinuousQueryCommand{}, + &DeleteContinuousQueryCommand{}, + &SetContinuousQueryTimestampCommand{}, } { internalRaftCommands[command.CommandName()] = command } } +type SetContinuousQueryTimestampCommand struct { + Timestamp time.Time `json:"timestamp"` +} + +func NewSetContinuousQueryTimestampCommand(timestamp time.Time) *SetContinuousQueryTimestampCommand { + return &SetContinuousQueryTimestampCommand{timestamp} +} + +func (c *SetContinuousQueryTimestampCommand) CommandName() string { + return "set_cq_ts" +} + +func (c *SetContinuousQueryTimestampCommand) Apply(server raft.Server) (interface{}, error) { + config := server.Context().(*ClusterConfiguration) + err := config.SetContinuousQueryTimestamp(c.Timestamp) + return nil, err +} + +type CreateContinuousQueryCommand struct { + Database string `json:"database"` + Query string `json:"query"` +} + +func NewCreateContinuousQueryCommand(database string, query string) *CreateContinuousQueryCommand { + return &CreateContinuousQueryCommand{database, query} +} + +func (c *CreateContinuousQueryCommand) CommandName() string { + return "create_cq" +} + +func (c *CreateContinuousQueryCommand) Apply(server raft.Server) (interface{}, error) { + config := server.Context().(*ClusterConfiguration) + err := config.CreateContinuousQuery(c.Database, c.Query) + return nil, err +} + +type DeleteContinuousQueryCommand struct { + Database string `json:"database"` + Id uint32 `json:"id"` +} + +func NewDeleteContinuousQueryCommand(database string, id uint32) *DeleteContinuousQueryCommand { + return &DeleteContinuousQueryCommand{database, id} +} + +func (c *DeleteContinuousQueryCommand) CommandName() string { + return "delete_cq" +} + +func (c *DeleteContinuousQueryCommand) Apply(server raft.Server) (interface{}, error) { + config := server.Context().(*ClusterConfiguration) + err := config.DeleteContinuousQuery(c.Database, c.Id) + return nil, err +} + type DropDatabaseCommand struct { Name string `json:"name"` } diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index c648cc0a3f3..090ac3f2143 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -11,6 +11,7 @@ import ( "protocol" "regexp" "runtime" + "strings" "sync" "sync/atomic" "time" @@ -86,10 +87,10 @@ func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query return self.datastore.ExecuteQuery(user, db, query, yield, nil) } servers, replicationFactor := self.clusterConfiguration.GetServersToMakeQueryTo(&db) - queryString := query.GetQueryString() id := atomic.AddUint32(&self.requestId, uint32(1)) userName := user.GetName() responseChannels := make([]chan *protocol.Response, 0, len(servers)+1) + queryString := query.GetQueryString() var localServerToQuery *serverToQuery for _, server := range servers { if server.server.Id == self.clusterConfiguration.localServerId { @@ -587,7 +588,6 @@ func (self *CoordinatorImpl) handleReplayRequest(r *protocol.Request, replicatio err = self.datastore.DeleteSeriesData(*r.Database, query[0].DeleteQuery) } } - func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series *protocol.Series) error { if !user.HasWriteAccess(db) { return common.NewAuthorizationError("Insufficient permission to write to %s", db) @@ -596,6 +596,51 @@ func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series return fmt.Errorf("Can't write series with zero points.") } + err := self.CommitSeriesData(db, series) + + self.ProcessContinuousQueries(db, series) + + return err +} + +func (self *CoordinatorImpl) ProcessContinuousQueries(db string, series *protocol.Series) { + if self.clusterConfiguration.parsedContinuousQueries != nil { + incomingSeriesName := *series.Name + for _, query := range self.clusterConfiguration.parsedContinuousQueries[db] { + groupByClause := query.GetGroupByClause() + if groupByClause.Elems != nil { + continue + } + + fromClause := query.GetFromClause() + intoClause := query.GetIntoClause() + targetName := intoClause.Target.Name + + interpolatedTargetName := strings.Replace(targetName, ":series_name", incomingSeriesName, -1) + + for _, table := range fromClause.Names { + tableValue := table.Name + if regex, ok := tableValue.GetCompiledRegex(); ok { + if regex.MatchString(incomingSeriesName) { + series.Name = &interpolatedTargetName + if e := self.CommitSeriesData(db, series); e != nil { + log.Error("Couldn't write data for continuous query: ", e) + } + } + } else { + if tableValue.Name == incomingSeriesName { + series.Name = &interpolatedTargetName + if e := self.CommitSeriesData(db, series); e != nil { + log.Error("Couldn't write data for continuous query: ", e) + } + } + } + } + } + } +} + +func (self *CoordinatorImpl) CommitSeriesData(db string, series *protocol.Series) error { // break the series object into separate ones based on their ring location // if times server assigned, all the points will go to the same place @@ -655,6 +700,7 @@ func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series return err } } + return nil } @@ -860,6 +906,61 @@ func (self *CoordinatorImpl) proxyWrite(clusterServer *ClusterServer, request *p } } +func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string, query string) error { + if !user.IsClusterAdmin() && !user.IsDbAdmin(db) { + return common.NewAuthorizationError("Insufficient permission to create continuous query") + } + + err := self.raftServer.CreateContinuousQuery(db, query) + if err != nil { + return err + } + return nil +} + +func (self *CoordinatorImpl) DeleteContinuousQuery(user common.User, db string, id uint32) error { + if !user.IsClusterAdmin() && !user.IsDbAdmin(db) { + return common.NewAuthorizationError("Insufficient permission to delete continuous query") + } + + err := self.raftServer.DeleteContinuousQuery(db, id) + if err != nil { + return err + } + return nil +} + +func (self *CoordinatorImpl) ListContinuousQueries(user common.User, db string) ([]*protocol.Series, error) { + if !user.IsClusterAdmin() && !user.IsDbAdmin(db) { + return nil, common.NewAuthorizationError("Insufficient permission to list continuous queries") + } + + queries := self.clusterConfiguration.GetContinuousQueries(db) + points := []*protocol.Point{} + + for _, query := range queries { + queryId := int64(query.Id) + queryString := query.Query + timestamp := time.Now().Unix() + sequenceNumber := uint64(1) + points = append(points, &protocol.Point{ + Values: []*protocol.FieldValue{ + &protocol.FieldValue{Int64Value: &queryId}, + &protocol.FieldValue{StringValue: &queryString}, + }, + Timestamp: ×tamp, + SequenceNumber: &sequenceNumber, + }) + } + seriesName := "continuous queries" + series := []*protocol.Series{&protocol.Series{ + Name: &seriesName, + Fields: []string{"id", "query"}, + Points: points, + }} + return series, nil +} + func (self *CoordinatorImpl) CreateDatabase(user common.User, db string, replicationFactor uint8) error { if !user.IsClusterAdmin() { return common.NewAuthorizationError("Insufficient permission to create database") diff --git a/src/coordinator/coordinator_test.go b/src/coordinator/coordinator_test.go index 34ec37df3ff..7aa8b5a3f31 100644 --- a/src/coordinator/coordinator_test.go +++ b/src/coordinator/coordinator_test.go @@ -369,6 +369,62 @@ func (self *CoordinatorSuite) TestAdminOperations(c *C) { c.Assert(coordinator.DeleteClusterAdminUser(root, "another_cluster_admin"), IsNil) } +func (self *CoordinatorSuite) TestContinuousQueryOperations(c *C) { + servers := startAndVerifyCluster(3, c) + defer clean(servers...) + + coordinator := NewCoordinatorImpl(nil, servers[0], servers[0].clusterConfig) + + time.Sleep(REPLICATION_LAG) + + // create users + root, _ := coordinator.AuthenticateClusterAdmin("root", "root") + + coordinator.CreateDbUser(root, "db1", "db_admin") + coordinator.ChangeDbUserPassword(root, "db1", "db_admin", "db_pass") + coordinator.SetDbAdmin(root, "db1", "db_admin", true) + dbAdmin, _ := coordinator.AuthenticateDbUser("db1", "db_admin", "db_pass") + + coordinator.CreateDbUser(root, "db1", "db_user") + coordinator.ChangeDbUserPassword(root, "db1", "db_user", "db_pass") + dbUser, _ := coordinator.AuthenticateDbUser("db1", "db_user", "db_pass") + + allowedUsers := []*User{&root, &dbAdmin} + disallowedUsers := []*User{&dbUser} + + // cluster admins and db admins should be able to do everything + for _, user := range allowedUsers { + results, err := coordinator.ListContinuousQueries(*user, "db1") + c.Assert(err, IsNil) + c.Assert(results[0].Points, HasLen, 0) + + c.Assert(coordinator.CreateContinuousQuery(*user, "db1", "select * from foo into bar;"), IsNil) + + results, err = coordinator.ListContinuousQueries(*user, "db1") + c.Assert(err, IsNil) + c.Assert(results[0].Points, HasLen, 1) + c.Assert(*results[0].Points[0].Values[0].Int64Value, Equals, int64(1)) + c.Assert(*results[0].Points[0].Values[1].StringValue, Equals, "select * from foo into bar;") + + c.Assert(coordinator.DeleteContinuousQuery(*user, "db1", 1), IsNil) + + results, err = coordinator.ListContinuousQueries(*user, "db1") + c.Assert(err, IsNil) + c.Assert(results[0].Points, HasLen, 0) + } + + // regular database users shouldn't be able to do anything + for _, user := range disallowedUsers { + _, err := coordinator.ListContinuousQueries(*user, "db1") + c.Assert(err, NotNil) + c.Assert(coordinator.CreateContinuousQuery(*user, "db1", "select * from foo into bar;"), NotNil) + c.Assert(coordinator.DeleteContinuousQuery(*user, "db1", 1), NotNil) + } + + coordinator.DeleteDbUser(root, "db1", "db_admin") + coordinator.DeleteDbUser(root, "db1", "db_user") +} + func (self *CoordinatorSuite) TestDbAdminOperations(c *C) { servers := startAndVerifyCluster(3, c) defer clean(servers...) diff --git a/src/coordinator/interface.go b/src/coordinator/interface.go index b63ba65744a..40fa4be9c02 100644 --- a/src/coordinator/interface.go +++ b/src/coordinator/interface.go @@ -28,6 +28,9 @@ type Coordinator interface { ReplicateDelete(request *protocol.Request) error ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, lastSeenSequenceNumber *uint64) GetLastSequenceNumber(replicationFactor uint8, ownerServerId, originatingServerId uint32) (uint64, error) + DeleteContinuousQuery(user common.User, db string, id uint32) error + CreateContinuousQuery(user common.User, db string, query string) error + ListContinuousQueries(user common.User, db string) ([]*protocol.Series, error) } type UserManager interface { @@ -61,6 +64,8 @@ type UserManager interface { type ClusterConsensus interface { CreateDatabase(name string, replicationFactor uint8) error DropDatabase(name string) error + CreateContinuousQuery(db string, query string) error + DeleteContinuousQuery(db string, id uint32) error SaveClusterAdminUser(u *clusterAdmin) error SaveDbUser(user *dbUser) error ChangeDbUserPassword(db, username string, hash []byte) error @@ -77,10 +82,13 @@ type ClusterConsensus interface { delete all of the data that they no longer have to keep from the ring */ ActivateServer(server *ClusterServer) error + // Efficient method to have a potential server take the place of a running (or downed) // server. The replacement must have a state of "Potential" for this to work. ReplaceServer(oldServer *ClusterServer, replacement *ClusterServer) error + AssignEngineAndCoordinator(engine queryRunner, coordinator *CoordinatorImpl) error + // When a cluster is turned on for the first time. CreateRootUser() error } diff --git a/src/coordinator/raft_server.go b/src/coordinator/raft_server.go index 26daf97682c..ff5a4b3965c 100644 --- a/src/coordinator/raft_server.go +++ b/src/coordinator/raft_server.go @@ -16,6 +16,7 @@ import ( "net" "net/http" "os" + "parser" "path/filepath" "protocol" "strings" @@ -42,6 +43,13 @@ type RaftServer struct { listener net.Listener closing bool config *configuration.Configuration + notLeader chan bool + engine queryRunner + coordinator *CoordinatorImpl +} + +type queryRunner interface { + RunQuery(user common.User, database string, query string, localOnly bool, yield func(*protocol.Series) error) error } var registeredCommands bool @@ -62,6 +70,7 @@ func NewRaftServer(config *configuration.Configuration, clusterConfig *ClusterCo port: config.RaftServerPort, path: config.RaftDir, clusterConfig: clusterConfig, + notLeader: make(chan bool, 1), router: mux.NewRouter(), config: config, } @@ -187,6 +196,43 @@ func (s *RaftServer) CreateRootUser() error { return s.SaveClusterAdminUser(u) } +func (s *RaftServer) SetContinuousQueryTimestamp(timestamp time.Time) error { + command := NewSetContinuousQueryTimestampCommand(timestamp) + _, err := s.doOrProxyCommand(command, "set_cq_ts") + return err +} + +func (s *RaftServer) CreateContinuousQuery(db string, query string) error { + // if there are already-running queries, we need to initiate a backfill + if !s.clusterConfig.continuousQueryTimestamp.IsZero() { + selectQuery, err := parser.ParseSelectQuery(query) + if err != nil { + return fmt.Errorf("Failed to parse continuous query: %s", query) + } + + duration, err := selectQuery.GetGroupByClause().GetGroupByTime() + if err != nil { + return fmt.Errorf("Couldn't get group by time for continuous query: %s", err) + } + + if duration != nil { + zeroTime := time.Time{} + currentBoundary := time.Now().Truncate(*duration) + go s.runContinuousQuery(db, selectQuery, zeroTime, currentBoundary) + } + } + + command := NewCreateContinuousQueryCommand(db, query) + _, err := s.doOrProxyCommand(command, "create_cq") + return err +} + +func (s *RaftServer) DeleteContinuousQuery(db string, id uint32) error { + command := NewDeleteContinuousQueryCommand(db, id) + _, err := s.doOrProxyCommand(command, "delete_cq") + return err +} + func (s *RaftServer) ActivateServer(server *ClusterServer) error { return errors.New("not implemented") } @@ -203,6 +249,12 @@ func (s *RaftServer) ReplaceServer(oldServer *ClusterServer, replacement *Cluste return errors.New("not implemented") } +func (s *RaftServer) AssignEngineAndCoordinator(engine queryRunner, coordinator *CoordinatorImpl) error { + s.engine = engine + s.coordinator = coordinator + return nil +} + func (s *RaftServer) connectionString() string { return fmt.Sprintf("http://%s:%d", s.host, s.port) } @@ -255,6 +307,8 @@ func (s *RaftServer) startRaft() error { s.raftServer.LoadSnapshot() // ignore errors + s.raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventHandler) + transporter.Install(s.raftServer, s) s.raftServer.Start() @@ -307,9 +361,93 @@ func (s *RaftServer) startRaft() error { log.Warn("Couldn't join any of the seeds, sleeping and retrying...") time.Sleep(100 * time.Millisecond) } + return nil } +func (s *RaftServer) raftEventHandler(e raft.Event) { + if e.Value() == "leader" { + log.Info("(raft:%s) Selected as leader. Starting leader loop.", s.raftServer.Name()) + go s.raftLeaderLoop(time.NewTicker(1 * time.Second)) + } + + if e.PrevValue() == "leader" { + log.Info("(raft:%s) Demoted from leader. Ending leader loop.", s.raftServer.Name()) + s.notLeader <- true + } +} + +func (s *RaftServer) raftLeaderLoop(loopTimer *time.Ticker) { + for { + select { + case <-loopTimer.C: + log.Debug("(raft:%s) Executing leader loop.", s.raftServer.Name()) + s.checkContinuousQueries() + break + case <-s.notLeader: + log.Debug("(raft:%s) Exiting leader loop.", s.raftServer.Name()) + return + } + } +} + +func (s *RaftServer) checkContinuousQueries() { + if s.clusterConfig.continuousQueries == nil || len(s.clusterConfig.continuousQueries) == 0 { + return + } + + runTime := time.Now() + queriesDidRun := false + + for db, queries := range s.clusterConfig.parsedContinuousQueries { + for _, query := range queries { + groupByClause := query.GetGroupByClause() + + // if there's no group by clause, it's handled as a fanout query + if groupByClause.Elems == nil { + continue + } + + duration, err := query.GetGroupByClause().GetGroupByTime() + if err != nil { + log.Error("Couldn't get group by time for continuous query:", err) + continue + } + + currentBoundary := runTime.Truncate(*duration) + lastBoundary := s.clusterConfig.continuousQueryTimestamp.Truncate(*duration) + + if currentBoundary.After(s.clusterConfig.continuousQueryTimestamp) { + s.runContinuousQuery(db, query, lastBoundary, currentBoundary) + queriesDidRun = true + } + } + } + + if queriesDidRun { + s.clusterConfig.continuousQueryTimestamp = runTime + s.SetContinuousQueryTimestamp(runTime) + } +} + +func (s *RaftServer) runContinuousQuery(db string, query *parser.SelectQuery, start time.Time, end time.Time) { + clusterAdmin := s.clusterConfig.clusterAdmins["root"] + intoClause := query.GetIntoClause() + targetName := intoClause.Target.Name + sequenceNumber := uint64(1) + queryString := query.GetQueryStringForContinuousQuery(start, end) + + s.engine.RunQuery(clusterAdmin, db, queryString, false, func(series *protocol.Series) error { + interpolatedTargetName := strings.Replace(targetName, ":series_name", *series.Name, -1) + series.Name = &interpolatedTargetName + for _, point := range series.Points { + point.SequenceNumber = &sequenceNumber + } + + return s.coordinator.WriteSeriesData(clusterAdmin, db, series) + }) +} + func (s *RaftServer) ListenAndServe() error { l, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port)) if err != nil { @@ -355,6 +493,7 @@ func (self *RaftServer) Close() { self.closing = true self.raftServer.Stop() self.listener.Close() + self.notLeader <- true } } diff --git a/src/engine/engine.go b/src/engine/engine.go index 4a3f571eb1b..e6141cf4b88 100644 --- a/src/engine/engine.go +++ b/src/engine/engine.go @@ -36,15 +36,34 @@ func (self *QueryEngine) RunQuery(user common.User, database string, queryString continue } - if query.IsListQuery() { - series, err := self.coordinator.ListSeries(user, database) - if err != nil { + if query.DropQuery != nil { + if err := self.coordinator.DeleteContinuousQuery(user, database, uint32(query.DropQuery.Id)); err != nil { return err } - for _, s := range series { - if err := yield(s); err != nil { + continue + } + + if query.IsListQuery() { + if query.IsListSeriesQuery() { + series, err := self.coordinator.ListSeries(user, database) + if err != nil { return err } + for _, s := range series { + if err := yield(s); err != nil { + return err + } + } + } else if query.IsListContinuousQueriesQuery() { + queries, err := self.coordinator.ListContinuousQueries(user, database) + if err != nil { + return err + } + for _, q := range queries { + if err := yield(q); err != nil { + return err + } + } } continue } @@ -58,6 +77,11 @@ func (self *QueryEngine) RunQuery(user common.User, database string, queryString } selectQuery := query.SelectQuery + + if selectQuery.IsContinuousQuery() { + return self.coordinator.CreateContinuousQuery(user, database, queryString) + } + if isAggregateQuery(selectQuery) { return self.executeCountQueryWithGroupBy(user, database, selectQuery, localOnly, yield) } else if containsArithmeticOperators(selectQuery) { diff --git a/src/integration/server_test.go b/src/integration/server_test.go index 447c03b33c9..8330bd4e8fa 100644 --- a/src/integration/server_test.go +++ b/src/integration/server_test.go @@ -152,6 +152,27 @@ func (self *ServerProcess) Query(database, query string, onlyLocal bool, c *C) * return ResultsToSeriesCollection(js) } +func (self *ServerProcess) QueryAsRoot(database, query string, onlyLocal bool, c *C) *SeriesCollection { + encodedQuery := url.QueryEscape(query) + fullUrl := fmt.Sprintf("http://localhost:%d/db/%s/series?u=root&p=root&q=%s", self.apiPort, database, encodedQuery) + if onlyLocal { + fullUrl = fullUrl + "&force_local=true" + } + resp, err := http.Get(fullUrl) + c.Assert(err, IsNil) + defer resp.Body.Close() + c.Assert(resp.StatusCode, Equals, http.StatusOK) + body, err := ioutil.ReadAll(resp.Body) + c.Assert(err, IsNil) + var js []interface{} + err = json.Unmarshal(body, &js) + if err != nil { + fmt.Println("NOT JSON: ", string(body)) + } + c.Assert(err, IsNil) + return ResultsToSeriesCollection(js) +} + func (self *ServerProcess) Post(url, data string, c *C) *http.Response { return self.Request("POST", url, data, c) } @@ -177,9 +198,11 @@ func (self *ServerSuite) SetUpSuite(c *C) { self.serverProcesses[0].Post("/db?u=root&p=root", "{\"name\":\"full_rep\", \"replicationFactor\":3}", c) self.serverProcesses[0].Post("/db?u=root&p=root", "{\"name\":\"test_rep\", \"replicationFactor\":2}", c) self.serverProcesses[0].Post("/db?u=root&p=root", "{\"name\":\"single_rep\", \"replicationFactor\":1}", c) + self.serverProcesses[0].Post("/db?u=root&p=root", "{\"name\":\"test_cq\", \"replicationFactor\":3}", c) self.serverProcesses[0].Post("/db/full_rep/users?u=root&p=root", "{\"name\":\"paul\", \"password\":\"pass\", \"isAdmin\": true}", c) self.serverProcesses[0].Post("/db/test_rep/users?u=root&p=root", "{\"name\":\"paul\", \"password\":\"pass\", \"isAdmin\": true}", c) self.serverProcesses[0].Post("/db/single_rep/users?u=root&p=root", "{\"name\":\"paul\", \"password\":\"pass\", \"isAdmin\": true}", c) + self.serverProcesses[0].Post("/db/test_cq/users?u=root&p=root", "{\"name\":\"paul\", \"password\":\"pass\", \"isAdmin\": true}", c) time.Sleep(300 * time.Millisecond) } @@ -472,3 +495,145 @@ func (self *ServerSuite) TestSelectFromRegexInCluster(c *C) { c.Assert(series.GetValueForPointAndColumn(0, "blah", c), Equals, true) } } + +func (self *ServerSuite) TestContinuousQueryManagement(c *C) { + collection := self.serverProcesses[0].QueryAsRoot("test_cq", "list continuous queries;", false, c) + series := collection.GetSeries("continuous queries", c) + c.Assert(series.Points, HasLen, 0) + + self.serverProcesses[0].QueryAsRoot("test_cq", "select * from foo into bar;", false, c) + + collection = self.serverProcesses[0].QueryAsRoot("test_cq", "list continuous queries;", false, c) + series = collection.GetSeries("continuous queries", c) + c.Assert(series.Points, HasLen, 1) + c.Assert(series.GetValueForPointAndColumn(0, "id", c), Equals, float64(1)) + c.Assert(series.GetValueForPointAndColumn(0, "query", c), Equals, "select * from foo into bar;") + + self.serverProcesses[0].QueryAsRoot("test_cq", "select * from quu into qux;", false, c) + collection = self.serverProcesses[0].QueryAsRoot("test_cq", "list continuous queries;", false, c) + series = collection.GetSeries("continuous queries", c) + c.Assert(series.Points, HasLen, 2) + c.Assert(series.GetValueForPointAndColumn(0, "id", c), Equals, float64(1)) + c.Assert(series.GetValueForPointAndColumn(0, "query", c), Equals, "select * from foo into bar;") + c.Assert(series.GetValueForPointAndColumn(1, "id", c), Equals, float64(2)) + c.Assert(series.GetValueForPointAndColumn(1, "query", c), Equals, "select * from quu into qux;") + + self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 1;", false, c) + + collection = self.serverProcesses[0].QueryAsRoot("test_cq", "list continuous queries;", false, c) + series = collection.GetSeries("continuous queries", c) + c.Assert(series.Points, HasLen, 1) + c.Assert(series.GetValueForPointAndColumn(0, "id", c), Equals, float64(2)) + c.Assert(series.GetValueForPointAndColumn(0, "query", c), Equals, "select * from quu into qux;") + + self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 2;", false, c) +} + +func (self *ServerSuite) TestContinuousQueryFanoutOperations(c *C) { + self.serverProcesses[0].QueryAsRoot("test_cq", "select * from s1 into d1;", false, c) + self.serverProcesses[0].QueryAsRoot("test_cq", "select * from s2 into d2;", false, c) + self.serverProcesses[0].QueryAsRoot("test_cq", "select * from /s\\d/ into d3;", false, c) + self.serverProcesses[0].QueryAsRoot("test_cq", "select * from silly_name into :series_name.foo;", false, c) + collection := self.serverProcesses[0].QueryAsRoot("test_cq", "list continuous queries;", false, c) + series := collection.GetSeries("continuous queries", c) + c.Assert(series.Points, HasLen, 4) + + data := `[ + {"name": "s1", "columns": ["c1", "c2"], "points": [[1, "a"], [2, "b"]]}, + {"name": "s2", "columns": ["c3"], "points": [[3]]}, + {"name": "silly_name", "columns": ["c4", "c5"], "points": [[4,5]]} + ]` + + self.serverProcesses[0].Post("/db/test_cq/series?u=paul&p=pass", data, c) + + collection = self.serverProcesses[0].Query("test_cq", "select * from s1;", false, c) + series = collection.GetSeries("s1", c) + c.Assert(series.GetValueForPointAndColumn(0, "c1", c), Equals, float64(2)) + c.Assert(series.GetValueForPointAndColumn(0, "c2", c), Equals, "b") + c.Assert(series.GetValueForPointAndColumn(1, "c1", c), Equals, float64(1)) + c.Assert(series.GetValueForPointAndColumn(1, "c2", c), Equals, "a") + + collection = self.serverProcesses[0].Query("test_cq", "select * from s2;", false, c) + series = collection.GetSeries("s2", c) + c.Assert(series.GetValueForPointAndColumn(0, "c3", c), Equals, float64(3)) + + collection = self.serverProcesses[0].Query("test_cq", "select * from d1;", false, c) + series = collection.GetSeries("d1", c) + c.Assert(series.GetValueForPointAndColumn(0, "c1", c), Equals, float64(2)) + c.Assert(series.GetValueForPointAndColumn(0, "c2", c), Equals, "b") + c.Assert(series.GetValueForPointAndColumn(1, "c1", c), Equals, float64(1)) + c.Assert(series.GetValueForPointAndColumn(1, "c2", c), Equals, "a") + + collection = self.serverProcesses[0].Query("test_cq", "select * from d2;", false, c) + series = collection.GetSeries("d2", c) + c.Assert(series.GetValueForPointAndColumn(0, "c3", c), Equals, float64(3)) + + collection = self.serverProcesses[0].Query("test_cq", "select * from d3;", false, c) + series = collection.GetSeries("d3", c) + c.Assert(series.GetValueForPointAndColumn(0, "c3", c), Equals, float64(3)) + c.Assert(series.GetValueForPointAndColumn(1, "c1", c), Equals, float64(2)) + c.Assert(series.GetValueForPointAndColumn(1, "c2", c), Equals, "b") + c.Assert(series.GetValueForPointAndColumn(2, "c1", c), Equals, float64(1)) + c.Assert(series.GetValueForPointAndColumn(2, "c2", c), Equals, "a") + + collection = self.serverProcesses[0].Query("test_cq", "select * from silly_name.foo;", false, c) + series = collection.GetSeries("silly_name.foo", c) + c.Assert(series.GetValueForPointAndColumn(0, "c4", c), Equals, float64(4)) + c.Assert(series.GetValueForPointAndColumn(0, "c5", c), Equals, float64(5)) + + self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 1;", false, c) + self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 2;", false, c) + self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 3;", false, c) + self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 4;", false, c) +} + +func (self *ServerSuite) TestContinuousQueryGroupByOperations(c *C) { + currentTime := time.Now() + + previousTime := currentTime.Truncate(10 * time.Second) + oldTime := time.Unix(previousTime.Unix()-5, 0).Unix() + oldOldTime := time.Unix(previousTime.Unix()-10, 0).Unix() + + data := fmt.Sprintf(`[ + {"name": "s3", "columns": ["c1", "c2", "time"], "points": [ + [1, "a", %d], + [2, "b", %d], + [3, "c", %d], + [7, "x", %d], + [8, "y", %d], + [9, "z", %d] + ]} + ]`, oldTime, oldTime, oldTime, oldOldTime, oldOldTime, oldOldTime) + + fmt.Println(data) + + self.serverProcesses[0].Post("/db/test_cq/series?u=paul&p=pass&time_precision=s", data, c) + + time.Sleep(time.Second) + + self.serverProcesses[0].QueryAsRoot("test_cq", "select mean(c1) from s3 group by time(5s) into d3.mean;", false, c) + self.serverProcesses[0].QueryAsRoot("test_cq", "select count(c2) from s3 group by time(5s) into d3.count;", false, c) + + collection := self.serverProcesses[0].QueryAsRoot("test_cq", "list continuous queries;", false, c) + series := collection.GetSeries("continuous queries", c) + c.Assert(series.Points, HasLen, 2) + + time.Sleep(2 * time.Second) + + collection = self.serverProcesses[0].Query("test_cq", "select * from s3;", false, c) + series = collection.GetSeries("s3", c) + c.Assert(series.Points, HasLen, 6) + + collection = self.serverProcesses[0].Query("test_cq", "select * from d3.mean;", false, c) + series = collection.GetSeries("d3.mean", c) + c.Assert(series.GetValueForPointAndColumn(0, "mean", c), Equals, float64(2)) + c.Assert(series.GetValueForPointAndColumn(1, "mean", c), Equals, float64(8)) + + collection = self.serverProcesses[0].Query("test_cq", "select * from d3.count;", false, c) + series = collection.GetSeries("d3.count", c) + c.Assert(series.GetValueForPointAndColumn(0, "count", c), Equals, float64(3)) + c.Assert(series.GetValueForPointAndColumn(1, "count", c), Equals, float64(3)) + + self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 1;", false, c) + self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 2;", false, c) +} diff --git a/src/parser/frees.c b/src/parser/frees.c index c2bc59bf388..62b02e3962e 100644 --- a/src/parser/frees.c +++ b/src/parser/frees.c @@ -99,6 +99,11 @@ free_select_query (select_query *q) free_groupby_clause(q->group_by); } + if (q->into_clause) { + free_value(q->into_clause->target); + free(q->into_clause); + } + if (q->from_clause) { // free the from clause free_from_clause(q->from_clause); @@ -141,6 +146,10 @@ close_query (query *q) free(q->drop_series_query); } + if (q->drop_query) { + free(q->drop_query); + } + if (q->delete_query) { free_delete_query(q->delete_query); free(q->delete_query); diff --git a/src/parser/parser.go b/src/parser/parser.go index ad8bb1bcbda..4ea0326b961 100644 --- a/src/parser/parser.go +++ b/src/parser/parser.go @@ -76,6 +76,10 @@ type FromClause struct { Names []*TableName } +type IntoClause struct { + Target *Value +} + type GroupByClause struct { FillWithZero bool FillValue *Value @@ -140,11 +144,25 @@ type SelectQuery struct { SelectDeleteCommonQuery ColumnNames []*Value groupByClause *GroupByClause + IntoClause *IntoClause Limit int Ascending bool } -type ListQuery struct{} +type ListType int + +const ( + Series ListType = iota + ContinuousQueries +) + +type ListQuery struct { + Type ListType +} + +type DropQuery struct { + Id int +} type DropSeriesQuery struct { tableName string @@ -163,6 +181,7 @@ type Query struct { DeleteQuery *DeleteQuery ListQuery *ListQuery DropSeriesQuery *DropSeriesQuery + DropQuery *DropQuery } func (self *Query) GetQueryString() string { @@ -176,6 +195,14 @@ func (self *Query) IsListQuery() bool { return self.ListQuery != nil } +func (self *Query) IsListSeriesQuery() bool { + return self.ListQuery != nil && self.ListQuery.Type == Series +} + +func (self *Query) IsListContinuousQueriesQuery() bool { + return self.ListQuery != nil && self.ListQuery.Type == ContinuousQueries +} + func (self *BasicQuery) GetQueryString() string { return self.queryString } @@ -227,6 +254,14 @@ func (self *SelectQuery) GetSinglePointQuerySequenceNumber() (int64, error) { return sequence_number, nil } +func (self *SelectQuery) IsContinuousQuery() bool { + return self.GetIntoClause() != nil +} + +func (self *SelectQuery) GetIntoClause() *IntoClause { + return self.IntoClause +} + func (self *SelectDeleteCommonQuery) GetFromClause() *FromClause { return self.FromClause } @@ -367,6 +402,19 @@ func GetFromClause(fromClause *C.from_clause) (*FromClause, error) { return &FromClause{FromClauseType(fromClause.from_clause_type), arr}, nil } +func GetIntoClause(intoClause *C.into_clause) (*IntoClause, error) { + if intoClause == nil { + return nil, nil + } + + target, err := GetValue(intoClause.target) + if err != nil { + return nil, err + } + + return &IntoClause{target}, nil +} + func GetWhereCondition(condition *C.condition) (*WhereCondition, error) { if condition.is_bool_expression != 0 { expr, err := GetValue((*C.value)(condition.left)) @@ -434,12 +482,19 @@ func ParseQuery(query string) ([]*Query, error) { } if q.list_series_query != 0 { - return []*Query{&Query{ListQuery: &ListQuery{}}}, nil - } else if q.select_query != nil { + return []*Query{&Query{ListQuery: &ListQuery{Type: Series}}}, nil + } + + if q.list_continuous_queries_query != 0 { + return []*Query{&Query{ListQuery: &ListQuery{Type: ContinuousQueries}}}, nil + } + + if q.select_query != nil { selectQuery, err := parseSelectQuery(query, q.select_query) if err != nil { return nil, err } + return []*Query{&Query{SelectQuery: selectQuery}}, nil } else if q.delete_query != nil { deleteQuery, err := parseDeleteQuery(query, q.delete_query) @@ -453,6 +508,9 @@ func ParseQuery(query string) ([]*Query, error) { return nil, err } return []*Query{&Query{DropSeriesQuery: dropSeriesQuery}}, nil + } else if q.drop_query != nil { + fmt.Println(q.drop_query.id) + return []*Query{&Query{DropQuery: &DropQuery{Id: int(q.drop_query.id)}}}, nil } return nil, fmt.Errorf("Unknown query type encountered") } @@ -532,6 +590,7 @@ func parseSelectQuery(queryString string, q *C.select_query) (*SelectQuery, erro if err != nil { return nil, err } + goQuery := &SelectQuery{ SelectDeleteCommonQuery: basicQuery, Limit: int(limit), @@ -554,6 +613,12 @@ func parseSelectQuery(queryString string, q *C.select_query) (*SelectQuery, erro } } + // get the into clause + goQuery.IntoClause, err = GetIntoClause(q.into_clause) + if err != nil { + return goQuery, err + } + return goQuery, nil } diff --git a/src/parser/parser_test.go b/src/parser/parser_test.go index 53fae6a1443..3e0896ac064 100644 --- a/src/parser/parser_test.go +++ b/src/parser/parser_test.go @@ -124,6 +124,39 @@ func (self *QueryParserSuite) TestGetQueryStringWithTimeCondition(c *C) { } } +func (self *QueryParserSuite) TestGetQueryStringForContinuousQuery(c *C) { + base := time.Now().Truncate(time.Minute) + start := base.UTC() + end := base.Add(time.Minute).UTC() + + startMicroseconds := common.TimeToMicroseconds(start.UTC()) - 1 + endMicroseconds := common.TimeToMicroseconds(end.UTC()) + + inputQuery := "select count(c1) from s1 group by time(1m) into d1;" + outputQuery := fmt.Sprintf("select count(c1) from s1 group by time(1m) where time > %du and time < %du", startMicroseconds, endMicroseconds) + + queries, err := ParseQuery(inputQuery) + c.Assert(err, IsNil) + c.Assert(queries, HasLen, 1) + + query := queries[0] + c.Assert(query.SelectQuery, NotNil) + + selectQuery := query.SelectQuery + c.Assert(selectQuery.GetQueryStringForContinuousQuery(start, end), Equals, outputQuery) + + // try to parse the query with the time condition + queries, err = ParseQuery(selectQuery.GetQueryStringForContinuousQuery(start, end)) + c.Assert(err, IsNil) + + query = queries[0] + c.Assert(query.SelectQuery, NotNil) + + selectQuery = query.SelectQuery + c.Assert(selectQuery.GetStartTime().Round(time.Second), Equals, start) + c.Assert(selectQuery.GetEndTime(), Equals, end) +} + func (self *QueryParserSuite) TestParseDeleteQuery(c *C) { query := "delete from foo where time > '2012-08-13' and time < '2013-08-13'" queries, err := ParseQuery(query) @@ -687,6 +720,33 @@ func (self *QueryParserSuite) TestIsSinglePointQuery(c *C) { c.Assert(result, Equals, true) } +func (self *QueryParserSuite) TestParseContinuousQueryCreation(c *C) { + query := "select * from foo into bar;" + q, err := ParseSelectQuery(query) + c.Assert(err, IsNil) + c.Assert(q.IsContinuousQuery(), Equals, true) + clause := q.GetIntoClause() + c.Assert(clause.Target, DeepEquals, &Value{"bar", ValueSimpleName, nil, nil}) +} + +func (self *QueryParserSuite) TestParseContinuousQueryDeletion(c *C) { + query := "drop continuous query 1;" + queries, err := ParseQuery(query) + c.Assert(err, IsNil) + c.Assert(queries, HasLen, 1) + c.Assert(queries[0].DropQuery, NotNil) + c.Assert(queries[0].DropQuery.Id, Equals, 1) +} + +func (self *QueryParserSuite) TestParseContinuousQueryList(c *C) { + query := "list continuous queries;" + queries, err := ParseQuery(query) + c.Assert(err, IsNil) + c.Assert(queries, HasLen, 1) + c.Assert(queries[0].IsListQuery(), Equals, true) + c.Assert(queries[0].IsListContinuousQueriesQuery(), Equals, true) +} + // TODO: // insert into user.events.count.per_day select count(*) from user.events where timecharacter = *yytext; return *yytext; } ")" { yylval->character = *yytext; return *yytext; } "+" { yylval->character = *yytext; return *yytext; } @@ -90,7 +94,7 @@ static int yycolumn = 1; [a-zA-Z0-9_]* { yylval->string = strdup(yytext); return SIMPLE_NAME; } -[a-zA-Z0-9_][a-zA-Z0-9._-]* { yylval->string = strdup(yytext); return TABLE_NAME; } +[:a-zA-Z0-9_][a-zA-Z0-9._-]* { yylval->string = strdup(yytext); return TABLE_NAME; } \'[^\']*\' { yytext[yyleng-1] = '\0'; diff --git a/src/parser/query.yacc b/src/parser/query.yacc index cebad7acec1..7cf057bca4d 100644 --- a/src/parser/query.yacc +++ b/src/parser/query.yacc @@ -46,10 +46,12 @@ value *create_expression_value(char *operator, size_t size, ...) { value_array* value_array; value* v; from_clause* from_clause; + into_clause* into_clause; query* query; select_query* select_query; delete_query* delete_query; drop_series_query* drop_series_query; + drop_query* drop_query; groupby_clause* groupby_clause; struct { int limit; @@ -70,7 +72,7 @@ value *create_expression_value(char *operator, size_t size, ...) { %lex-param {void *scanner} // define types of tokens (terminals) -%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS LIST SERIES DROP_SERIES +%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS LIST SERIES INTO CONTINUOUS_QUERIES CONTINUOUS_QUERY DROP DROP_SERIES %token STRING_VALUE INT_VALUE FLOAT_VALUE TABLE_NAME SIMPLE_NAME REGEX_OP %token NEGATION_REGEX_OP REGEX_STRING INSENSITIVE_REGEX_STRING DURATION @@ -94,14 +96,16 @@ value *create_expression_value(char *operator, size_t size, ...) { %type GROUP_BY_CLAUSE %type LIMIT_CLAUSE %type ORDER_CLAUSE +%type INTO_CLAUSE %type LIMIT_AND_ORDER_CLAUSES %type QUERY %type DELETE_QUERY %type DROP_SERIES_QUERY %type SELECT_QUERY +%type DROP_QUERY // the initial token -%start QUERIES +%start ALL_QUERIES // destructors are used to free up memory in case of an error %destructor { free_value($$); } @@ -115,7 +119,7 @@ value *create_expression_value(char *operator, size_t size, ...) { // grammar %% -QUERIES: +ALL_QUERIES: QUERY { *q = *$1; @@ -128,7 +132,7 @@ QUERIES: free($1); } | - QUERY ';' QUERIES + QUERY ';' ALL_QUERIES { *q = *$1; free($1); @@ -147,6 +151,12 @@ QUERY: $$->delete_query = $1; } | + DROP_QUERY + { + $$ = calloc(1, sizeof(query)); + $$->drop_query = $1; + } + | LIST SERIES { $$ = calloc(1, sizeof(query)); @@ -158,6 +168,20 @@ QUERY: $$ = calloc(1, sizeof(query)); $$->drop_series_query = $1; } + | + LIST CONTINUOUS_QUERIES + { + $$ = calloc(1, sizeof(query)); + $$->list_continuous_queries_query = TRUE; + } + +DROP_QUERY: + DROP CONTINUOUS_QUERY INT_VALUE + { + $$ = calloc(1, sizeof(drop_query)); + $$->id = atoi($3); + free($3); + } DELETE_QUERY: DELETE FROM_CLAUSE WHERE_CLAUSE @@ -175,7 +199,7 @@ DROP_SERIES_QUERY: } SELECT_QUERY: - SELECT COLUMN_NAMES FROM_CLAUSE GROUP_BY_CLAUSE WHERE_CLAUSE LIMIT_AND_ORDER_CLAUSES + SELECT COLUMN_NAMES FROM_CLAUSE GROUP_BY_CLAUSE WHERE_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE { $$ = calloc(1, sizeof(select_query)); $$->c = $2; @@ -184,9 +208,10 @@ SELECT_QUERY: $$->where_condition = $5; $$->limit = $6.limit; $$->ascending = $6.ascending; + $$->into_clause = $7; } | - SELECT COLUMN_NAMES FROM_CLAUSE WHERE_CLAUSE GROUP_BY_CLAUSE LIMIT_AND_ORDER_CLAUSES + SELECT COLUMN_NAMES FROM_CLAUSE WHERE_CLAUSE GROUP_BY_CLAUSE LIMIT_AND_ORDER_CLAUSES INTO_CLAUSE { $$ = calloc(1, sizeof(select_query)); $$->c = $2; @@ -195,6 +220,7 @@ SELECT_QUERY: $$->group_by = $5; $$->limit = $6.limit; $$->ascending = $6.ascending; + $$->into_clause = $7; } LIMIT_AND_ORDER_CLAUSES: @@ -273,6 +299,17 @@ GROUP_BY_CLAUSE: $$ = NULL; } +INTO_CLAUSE: + INTO TABLE_VALUE + { + $$ = malloc(sizeof(into_clause)); + $$->target = $2; + } + | + { + $$ = NULL; + } + COLUMN_NAMES: VALUES @@ -526,7 +563,7 @@ void yy_delete_buffer(void *, void *); query parse_query(char *const query_s) { - query q = {NULL, NULL, NULL, FALSE, NULL}; + query q = {NULL, NULL, NULL, NULL, FALSE, FALSE, NULL}; void *scanner; yylex_init(&scanner); #ifdef DEBUG diff --git a/src/parser/query_api.go b/src/parser/query_api.go index 2fb72a18387..6d82929e05f 100644 --- a/src/parser/query_api.go +++ b/src/parser/query_api.go @@ -337,6 +337,33 @@ func (self *SelectDeleteCommonQuery) GetQueryStringWithTimeCondition() string { return queryString + " and time < " + timeStr + "u" } +func (self *SelectDeleteCommonQuery) GetQueryStringForContinuousQuery(start, end time.Time) string { + queryString := self.GetQueryString() + queryString = strings.TrimSuffix(queryString, ";") + + intoRegex, _ := regexp.Compile("(?i)\\s+into\\s+") + components := intoRegex.Split(queryString, 2) + + queryString = components[0] + + startTime := common.TimeToMicroseconds(start) + startTimeStr := strconv.FormatInt(startTime-1, 10) + endTime := common.TimeToMicroseconds(end) + endTimeStr := strconv.FormatInt(endTime, 10) + + if self.GetWhereCondition() == nil { + queryString = queryString + " where " + } else { + queryString = queryString + " and " + } + + if start.IsZero() { + return queryString + "time < " + endTimeStr + "u" + } else { + return queryString + "time > " + startTimeStr + "u and time < " + endTimeStr + "u" + } +} + // parse the start time or end time from the where conditions and return the new condition // without the time clauses, or nil if there are no where conditions left func getTime(condition *WhereCondition, isParsingStartTime bool) (*WhereCondition, time.Time, error) { diff --git a/src/parser/query_types.h b/src/parser/query_types.h index 07e83471203..02a181cba91 100644 --- a/src/parser/query_types.h +++ b/src/parser/query_types.h @@ -74,10 +74,15 @@ typedef struct { table_name_array *names; } from_clause; +typedef struct { + value *target; +} into_clause; + typedef struct { value_array *c; from_clause *from_clause; groupby_clause *group_by; + into_clause *into_clause; condition *where_condition; int limit; char ascending; @@ -93,11 +98,17 @@ typedef struct { value *name; } drop_series_query; +typedef struct { + int id; +} drop_query; + typedef struct { select_query *select_query; delete_query *delete_query; drop_series_query *drop_series_query; + drop_query *drop_query; char list_series_query; + char list_continuous_queries_query; error *error; } query; diff --git a/src/parser/test_memory_leaks.sh b/src/parser/test_memory_leaks.sh index 7f5cb3461f3..356f0d5e2e7 100755 --- a/src/parser/test_memory_leaks.sh +++ b/src/parser/test_memory_leaks.sh @@ -45,6 +45,16 @@ int main(int argc, char **argv) { q = parse_query("select * from foobar limit"); close_query(&q); + // test continuous queries + q = parse_query("select * from foo into bar;"); + close_query(&q); + + q = parse_query("list continuous queries;"); + close_query(&q); + + q = parse_query("drop continuous query 5;"); + close_query(&q); + return 0; } EOF diff --git a/src/server/server.go b/src/server/server.go index e6e53e5d587..9bfd2a990f6 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -41,6 +41,7 @@ func NewServer(config *configuration.Configuration) (*Server, error) { return nil, err } + raftServer.AssignEngineAndCoordinator(eng, coord) httpApi := http.NewHttpServer(config.ApiHttpPortString(), config.AdminAssetsDir, eng, coord, coord) adminServer := admin.NewHttpServer(config.AdminAssetsDir, config.AdminHttpPortString())