Skip to content

Commit

Permalink
apply drop server; work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
levicook committed Apr 19, 2015
1 parent ece6247 commit c46837d
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 6 deletions.
6 changes: 6 additions & 0 deletions influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ var (
// ErrServerOpen is returned when opening an already open server.
ErrServerOpen = errors.New("server already open")

// ErrDropServerConflict is returned when removing the server would result in data loss
ErrDropServerConflict = errors.New("removing this server would result in data loss")

// ErrServerClosed is returned when closing an already closed server.
ErrServerClosed = errors.New("server already closed")

// ErrServernotFound is returned when removing/adding a server and it is not found
ErrServerNotFound = errors.New("server not found")

// ErrPathRequired is returned when opening a server without a path.
ErrPathRequired = errors.New("path required")

Expand Down
68 changes: 65 additions & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (s *Server) load() error {
s.shards[sh.ID] = sh

// Only open shards owned by the server.
if !sh.HasDataNodeID(s.id) {
if !sh.hasDataNodeID(s.id) {
continue
}

Expand Down Expand Up @@ -969,6 +969,66 @@ func (s *Server) CreateDatabaseIfNotExists(name string) error {
if err := s.CreateDatabase(name); err != nil && err != ErrDatabaseExists {
return err
}

return nil
}

func (s *Server) applyDropServer(m *messaging.Message) error {
var c dropServerCommand
mustUnmarshalJSON(m.Data, &c)

s.mu.Lock()
defer s.mu.Unlock()

// am I being deleted?

// don't drop the last data node...
if len(s.dataNodes) <= 1 {
return ErrDropServerConflict
}

// walk shards; track shards marked for deletion
var shardIDsToRemove []uint64
for _, shard := range s.shards {
if shard.hasDataNodeID(c.NodeID) {
shardIDsToRemove = append(shardIDsToRemove, c.NodeID)
}
}

// walk shard groups (via databases) and make sure we're not going to lose data
for _, database := range s.databases {
for _, policy := range database.policies {
if policy.ReplicaN > 1 {
continue
}

var matchedShardGroupCount int
for _, shardGroup := range policy.shardGroups {

var matchedShardCount int
// am i going to delete all of the shards in this group?
for _, shard := range shardGroup.Shards {
for _, shardID := range shardIDsToRemove {
if shard.ID == shardID {
matchedShardCount++
}
}
}

if matchedShardCount == len(shardGroup.Shards) {
matchedShardGroupCount++
}
}

if matchedShardGroupCount == len(policy.shardGroups) {
return ErrDropServerConflict
}
}
}

// finally, remove the dataNode from server memory
// TODO delete(s.dataNodes, c.NodeID)

return nil
}

Expand Down Expand Up @@ -1162,7 +1222,7 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err
// Open shards assigned to this server.
for _, sh := range g.Shards {
// Ignore if this server is not assigned.
if !sh.HasDataNodeID(s.id) {
if !sh.hasDataNodeID(s.id) {
continue
}

Expand Down Expand Up @@ -1213,7 +1273,7 @@ func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) {

for _, shard := range g.Shards {
// Ignore shards not on this server.
if !shard.HasDataNodeID(s.id) {
if !shard.hasDataNodeID(s.id) {
continue
}

Expand Down Expand Up @@ -3370,6 +3430,8 @@ func (s *Server) processor(conn MessagingConn, done chan struct{}) {
// Process message.
var err error
switch m.Type {
case dropServerMessageType:
err = s.applyDropServer(m)
case createDataNodeMessageType:
err = s.applyCreateDataNode(m)
case deleteDataNodeMessageType:
Expand Down
27 changes: 24 additions & 3 deletions shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,37 @@ func (s *Shard) sync(index uint64) error {
}
}

// HasDataNodeID return true if the data node owns the shard.
func (s *Shard) HasDataNodeID(id uint64) bool {
for _, dataNodeID := range s.DataNodeIDs {
func (s *Shard) dropDataNodeID(id uint64) {
s.mu.Lock()
defer s.mu.Unlock()

var dataNodeIds []uint64

for _, dataNodeId := range s.DataNodeIDs {
if id != dataNodeId {
dataNodeIds = append(dataNodeIds, dataNodeId)
}
}

s.DataNodeIDs = dataNodeIds
}

func (s *Shard) hasDataNodeID(id uint64) bool {
for _, dataNodeID := range s.readDataNodeIDs() {
if dataNodeID == id {
return true
}
}
return false
}

func (s *Shard) readDataNodeIDs() []uint64 {
s.mu.RLock()
defer s.mu.RUnlock()

return s.DataNodeIDs
}

// readSeries reads encoded series data from a shard.
func (s *Shard) readSeries(seriesID uint64, timestamp int64) (values []byte, err error) {
err = s.store.View(func(tx *bolt.Tx) error {
Expand Down

0 comments on commit c46837d

Please sign in to comment.