Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Continuous Queries #153

Merged
merged 1 commit into from
Jan 17, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile.in
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ ifneq ($(only),)
GOTEST_OPTS = -gocheck.f $(only)
endif
ifneq ($(verbose),off)
GOTEST_OPTS += -v -gocheck.v
GOTEST_OPTS += -v -gocheck.v -gocheck.vv
endif

test: test_dependencies parser
Expand Down
55 changes: 55 additions & 0 deletions src/api/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"path/filepath"
"protocol"
"regexp"
"strconv"
"strings"
"time"
)
Expand Down Expand Up @@ -99,6 +100,11 @@ func (self *HttpServer) Serve(listener net.Listener) {
self.registerEndpoint(p, "del", "/db/:db/users/:user", self.deleteDbUser)
self.registerEndpoint(p, "post", "/db/:db/users/:user", self.updateDbUser)

// continuous queries management interface
self.registerEndpoint(p, "get", "/db/:db/continuous_queries", self.listDbContinuousQueries)
self.registerEndpoint(p, "post", "/db/:db/continuous_queries", self.createDbContinuousQueries)
self.registerEndpoint(p, "del", "/db/:db/continuous_queries/:id", self.deleteDbContinuousQueries)

// healthcheck
self.registerEndpoint(p, "get", "/ping", self.ping)

Expand Down Expand Up @@ -622,6 +628,10 @@ type User struct {
Name string `json:"username"`
}

type NewContinuousQuery struct {
Query string `json:"query"`
}

func (self *HttpServer) listClusterAdmins(w libhttp.ResponseWriter, r *libhttp.Request) {
self.tryAsClusterAdmin(w, r, func(u common.User) (int, interface{}) {
names, err := self.userManager.ListClusterAdmins(u)
Expand Down Expand Up @@ -906,3 +916,48 @@ func (self *HttpServer) listInterfaces(w libhttp.ResponseWriter, r *libhttp.Requ
w.Write(body)
}
}

func (self *HttpServer) listDbContinuousQueries(w libhttp.ResponseWriter, r *libhttp.Request) {
db := r.URL.Query().Get(":db")

self.tryAsDbUserAndClusterAdmin(w, r, func(u common.User) (int, interface{}) {
queries, err := self.coordinator.ListContinuousQueries(u, db)
if err != nil {
return errorToStatusCode(err), err.Error()
}

return libhttp.StatusOK, queries
})
}

func (self *HttpServer) createDbContinuousQueries(w libhttp.ResponseWriter, r *libhttp.Request) {
db := r.URL.Query().Get(":db")

self.tryAsDbUserAndClusterAdmin(w, r, func(u common.User) (int, interface{}) {
var values interface{}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return libhttp.StatusInternalServerError, err.Error()
}
json.Unmarshal(body, &values)
query := values.(map[string]interface{})["query"].(string)
fmt.Println(query)

if err := self.coordinator.CreateContinuousQuery(u, db, query); err != nil {
return errorToStatusCode(err), err.Error()
}
return libhttp.StatusOK, nil
})
}

func (self *HttpServer) deleteDbContinuousQueries(w libhttp.ResponseWriter, r *libhttp.Request) {
db := r.URL.Query().Get(":db")
id, _ := strconv.ParseInt(r.URL.Query().Get(":id"), 10, 64)

self.tryAsDbUserAndClusterAdmin(w, r, func(u common.User) (int, interface{}) {
if err := self.coordinator.DeleteContinuousQuery(u, db, uint32(id)); err != nil {
return errorToStatusCode(err), err.Error()
}
return libhttp.StatusOK, nil
})
}
137 changes: 132 additions & 5 deletions src/api/http/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,11 @@ func (self *MockEngine) RunQuery(_ common.User, _ string, query string, localOnl

type MockCoordinator struct {
coordinator.Coordinator
series []*protocol.Series
deleteQueries []*parser.DeleteQuery
db string
droppedDb string
series []*protocol.Series
continuousQueries map[string][]*coordinator.ContinuousQuery
deleteQueries []*parser.DeleteQuery
db string
droppedDb string
}

func (self *MockCoordinator) WriteSeriesData(_ common.User, db string, series *protocol.Series) error {
Expand All @@ -127,14 +128,56 @@ func (self *MockCoordinator) DropDatabase(_ common.User, db string) error {
return nil
}

func (self *MockCoordinator) ListContinuousQueries(_ common.User, db string) ([]*protocol.Series, error) {
points := []*protocol.Point{}

for _, query := range self.continuousQueries[db] {
queryId := int64(query.Id)
queryString := query.Query
points = append(points, &protocol.Point{
Values: []*protocol.FieldValue{
&protocol.FieldValue{Int64Value: &queryId},
&protocol.FieldValue{StringValue: &queryString},
},
Timestamp: nil,
SequenceNumber: nil,
})
}

seriesName := "continuous queries"
series := []*protocol.Series{&protocol.Series{
Name: &seriesName,
Fields: []string{"id", "query"},
Points: points,
}}
return series, nil
}

func (self *MockCoordinator) CreateContinuousQuery(_ common.User, db string, query string) error {
self.continuousQueries[db] = append(self.continuousQueries[db], &coordinator.ContinuousQuery{2, query})
return nil
}

func (self *MockCoordinator) DeleteContinuousQuery(_ common.User, db string, id uint32) error {
length := len(self.continuousQueries[db])
_, self.continuousQueries[db] = self.continuousQueries[db][length-1], self.continuousQueries[db][:length-1]
return nil
}

func (self *ApiSuite) formatUrl(path string, args ...interface{}) string {
path = fmt.Sprintf(path, args...)
port := self.listener.Addr().(*net.TCPAddr).Port
return fmt.Sprintf("http://localhost:%d%s", port, path)
}

func (self *ApiSuite) SetUpSuite(c *C) {
self.coordinator = &MockCoordinator{}
self.coordinator = &MockCoordinator{
continuousQueries: map[string][]*coordinator.ContinuousQuery{
"db1": []*coordinator.ContinuousQuery{
&coordinator.ContinuousQuery{1, "select * from foo into bar;"},
},
},
}
self.manager = &MockUserManager{
clusterAdmins: []string{"root"},
dbUsers: map[string][]string{"db1": []string{"db_user1"}},
Expand Down Expand Up @@ -733,3 +776,87 @@ func (self *ApiSuite) TestBasicAuthentication(c *C) {
c.Assert(err, IsNil)
c.Assert(users, DeepEquals, []*coordinator.Database{&coordinator.Database{"db1", 1}, &coordinator.Database{"db2", 1}})
}

func (self *ApiSuite) TestContinuousQueryOperations(c *C) {
// verify current continuous query index
url := self.formatUrl("/db/db1/continuous_queries?u=root&p=root")
resp, err := libhttp.Get(url)
c.Assert(err, IsNil)
c.Assert(resp.Header.Get("content-type"), Equals, "application/json")
body, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
series := []*protocol.Series{}
err = json.Unmarshal(body, &series)
c.Assert(err, IsNil)
c.Assert(series, HasLen, 1)
c.Assert(series[0].Points, HasLen, 1)
c.Assert(series[0].Points[0].Values, HasLen, 2)

c.Assert(*series[0].Name, Equals, "continuous queries")
c.Assert(*series[0].Points[0].Values[0].Int64Value, Equals, int64(1))
c.Assert(*series[0].Points[0].Values[1].StringValue, Equals, "select * from foo into bar;")

resp.Body.Close()

// add a new continuous query
data := `{"query": "select * from quu into qux;"}`
url = self.formatUrl("/db/db1/continuous_queries?u=root&p=root")
resp, err = libhttp.Post(url, "application/json", bytes.NewBufferString(data))
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, libhttp.StatusOK)
resp.Body.Close()

// verify updated continuous query index
url = self.formatUrl("/db/db1/continuous_queries?u=root&p=root")
resp, err = libhttp.Get(url)
c.Assert(err, IsNil)
c.Assert(resp.Header.Get("content-type"), Equals, "application/json")
body, err = ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
series = []*protocol.Series{}
err = json.Unmarshal(body, &series)
c.Assert(err, IsNil)

c.Assert(series, HasLen, 1)
c.Assert(series[0].Points, HasLen, 2)
c.Assert(series[0].Points[0].Values, HasLen, 2)
c.Assert(series[0].Points[1].Values, HasLen, 2)

c.Assert(*series[0].Name, Equals, "continuous queries")
c.Assert(*series[0].Points[0].Values[0].Int64Value, Equals, int64(1))
c.Assert(*series[0].Points[0].Values[1].StringValue, Equals, "select * from foo into bar;")
c.Assert(*series[0].Points[1].Values[0].Int64Value, Equals, int64(2))
c.Assert(*series[0].Points[1].Values[1].StringValue, Equals, "select * from quu into qux;")

resp.Body.Close()

// delete the newly-created query
url = self.formatUrl("/db/db1/continuous_queries/2?u=root&p=root")
req, err := libhttp.NewRequest("DELETE", url, nil)
c.Assert(err, IsNil)
resp, err = libhttp.DefaultClient.Do(req)
c.Assert(err, IsNil)
_, err = ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, libhttp.StatusOK)
resp.Body.Close()

// verify updated continuous query index
url = self.formatUrl("/db/db1/continuous_queries?u=root&p=root")
resp, err = libhttp.Get(url)
c.Assert(err, IsNil)
c.Assert(resp.Header.Get("content-type"), Equals, "application/json")
body, err = ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
series = []*protocol.Series{}
err = json.Unmarshal(body, &series)
c.Assert(err, IsNil)
c.Assert(series, HasLen, 1)
c.Assert(series[0].Points, HasLen, 1)
c.Assert(series[0].Points[0].Values, HasLen, 2)

c.Assert(*series[0].Name, Equals, "continuous queries")
c.Assert(*series[0].Points[0].Values[0].Int64Value, Equals, int64(1))
c.Assert(*series[0].Points[0].Values[1].StringValue, Equals, "select * from foo into bar;")
resp.Body.Close()
}
81 changes: 81 additions & 0 deletions src/coordinator/cluster_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"encoding/gob"
"errors"
"fmt"
"parser"
"sync"
"sync/atomic"
"time"
)

/*
Expand All @@ -29,13 +31,22 @@ type ClusterConfiguration struct {
dbUsers map[string]map[string]*dbUser
servers []*ClusterServer
serversLock sync.RWMutex
continuousQueries map[string][]*ContinuousQuery
continuousQueriesLock sync.RWMutex
parsedContinuousQueries map[string]map[uint32]*parser.SelectQuery
continuousQueryTimestamp time.Time
hasRunningServers bool
localServerId uint32
ClusterVersion uint32
config *configuration.Configuration
addedLocalServerWait chan bool
}

type ContinuousQuery struct {
Id uint32
Query string
}

type Database struct {
Name string `json:"name"`
ReplicationFactor uint8 `json:"replicationFactor"`
Expand All @@ -46,6 +57,8 @@ func NewClusterConfiguration(config *configuration.Configuration) *ClusterConfig
databaseReplicationFactors: make(map[string]uint8),
clusterAdmins: make(map[string]*clusterAdmin),
dbUsers: make(map[string]map[string]*dbUser),
continuousQueries: make(map[string][]*ContinuousQuery),
parsedContinuousQueries: make(map[string]map[uint32]*parser.SelectQuery),
servers: make([]*ClusterServer, 0),
config: config,
addedLocalServerWait: make(chan bool, 1),
Expand Down Expand Up @@ -336,6 +349,74 @@ func (self *ClusterConfiguration) DropDatabase(name string) error {
return nil
}

func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) error {
self.continuousQueriesLock.Lock()
defer self.continuousQueriesLock.Unlock()

if self.continuousQueries == nil {
self.continuousQueries = map[string][]*ContinuousQuery{}
}

if self.parsedContinuousQueries == nil {
self.parsedContinuousQueries = map[string]map[uint32]*parser.SelectQuery{}
}

maxId := uint32(0)
for _, query := range self.continuousQueries[db] {
if query.Id > maxId {
maxId = query.Id
}
}

selectQuery, err := parser.ParseSelectQuery(query)
if err != nil {
return fmt.Errorf("Failed to parse continuous query: %s", query)
}

queryId := maxId + 1
if self.parsedContinuousQueries[db] == nil {
self.parsedContinuousQueries[db] = map[uint32]*parser.SelectQuery{queryId: selectQuery}
} else {
self.parsedContinuousQueries[db][queryId] = selectQuery
}
self.continuousQueries[db] = append(self.continuousQueries[db], &ContinuousQuery{queryId, query})

return nil
}

func (self *ClusterConfiguration) SetContinuousQueryTimestamp(timestamp time.Time) error {
self.continuousQueriesLock.Lock()
defer self.continuousQueriesLock.Unlock()

self.continuousQueryTimestamp = timestamp

return nil
}

func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) error {
self.continuousQueriesLock.Lock()
defer self.continuousQueriesLock.Unlock()

for i, query := range self.continuousQueries[db] {
if query.Id == id {
q := self.continuousQueries[db]
q[len(q)-1], q[i], q = nil, q[len(q)-1], q[:len(q)-1]
self.continuousQueries[db] = q
delete(self.parsedContinuousQueries[db], id)
break
}
}

return nil
}

func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQuery {
self.continuousQueriesLock.Lock()
defer self.continuousQueriesLock.Unlock()

return self.continuousQueries[db]
}

func (self *ClusterConfiguration) GetDbUsers(db string) (names []string) {
self.usersLock.RLock()
defer self.usersLock.RUnlock()
Expand Down
Loading