diff --git a/api/http/api.go b/api/http/api.go index f931bc22ff5..65c2d436d61 100644 --- a/api/http/api.go +++ b/api/http/api.go @@ -162,6 +162,7 @@ func (self *HttpServer) Serve(listener net.Listener) { self.registerEndpoint(p, "get", "/cluster/shard_spaces", self.getShardSpaces) self.registerEndpoint(p, "post", "/cluster/shard_spaces/:db", self.createShardSpace) self.registerEndpoint(p, "del", "/cluster/shard_spaces/:db/:name", self.dropShardSpace) + self.registerEndpoint(p, "post", "/cluster/shard_spaces/:db/:name", self.updateShardSpace) self.registerEndpoint(p, "post", "/cluster/database_configs/:db", self.configureDatabase) // migrates leveldb data from 0.7 to 0.8 format. @@ -1272,3 +1273,27 @@ func (self *HttpServer) migrateData(w libhttp.ResponseWriter, r *libhttp.Request return libhttp.StatusAccepted, nil }) } + +func (self *HttpServer) updateShardSpace(w libhttp.ResponseWriter, r *libhttp.Request) { + self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) { + space := &cluster.ShardSpace{} + decoder := json.NewDecoder(r.Body) + err := decoder.Decode(space) + if err != nil { + return libhttp.StatusInternalServerError, err.Error() + } + space.Database = r.URL.Query().Get(":db") + space.Name = r.URL.Query().Get(":name") + if !self.clusterConfig.DatabaseExists(space.Database) { + return libhttp.StatusNotAcceptable, "Can't update a shard space for a database that doesn't exist" + } + if !self.clusterConfig.ShardSpaceExists(space) { + return libhttp.StatusNotAcceptable, "Can't update a shard space that doesn't exist" + } + + if err := self.raftServer.UpdateShardSpace(space); err != nil { + return libhttp.StatusInternalServerError, err.Error() + } + return libhttp.StatusOK, nil + }) +} diff --git a/client/influxdb.go b/client/influxdb.go index f81ce35c0b8..1aa178a98ca 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -608,3 +608,14 @@ func (self *Client) DropShard(id uint32, serverIds []uint32) error { _, err = self.delWithBody(url, bytes.NewBuffer(body)) return err } + +// Added to InfluxDB in 0.8.2 +func (self *Client) UpdateShardSpace(database, name string, space *ShardSpace) error { + url := self.getUrl(fmt.Sprintf("/cluster/shard_spaces/%s/%s", database, name)) + data, err := json.Marshal(space) + if err != nil { + return err + } + resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) + return responseToError(resp, err, true) +} diff --git a/cluster/cluster_configuration.go b/cluster/cluster_configuration.go index c58d3cc8aae..d82e2aeb822 100644 --- a/cluster/cluster_configuration.go +++ b/cluster/cluster_configuration.go @@ -125,16 +125,16 @@ func NewClusterConfiguration( } } -func (self *ClusterConfiguration) DoesShardSpaceExist(space *ShardSpace) error { +func (self *ClusterConfiguration) ShardSpaceExists(space *ShardSpace) bool { self.shardLock.RLock() defer self.shardLock.RUnlock() dbSpaces := self.databaseShardSpaces[space.Database] for _, s := range dbSpaces { if s.Name == space.Name { - return fmt.Errorf("Shard space %s exists", space.Name) + return true } } - return nil + return false } func (self *ClusterConfiguration) SetShardCreator(shardCreator ShardCreator) { @@ -1314,6 +1314,17 @@ func (self *ClusterConfiguration) AddShardSpace(space *ShardSpace) error { return nil } +func (self *ClusterConfiguration) UpdateShardSpace(space *ShardSpace) error { + self.shardLock.Lock() + defer self.shardLock.Unlock() + oldSpace := self.getShardSpaceByDatabaseAndName(space.Database, space.Name) + if oldSpace == nil { + return nil + } + oldSpace.UpdateFromSpace(space) + return nil +} + func (self *ClusterConfiguration) RemoveShardSpace(database, name string) error { log.Info("Dropping shard space %s on db %s", name, database) self.shardLock.Lock() diff --git a/cluster/shard_space.go b/cluster/shard_space.go index eb5e1a9a2cf..4a99b5cf43e 100644 --- a/cluster/shard_space.go +++ b/cluster/shard_space.go @@ -44,8 +44,8 @@ func NewShardSpace(database, name string) *ShardSpace { } func (s *ShardSpace) Validate(clusterConfig *ClusterConfiguration, checkForDb bool) error { - if err := clusterConfig.DoesShardSpaceExist(s); err != nil { - return err + if clusterConfig.ShardSpaceExists(s) { + return fmt.Errorf("Shard space %s exists for db %s", s.Name, s.Database) } if checkForDb { if !clusterConfig.DatabaseExists(s.Database) { @@ -132,3 +132,17 @@ func (s *ShardSpace) ParsedShardDuration() time.Duration { } return DEFAULT_SHARD_DURATION } + +func (s *ShardSpace) UpdateFromSpace(space *ShardSpace) error { + r, err := s.compileRegex(space.Regex) + if err != nil { + return err + } + s.Regex = space.Regex + s.compiledRegex = r + s.RetentionPolicy = space.RetentionPolicy + s.ShardDuration = space.ShardDuration + s.ReplicationFactor = space.ReplicationFactor + s.Split = space.Split + return nil +} diff --git a/coordinator/command.go b/coordinator/command.go index 45ba741854b..9dcd3a87c93 100644 --- a/coordinator/command.go +++ b/coordinator/command.go @@ -35,6 +35,7 @@ func init() { &DropSeriesCommand{}, &CreateShardSpaceCommand{}, &DropShardSpaceCommand{}, + &UpdateShardSpaceCommand{}, } { internalRaftCommands[command.CommandName()] = command } @@ -471,3 +472,21 @@ func (c *DropShardSpaceCommand) Apply(server raft.Server) (interface{}, error) { err := config.RemoveShardSpace(c.Database, c.Name) return nil, err } + +type UpdateShardSpaceCommand struct { + ShardSpace *cluster.ShardSpace +} + +func NewUpdateShardSpaceCommand(space *cluster.ShardSpace) *UpdateShardSpaceCommand { + return &UpdateShardSpaceCommand{ShardSpace: space} +} + +func (c *UpdateShardSpaceCommand) CommandName() string { + return "update_shard_space" +} + +func (c *UpdateShardSpaceCommand) Apply(server raft.Server) (interface{}, error) { + config := server.Context().(*cluster.ClusterConfiguration) + err := config.UpdateShardSpace(c.ShardSpace) + return nil, err +} diff --git a/coordinator/raft_server.go b/coordinator/raft_server.go index fa23b905fd1..e422773775f 100644 --- a/coordinator/raft_server.go +++ b/coordinator/raft_server.go @@ -830,3 +830,9 @@ func (self *RaftServer) DropShardSpace(database, name string) error { _, err := self.doOrProxyCommand(command) return err } + +func (self *RaftServer) UpdateShardSpace(shardSpace *cluster.ShardSpace) error { + command := NewUpdateShardSpaceCommand(shardSpace) + _, err := self.doOrProxyCommand(command) + return err +} diff --git a/integration/single_server_test.go b/integration/single_server_test.go index 18afe4f857f..b66c37d369f 100644 --- a/integration/single_server_test.go +++ b/integration/single_server_test.go @@ -952,10 +952,9 @@ func (self *SingleServerSuite) TestSeriesShouldReturnSorted(c *C) { } } -// fix for #886 - https://github.com/influxdb/influxdb/issues/886 -func (self *SingleServerSuite) TestShardSpacesCorrectAfterRestart(c *C) { +func (self *SingleServerSuite) TestUpdateShardSpace(c *C) { client := self.server.GetClient("", c) - db := "test_shard_restart" + db := "test_update_shard_space" c.Assert(client.CreateDatabase(db), IsNil) client = self.server.GetClient(db, c) @@ -993,23 +992,34 @@ func (self *SingleServerSuite) TestShardSpacesCorrectAfterRestart(c *C) { c.Assert(series[1].Name, Equals, "space2.foo") c.Assert(series[1].Points[0][1], Equals, float64(1)) - _, err = http.Post("http://localhost:8086/raft/force_compaction?u=root&p=root", "", nil) + space2.Regex = "/^(space2|foo).*/" + err = client.UpdateShardSpace(db, "space2", space2) c.Assert(err, IsNil) - self.server.Stop() - c.Assert(self.server.Start(), IsNil) - self.server.WaitForServerToStart() + spaces, err = client.GetShardSpaces() + c.Assert(err, IsNil) + c.Assert(self.getSpace(db, "space2", "/^(space2|foo).*/", spaces), NotNil) + c.Assert(self.getSpace(db, "space1", "/.*/", spaces), NotNil) + // foo should now be effectively hidden from us. series, err = client.Query("select count(val) from /.*/") c.Assert(err, IsNil) - c.Assert(series, HasLen, 2) - c.Assert(series[0].Name, Equals, "foo") + c.Assert(series, HasLen, 1) + c.Assert(series[0].Name, Equals, "space2.foo") c.Assert(series[0].Points[0][1], Equals, float64(1)) - c.Assert(series[1].Name, Equals, "space2.foo") - c.Assert(series[1].Points[0][1], Equals, float64(1)) - spaces, err = client.GetShardSpaces() + self.server.WriteDataToDatabase(db, ` +[ + { + "name": "foo", + "columns": ["val"], + "points":[[5]] + } +]`, c) + + series, err = client.Query("select * from foo") c.Assert(err, IsNil) - c.Assert(self.getSpace(db, "space2", "/^space2.*/", spaces), NotNil) - c.Assert(self.getSpace(db, "space1", "/.*/", spaces), NotNil) + c.Assert(series, HasLen, 1) + c.Assert(series[0].Name, Equals, "foo") + c.Assert(series[0].Points[0][2], Equals, float64(5)) }