Skip to content

Commit

Permalink
Add ability to update existing shard spaces.
Browse files Browse the repository at this point in the history
This will help users recover from #886. It's dangerous functionality because it only changes the metadata. Will document and tell people to use with caution.
  • Loading branch information
pauldix committed Sep 8, 2014
1 parent f20f8e8 commit 6ddfba6
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 28 deletions.
43 changes: 34 additions & 9 deletions api/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,16 @@ func (self *HttpServer) Serve(listener net.Listener) {
self.registerEndpoint("get", "/interfaces", self.listInterfaces)

// cluster config endpoints
self.registerEndpoint("get", "/cluster/servers", self.listServers)
self.registerEndpoint("del", "/cluster/servers/:id", self.removeServers)
self.registerEndpoint("post", "/cluster/shards", self.createShard)
self.registerEndpoint("get", "/cluster/shards", self.getShards)
self.registerEndpoint("del", "/cluster/shards/:id", self.dropShard)
self.registerEndpoint("get", "/cluster/shard_spaces", self.getShardSpaces)
self.registerEndpoint("post", "/cluster/shard_spaces/:db", self.createShardSpace)
self.registerEndpoint("del", "/cluster/shard_spaces/:db/:name", self.dropShardSpace)
self.registerEndpoint("post", "/cluster/database_configs/:db", self.configureDatabase)
self.registerEndpoint(p, "get", "/cluster/servers", self.listServers)
self.registerEndpoint(p, "del", "/cluster/servers/:id", self.removeServers)
self.registerEndpoint(p, "post", "/cluster/shards", self.createShard)
self.registerEndpoint(p, "get", "/cluster/shards", self.getShards)
self.registerEndpoint(p, "del", "/cluster/shards/:id", self.dropShard)
self.registerEndpoint(p, "get", "/cluster/shard_spaces", self.getShardSpaces)
self.registerEndpoint(p, "post", "/cluster/shard_spaces/:db", self.createShardSpace)
self.registerEndpoint(p, "del", "/cluster/shard_spaces/:db/:name", self.dropShardSpace)
self.registerEndpoint(p, "post", "/cluster/shard_spaces/:db/:name", self.updateShardSpace)
self.registerEndpoint(p, "post", "/cluster/database_configs/:db", self.configureDatabase)

// return whether the cluster is in sync or not
self.registerEndpoint("get", "/sync", self.isInSync)
Expand Down Expand Up @@ -1185,3 +1186,27 @@ func (self *HttpServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R
return libhttp.StatusCreated, nil
})
}

func (self *HttpServer) updateShardSpace(w libhttp.ResponseWriter, r *libhttp.Request) {
self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) {
space := &cluster.ShardSpace{}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(space)
if err != nil {
return libhttp.StatusInternalServerError, err.Error()
}
space.Database = r.URL.Query().Get(":db")
space.Name = r.URL.Query().Get(":name")
if !self.clusterConfig.DatabaseExists(space.Database) {
return libhttp.StatusNotAcceptable, "Can't update a shard space for a database that doesn't exist"
}
if !self.clusterConfig.ShardSpaceExists(space) {
return libhttp.StatusNotAcceptable, "Can't update a shard space that doesn't exist"
}

if err := self.raftServer.UpdateShardSpace(space); err != nil {
return libhttp.StatusInternalServerError, err.Error()
}
return libhttp.StatusOK, nil
})
}
11 changes: 11 additions & 0 deletions client/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,3 +597,14 @@ func (self *Client) DropShard(id uint32, serverIds []uint32) error {
_, err = self.delWithBody(url, bytes.NewBuffer(body))
return err
}

// Added to InfluxDB in 0.8.2
func (self *Client) UpdateShardSpace(database, name string, space *ShardSpace) error {
url := self.getUrl(fmt.Sprintf("/cluster/shard_spaces/%s/%s", database, name))
data, err := json.Marshal(space)
if err != nil {
return err
}
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
return responseToError(resp, err, true)
}
17 changes: 14 additions & 3 deletions cluster/cluster_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,16 @@ func NewClusterConfiguration(
}
}

func (self *ClusterConfiguration) DoesShardSpaceExist(space *ShardSpace) error {
func (self *ClusterConfiguration) ShardSpaceExists(space *ShardSpace) bool {
self.shardLock.RLock()
defer self.shardLock.RUnlock()
dbSpaces := self.databaseShardSpaces[space.Database]
for _, s := range dbSpaces {
if s.Name == space.Name {
return fmt.Errorf("Shard space %s exists", space.Name)
return true
}
}
return nil
return false
}

func (self *ClusterConfiguration) SetShardCreator(shardCreator ShardCreator) {
Expand Down Expand Up @@ -1314,6 +1314,17 @@ func (self *ClusterConfiguration) AddShardSpace(space *ShardSpace) error {
return nil
}

func (self *ClusterConfiguration) UpdateShardSpace(space *ShardSpace) error {
self.shardLock.Lock()
defer self.shardLock.Unlock()
oldSpace := self.getShardSpaceByDatabaseAndName(space.Database, space.Name)
if oldSpace == nil {
return nil
}
oldSpace.UpdateFromSpace(space)
return nil
}

func (self *ClusterConfiguration) RemoveShardSpace(database, name string) error {
log.Info("Dropping shard space %s on db %s", name, database)
self.shardLock.Lock()
Expand Down
18 changes: 16 additions & 2 deletions cluster/shard_space.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func NewShardSpace(database, name string) *ShardSpace {
}

func (s *ShardSpace) Validate(clusterConfig *ClusterConfiguration, checkForDb bool) error {
if err := clusterConfig.DoesShardSpaceExist(s); err != nil {
return err
if clusterConfig.ShardSpaceExists(s) {
return fmt.Errorf("Shard space %s exists for db %s", s.Name, s.Database)
}
if checkForDb {
if !clusterConfig.DatabaseExists(s.Database) {
Expand Down Expand Up @@ -132,3 +132,17 @@ func (s *ShardSpace) ParsedShardDuration() time.Duration {
}
return DEFAULT_SHARD_DURATION
}

func (s *ShardSpace) UpdateFromSpace(space *ShardSpace) error {
r, err := s.compileRegex(space.Regex)
if err != nil {
return err
}
s.Regex = space.Regex
s.compiledRegex = r
s.RetentionPolicy = space.RetentionPolicy
s.ShardDuration = space.ShardDuration
s.ReplicationFactor = space.ReplicationFactor
s.Split = space.Split
return nil
}
19 changes: 19 additions & 0 deletions coordinator/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func init() {
&DropSeriesCommand{},
&CreateShardSpaceCommand{},
&DropShardSpaceCommand{},
&UpdateShardSpaceCommand{},
} {
internalRaftCommands[command.CommandName()] = command
}
Expand Down Expand Up @@ -471,3 +472,21 @@ func (c *DropShardSpaceCommand) Apply(server raft.Server) (interface{}, error) {
err := config.RemoveShardSpace(c.Database, c.Name)
return nil, err
}

type UpdateShardSpaceCommand struct {
ShardSpace *cluster.ShardSpace
}

func NewUpdateShardSpaceCommand(space *cluster.ShardSpace) *UpdateShardSpaceCommand {
return &UpdateShardSpaceCommand{ShardSpace: space}
}

func (c *UpdateShardSpaceCommand) CommandName() string {
return "update_shard_space"
}

func (c *UpdateShardSpaceCommand) Apply(server raft.Server) (interface{}, error) {
config := server.Context().(*cluster.ClusterConfiguration)
err := config.UpdateShardSpace(c.ShardSpace)
return nil, err
}
6 changes: 6 additions & 0 deletions coordinator/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,3 +826,9 @@ func (self *RaftServer) DropShardSpace(database, name string) error {
_, err := self.doOrProxyCommand(command)
return err
}

func (self *RaftServer) UpdateShardSpace(shardSpace *cluster.ShardSpace) error {
command := NewUpdateShardSpaceCommand(shardSpace)
_, err := self.doOrProxyCommand(command)
return err
}
38 changes: 24 additions & 14 deletions integration/single_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -957,10 +957,9 @@ func (self *SingleServerSuite) TestSeriesShouldReturnSorted(c *C) {
}
}

// fix for #886 - https://github.com/influxdb/influxdb/issues/886
func (self *SingleServerSuite) TestShardSpacesCorrectAfterRestart(c *C) {
func (self *SingleServerSuite) TestUpdateShardSpace(c *C) {
client := self.server.GetClient("", c)
db := "test_shard_restart"
db := "test_update_shard_space"
c.Assert(client.CreateDatabase(db), IsNil)
client = self.server.GetClient(db, c)

Expand Down Expand Up @@ -998,23 +997,34 @@ func (self *SingleServerSuite) TestShardSpacesCorrectAfterRestart(c *C) {
c.Assert(series[1].Name, Equals, "space2.foo")
c.Assert(series[1].Points[0][1], Equals, float64(1))

_, err = http.Post("http://localhost:8086/raft/force_compaction?u=root&p=root", "", nil)
space2.Regex = "/^(space2|foo).*/"
err = client.UpdateShardSpace(db, "space2", space2)
c.Assert(err, IsNil)

self.server.Stop()
c.Assert(self.server.Start(), IsNil)
self.server.WaitForServerToStart()
spaces, err = client.GetShardSpaces()
c.Assert(err, IsNil)
c.Assert(self.getSpace(db, "space2", "/^(space2|foo).*/", spaces), NotNil)
c.Assert(self.getSpace(db, "space1", "/.*/", spaces), NotNil)

// foo should now be effectively hidden from us.
series, err = client.Query("select count(val) from /.*/")
c.Assert(err, IsNil)
c.Assert(series, HasLen, 2)
c.Assert(series[0].Name, Equals, "foo")
c.Assert(series, HasLen, 1)
c.Assert(series[0].Name, Equals, "space2.foo")
c.Assert(series[0].Points[0][1], Equals, float64(1))
c.Assert(series[1].Name, Equals, "space2.foo")
c.Assert(series[1].Points[0][1], Equals, float64(1))

spaces, err = client.GetShardSpaces()
self.server.WriteDataToDatabase(db, `
[
{
"name": "foo",
"columns": ["val"],
"points":[[5]]
}
]`, c)

series, err = client.Query("select * from foo")
c.Assert(err, IsNil)
c.Assert(self.getSpace(db, "space2", "/^space2.*/", spaces), NotNil)
c.Assert(self.getSpace(db, "space1", "/.*/", spaces), NotNil)
c.Assert(series, HasLen, 1)
c.Assert(series[0].Name, Equals, "foo")
c.Assert(series[0].Points[0][2], Equals, float64(5))
}

0 comments on commit 6ddfba6

Please sign in to comment.