Skip to content

Commit

Permalink
Move load database config to API
Browse files Browse the repository at this point in the history
Fix #791 - Removed load database config options from the daemon. Created an API endpoint and updated test.
Fix #745 - Added definition of continuous queries to load database config.
  • Loading branch information
pauldix committed Jul 31, 2014
1 parent eaaa322 commit ab24da4
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 80 deletions.
49 changes: 49 additions & 0 deletions api/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (self *HttpServer) Serve(listener net.Listener) {
self.registerEndpoint(p, "get", "/cluster/shard_spaces", self.getShardSpaces)
self.registerEndpoint(p, "post", "/cluster/shard_spaces", self.createShardSpace)
self.registerEndpoint(p, "del", "/cluster/shard_spaces/:db/:name", self.dropShardSpace)
self.registerEndpoint(p, "post", "/cluster/db_config", self.configureDatabase)

// return whether the cluster is in sync or not
self.registerEndpoint(p, "get", "/sync", self.isInSync)
Expand Down Expand Up @@ -1191,3 +1192,51 @@ func (self *HttpServer) dropShardSpace(w libhttp.ResponseWriter, r *libhttp.Requ
return libhttp.StatusOK, nil
})
}

type DatabaseConfig struct {
Database string `json:"database"`
Spaces []*cluster.ShardSpace `json:"spaces"`
ContinuousQueries []string `json:"continuousQueries"`
}

func (self *HttpServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.Request) {
self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) {
databaseConfig := &DatabaseConfig{}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return libhttp.StatusInternalServerError, err.Error()
}
err = json.Unmarshal(body, databaseConfig)
if err != nil {
return libhttp.StatusInternalServerError, err.Error()
}
err = self.coordinator.CreateDatabase(u, databaseConfig.Database)
if err != nil {
return libhttp.StatusBadRequest, err.Error()
}
for _, space := range databaseConfig.Spaces {
space.Database = databaseConfig.Database
err = self.raftServer.CreateShardSpace(space)
if err != nil {
return libhttp.StatusInternalServerError, err.Error()
}
}
for _, queryString := range databaseConfig.ContinuousQueries {
q, err := parser.ParseQuery(queryString)
if err != nil {
return libhttp.StatusInternalServerError, err.Error()
}
for _, query := range q {
selectQuery := query.SelectQuery

if selectQuery.IsContinuousQuery() {
err := self.coordinator.CreateContinuousQuery(u, databaseConfig.Database, query.QueryString)
if err != nil {
return libhttp.StatusInternalServerError, err.Error()
}
}
}
}
return libhttp.StatusCreated, nil
})
}
2 changes: 1 addition & 1 deletion cluster/cluster_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (self *ClusterConfiguration) PeriodicallyDropShardsWithRetentionPolicies()
go func() {
for {
time.Sleep(self.config.StorageRetentionSweepPeriod.Duration)
log.Debug("Checking for shards to drop")
log.Info("Checking for shards to drop")
shards := self.getExpiredShards()
for _, s := range shards {
self.shardStore.DeleteShard(s.id)
Expand Down
53 changes: 0 additions & 53 deletions daemon/influxd.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
Expand All @@ -13,7 +12,6 @@ import (
"time"

log "code.google.com/p/log4go"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/configuration"
"github.com/influxdb/influxdb/coordinator"
"github.com/influxdb/influxdb/server"
Expand Down Expand Up @@ -63,10 +61,6 @@ func main() {
protobufPort := flag.Int("protobuf-port", 0, "Override the protobuf port, the `protobuf_port` config option will be overridden")
pidFile := flag.String("pidfile", "", "the pid file")
repairLeveldb := flag.Bool("repair-ldb", false, "set to true to repair the leveldb files")
loadDatabaseConfig := flag.String("load-database-config", "", "Will create databases with the given shard spaces from a file.")
loadServer := flag.String("load-server", "localhost:8086", "If loading a database config, connects to this host/port")
loadUser := flag.String("load-user", "root", "If loading a database config, uses this user to auth")
loadPassword := flag.String("load-password", "root", "If loading a database config, use this password to auth")
stdout := flag.Bool("stdout", false, "Log to stdout overriding the configuration")

runtime.GOMAXPROCS(runtime.NumCPU())
Expand All @@ -77,13 +71,6 @@ func main() {
fmt.Println(v)
return
}
if *loadDatabaseConfig != "" {
err := LoadDatabaseConfig(*loadDatabaseConfig, *loadServer, *loadUser, *loadPassword)
if err != nil {
panic(err)
}
return
}
config := configuration.LoadConfiguration(*fileName)

// override the hostname if it was specified on the command line
Expand Down Expand Up @@ -177,43 +164,3 @@ func main() {
log.Error("ListenAndServe failed: ", err)
}
}

type DatabaseConfig struct {
Database string `json:"database"`
Spaces []*client.ShardSpace `json:"spaces"`
}

func LoadDatabaseConfig(fileName, server, user, password string) error {
fmt.Println("Loading config from ", fileName)
c, err := client.NewClient(&client.ClientConfig{
Host: server,
Username: user,
Password: password,
})
if err != nil {
return err
}
content, err := ioutil.ReadFile(fileName)
if err != nil {
return err
}
configs := []*DatabaseConfig{}
err = json.Unmarshal(content, &configs)
if err != nil {
return err
}
for _, config := range configs {
err := c.CreateDatabase(config.Database)
if err != nil {
return err
}
for _, space := range config.Spaces {
space.Database = config.Database
err := c.CreateShardSpace(space)
if err != nil {
return err
}
}
}
return nil
}
10 changes: 4 additions & 6 deletions integration/database_conf.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
[
{
"database": "test_db_conf_db",
"spaces": [
{
"name": "everything",
"retentionPolicy": "90d",
"retentionPolicy": "inf",
"shardDuration": "7d",
"regex": "/.*/",
"replicationFactor": 1,
Expand All @@ -22,10 +21,9 @@
"name": "specific",
"retentionPolicy": "7d",
"shardDuration": "1d",
"regex": "/^something_specficic/",
"replicationFactor": 1,
"split": 1
"regex": "/^something_specfic/",
"replicationFactor": 2,
"split": 3
}
]
}
]
36 changes: 16 additions & 20 deletions integration/single_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"io/ioutil"
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"

influxdb "github.com/influxdb/influxdb/client"
Expand Down Expand Up @@ -785,7 +783,7 @@ func (self *SingleServerSuite) TestCreateShardSpace(c *C) {
]`, c)
spaces, err = client.GetShardSpaces()
c.Assert(err, IsNil)
c.Assert(self.hasSpace("db1", "month", "/^the_dude_abides/", spaces), Equals, true)
c.Assert(self.getSpace("db1", "month", "/^the_dude_abides/", spaces), NotNil)
shards, err := client.GetShards()
c.Assert(err, IsNil)
spaceShards := self.getShardsForSpace("month", shards.All)
Expand All @@ -802,14 +800,13 @@ func (self *SingleServerSuite) getShardsForSpace(name string, shards []*influxdb
return filteredShards
}

func (self *SingleServerSuite) hasSpace(database, name, regex string, spaces []*influxdb.ShardSpace) bool {
func (self *SingleServerSuite) getSpace(database, name string, regex string, spaces []*influxdb.ShardSpace) *influxdb.ShardSpace {
for _, s := range spaces {
fmt.Printf("Checking %#v\n", s)
if s.Name == name && s.Database == database && s.Regex == regex {
return true
return s
}
}
return false
return nil
}

func (self *SingleServerSuite) TestDropShardSpace(c *C) {
Expand All @@ -829,28 +826,27 @@ func (self *SingleServerSuite) TestDropShardSpace(c *C) {
]`, c)
spaces, err := client.GetShardSpaces()
c.Assert(err, IsNil)
c.Assert(self.hasSpace("db1", spaceName, "/^dont_drop_me_bro/", spaces), Equals, true)
c.Assert(self.getSpace("db1", spaceName, "/^dont_drop_me_bro/", spaces), NotNil)
err = client.DropShardSpace("db1", spaceName)
c.Assert(err, IsNil)
spaces, err = client.GetShardSpaces()
c.Assert(err, IsNil)
c.Assert(self.hasSpace("db1", spaceName, "/^dont_drop_me_bro/", spaces), Equals, false)
c.Assert(self.getSpace("db1", spaceName, "/^dont_drop_me_bro/", spaces), IsNil)
}

func (self *SingleServerSuite) TestLoadDatabaseConfig(c *C) {
dir, err := os.Getwd()
if err != nil {
c.Error(err)
}
root := filepath.Join(dir, "..")
filename := filepath.Join(root, "influxdb")
configArg := "-load-database-config"
cmd := exec.Command(filename, configArg, "database_conf.json")
_, err = cmd.Output()
contents, err := ioutil.ReadFile("database_conf.json")
c.Assert(err, IsNil)
resp, err := http.Post("http://localhost:8086/cluster/db_config?u=root&p=root", "application/json", bytes.NewBuffer(contents))
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, http.StatusCreated)

client := self.server.GetClient("", c)
spaces, err := client.GetShardSpaces()
c.Assert(err, IsNil)
c.Assert(self.hasSpace("test_db_conf_db", "specific", "/^something_specficic/", spaces), Equals, true)
c.Assert(self.hasSpace("test_db_conf_db", "everything", "/.*/", spaces), Equals, true)
c.Assert(self.getSpace("test_db_conf_db", "everything", "/.*/", spaces), NotNil)
space := self.getSpace("test_db_conf_db", "specific", "/^something_specfic/", spaces)
c.Assert(space, NotNil)
c.Assert(space.Split, Equals, uint32(3))
c.Assert(space.ReplicationFactor, Equals, uint32(2))
}

0 comments on commit ab24da4

Please sign in to comment.