From 87361d16b4c9c323bef12bf7898a369813231d9c Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Fri, 1 Aug 2014 15:02:32 -0400 Subject: [PATCH] Update load database config with validations and tests Update the client struct so the Database is public and can be set by library users. Move the db_config API to specify the database name in the URL. Make load config test also test loading continuous queries. --- api/http/api.go | 32 +++++++++++++++++++------------ client/influxdb.go | 10 +++++----- integration/database_conf.json | 5 ++++- integration/single_server_test.go | 10 +++++++++- 4 files changed, 38 insertions(+), 19 deletions(-) diff --git a/api/http/api.go b/api/http/api.go index 1389dd1d601..4f0cb8e407c 100644 --- a/api/http/api.go +++ b/api/http/api.go @@ -154,7 +154,7 @@ func (self *HttpServer) Serve(listener net.Listener) { self.registerEndpoint(p, "get", "/cluster/shard_spaces", self.getShardSpaces) self.registerEndpoint(p, "post", "/cluster/shard_spaces", self.createShardSpace) self.registerEndpoint(p, "del", "/cluster/shard_spaces/:db/:name", self.dropShardSpace) - self.registerEndpoint(p, "post", "/cluster/db_config", self.configureDatabase) + self.registerEndpoint(p, "post", "/cluster/database_configs/:db", self.configureDatabase) // return whether the cluster is in sync or not self.registerEndpoint(p, "get", "/sync", self.isInSync) @@ -1210,6 +1210,21 @@ func (self *HttpServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R if err != nil { return libhttp.StatusInternalServerError, err.Error() } + databaseConfig.Database = r.URL.Query().Get(":db") + + // validate before creating anything + for _, queryString := range databaseConfig.ContinuousQueries { + q, err := parser.ParseQuery(queryString) + if err != nil { + return libhttp.StatusBadRequest, err.Error() + } + for _, query := range q { + if !query.SelectQuery.IsContinuousQuery() { + return libhttp.StatusBadRequest, fmt.Errorf("This query isn't a continuous query. Use 'into'. %s", query.QueryString) + } + } + } + err = self.coordinator.CreateDatabase(u, databaseConfig.Database) if err != nil { return libhttp.StatusBadRequest, err.Error() @@ -1222,18 +1237,11 @@ func (self *HttpServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R } } for _, queryString := range databaseConfig.ContinuousQueries { - q, err := parser.ParseQuery(queryString) - if err != nil { - return libhttp.StatusInternalServerError, err.Error() - } + q, _ := parser.ParseQuery(queryString) for _, query := range q { - selectQuery := query.SelectQuery - - if selectQuery.IsContinuousQuery() { - err := self.coordinator.CreateContinuousQuery(u, databaseConfig.Database, query.QueryString) - if err != nil { - return libhttp.StatusInternalServerError, err.Error() - } + err := self.coordinator.CreateContinuousQuery(u, databaseConfig.Database, query.QueryString) + if err != nil { + return libhttp.StatusInternalServerError, err.Error() } } } diff --git a/client/influxdb.go b/client/influxdb.go index 993076a28b2..19e720a1b31 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -21,7 +21,7 @@ type Client struct { host string username string password string - database string + Database string httpClient *http.Client udpConn *net.UDPConn schema string @@ -394,7 +394,7 @@ func (self *Client) writeSeriesCommon(series []*Series, options map[string]strin if err != nil { return err } - url := self.getUrl("/db/" + self.database + "/series") + url := self.getUrl("/db/" + self.Database + "/series") for name, value := range options { url += fmt.Sprintf("&%s=%s", name, value) } @@ -431,7 +431,7 @@ func (self *Client) QueryWithNumbers(query string, precision ...TimePrecision) ( func (self *Client) queryCommon(query string, useNumber bool, precision ...TimePrecision) ([]*Series, error) { escapedQuery := url.QueryEscape(query) - url := self.getUrl("/db/" + self.database + "/series") + url := self.getUrl("/db/" + self.Database + "/series") if len(precision) > 0 { url += "&time_precision=" + string(precision[0]) } @@ -481,12 +481,12 @@ func (self *Client) AuthenticateClusterAdmin(username, password string) error { } func (self *Client) GetContinuousQueries() ([]map[string]interface{}, error) { - url := self.getUrlWithUserAndPass(fmt.Sprintf("/db/%s/continuous_queries", self.database), self.username, self.password) + url := self.getUrlWithUserAndPass(fmt.Sprintf("/db/%s/continuous_queries", self.Database), self.username, self.password) return self.listSomething(url) } func (self *Client) DeleteContinuousQueries(id int) error { - url := self.getUrlWithUserAndPass(fmt.Sprintf("/db/%s/continuous_queries/%d", self.database, id), self.username, self.password) + url := self.getUrlWithUserAndPass(fmt.Sprintf("/db/%s/continuous_queries/%d", self.Database, id), self.username, self.password) resp, err := self.del(url) return responseToError(resp, err, true) } diff --git a/integration/database_conf.json b/integration/database_conf.json index 25d22b0e990..78fd59ab03b 100644 --- a/integration/database_conf.json +++ b/integration/database_conf.json @@ -1,5 +1,4 @@ { - "database": "test_db_conf_db", "spaces": [ { "name": "everything", @@ -25,5 +24,9 @@ "replicationFactor": 2, "split": 3 } + ], + "continuousQueries": [ + "select * from events into events.[id]", + "select count(value) from events group by time(5m) into 5m.count.events" ] } diff --git a/integration/single_server_test.go b/integration/single_server_test.go index 6107606da54..e8a4f50c695 100644 --- a/integration/single_server_test.go +++ b/integration/single_server_test.go @@ -837,7 +837,7 @@ func (self *SingleServerSuite) TestDropShardSpace(c *C) { func (self *SingleServerSuite) TestLoadDatabaseConfig(c *C) { contents, err := ioutil.ReadFile("database_conf.json") c.Assert(err, IsNil) - resp, err := http.Post("http://localhost:8086/cluster/db_config?u=root&p=root", "application/json", bytes.NewBuffer(contents)) + resp, err := http.Post("http://localhost:8086/cluster/database_configs/test_db_conf_db?u=root&p=root", "application/json", bytes.NewBuffer(contents)) c.Assert(err, IsNil) c.Assert(resp.StatusCode, Equals, http.StatusCreated) @@ -849,4 +849,12 @@ func (self *SingleServerSuite) TestLoadDatabaseConfig(c *C) { c.Assert(space, NotNil) c.Assert(space.Split, Equals, uint32(3)) c.Assert(space.ReplicationFactor, Equals, uint32(2)) + + client.Database = "test_db_conf_db" + series, err := client.Query("list continuous queries;") + c.Assert(err, IsNil) + queries := series[0].Points + c.Assert(queries, HasLen, 2) + c.Assert(queries[0][2], Equals, "select * from events into events.[id]") + c.Assert(queries[1][2], Equals, "select count(value) from events group by time(5m) into 5m.count.events") }