Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add locking between replicaGCQueue and multiraft.state.createGroup. #2868

Merged
merged 1 commit into from
Oct 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions multiraft/multiraft.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,11 @@ func (s *state) handleMessage(req *RaftMessageRequest) {
// messages (in which case the replicaID comes from the incoming
// message, since nothing is on disk yet).
func (s *state) createGroup(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) error {
locker := s.Storage.GroupLocker()
if locker != nil {
locker.Lock()
defer locker.Unlock()
}
if g, ok := s.groups[groupID]; ok {
if replicaID != 0 && g.replicaID != replicaID {
return util.Errorf("cannot create group %s with replica ID %s; already exists with replica ID %s",
Expand Down
12 changes: 12 additions & 0 deletions multiraft/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ type Storage interface {
ReplicaDescriptor(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) (roachpb.ReplicaDescriptor, error)
ReplicaIDForStore(groupID roachpb.RangeID, storeID roachpb.StoreID) (roachpb.ReplicaID, error)
ReplicasFromSnapshot(snap raftpb.Snapshot) ([]roachpb.ReplicaDescriptor, error)

// GroupLocker returns a lock which (if non-nil) will be acquired
// when a group is being created (which entails multiple calls to
// Storage and StateMachine methods and may race with the removal of
// a previous incarnation of a group). If it returns a non-nil value
// it must return the same value on every call.
GroupLocker() sync.Locker
}

// The StateMachine interface is supplied by the application to manage a persistent
Expand Down Expand Up @@ -111,6 +118,11 @@ func (m *MemoryStorage) ReplicasFromSnapshot(_ raftpb.Snapshot) ([]roachpb.Repli
return nil, nil
}

// GroupLocker satisfies the Storage interface. MemoryStorage supplies
// no lock: the returned sync.Locker is always nil, so createGroup
// skips locking for groups backed by this storage.
func (m *MemoryStorage) GroupLocker() sync.Locker {
	// The zero value of an interface type is nil.
	var noLock sync.Locker
	return noLock
}

// groupWriteRequest represents a set of changes to make to a group.
type groupWriteRequest struct {
replicaID roachpb.ReplicaID
Expand Down
4 changes: 4 additions & 0 deletions multiraft/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func (b *BlockableStorage) ReplicasFromSnapshot(snap raftpb.Snapshot) ([]roachpb
return b.storage.ReplicasFromSnapshot(snap)
}

// GroupLocker implements the Storage interface by forwarding to the
// wrapped storage; whatever locker (possibly nil) the underlying
// storage returns is handed back unchanged.
func (b *BlockableStorage) GroupLocker() sync.Locker {
	inner := b.storage.GroupLocker()
	return inner
}

type blockableGroupStorage struct {
b *BlockableStorage
s WriteableGroupStorage
Expand Down
1 change: 1 addition & 0 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ type RangeManager interface {

// Range and replica manipulation methods.
LookupReplica(start, end roachpb.RKey) *Replica
GetReplica(rangeID roachpb.RangeID) (*Replica, error)
MergeRange(subsumingRng *Replica, updatedEndKey roachpb.RKey, subsumedRangeID roachpb.RangeID) error
NewRangeDescriptor(start, end roachpb.RKey, replicas []roachpb.ReplicaDescriptor) (*roachpb.RangeDescriptor, error)
NewSnapshot() engine.Engine
Expand Down
26 changes: 21 additions & 5 deletions storage/replica_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package storage

import (
"sync"
"time"

"github.com/cockroachdb/cockroach/client"
Expand Down Expand Up @@ -46,13 +47,15 @@ const (
// ranges that have been rebalanced away from this store.
type replicaGCQueue struct {
*baseQueue
db *client.DB
db *client.DB
locker sync.Locker
}

// newReplicaGCQueue returns a new instance of replicaGCQueue.
func newReplicaGCQueue(db *client.DB, gossip *gossip.Gossip) *replicaGCQueue {
func newReplicaGCQueue(db *client.DB, gossip *gossip.Gossip, locker sync.Locker) *replicaGCQueue {
q := &replicaGCQueue{
db: db,
db: db,
locker: locker,
}
q.baseQueue = newBaseQueue("replicaGC", q, gossip, replicaGCQueueMaxSize)
return q
Expand Down Expand Up @@ -124,8 +127,21 @@ func (q *replicaGCQueue) process(now roachpb.Timestamp, rng *Replica, _ *config.
if err := rng.rm.RemoveReplica(rng); err != nil {
return err
}
// TODO(bdarnell): add some sort of locking to prevent the range
// from being recreated while the underlying data is being destroyed.

// Lock the store to prevent a new replica of the range from being
// added while we're deleting the previous one. We'd really like
// to do this before calling RemoveReplica, but this could
// deadlock with other work on the Store.processRaft goroutine.
// Instead, we check after acquiring the lock to make sure the
// range is still absent.
q.locker.Lock()
defer q.locker.Unlock()

if _, err := rng.rm.GetReplica(desc.RangeID); err == nil {
log.Infof("replica recreated during deletion; aborting deletion")
return nil
}

if err := rng.Destroy(); err != nil {
return err
}
Expand Down
21 changes: 15 additions & 6 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,15 @@ type Store struct {
stopper *stop.Stopper
startedAt int64
nodeDesc *roachpb.NodeDescriptor
initComplete sync.WaitGroup // Signaled by async init tasks
mu sync.RWMutex // Protects variables below...
replicas map[roachpb.RangeID]*Replica // Map of replicas by Range ID
replicasByKey *btree.BTree // btree keyed by ranges end keys.
uninitReplicas map[roachpb.RangeID]*Replica // Map of uninitialized replicas by Range ID
initComplete sync.WaitGroup // Signaled by async init tasks

// Synchronizes raft group creation and range GC.
raftGroupLocker sync.Mutex

mu sync.RWMutex // Protects variables below...
replicas map[roachpb.RangeID]*Replica // Map of replicas by Range ID
replicasByKey *btree.BTree // btree keyed by ranges end keys.
uninitReplicas map[roachpb.RangeID]*Replica // Map of uninitialized replicas by Range ID
}

var _ client.Sender = &Store{}
Expand Down Expand Up @@ -366,7 +370,7 @@ func NewStore(ctx StoreContext, eng engine.Engine, nodeDesc *roachpb.NodeDescrip
s._splitQueue = newSplitQueue(s.db, s.ctx.Gossip)
s.verifyQueue = newVerifyQueue(s.ctx.Gossip, s.ReplicaCount)
s.replicateQueue = makeReplicateQueue(s.ctx.Gossip, s.allocator(), s.ctx.Clock, s.ctx.RebalancingOptions)
s._replicaGCQueue = newReplicaGCQueue(s.db, s.ctx.Gossip)
s._replicaGCQueue = newReplicaGCQueue(s.db, s.ctx.Gossip, s.GroupLocker())
s.scanner.AddQueues(s.gcQueue, s._splitQueue, s.verifyQueue, s.replicateQueue, s._replicaGCQueue)

return s
Expand Down Expand Up @@ -1576,6 +1580,11 @@ func (s *Store) ReplicasFromSnapshot(snap raftpb.Snapshot) ([]roachpb.ReplicaDes
return parsedSnap.RangeDescriptor.Replicas, nil
}

// GroupLocker implements the multiraft.Storage interface. It returns a
// pointer to the store's raftGroupLocker mutex, which synchronizes raft
// group creation with range GC. Every call yields the same pointer, as
// the interface requires of a non-nil locker.
func (s *Store) GroupLocker() sync.Locker {
	groupMu := &s.raftGroupLocker
	return groupMu
}

// AppliedIndex implements the multiraft.StateMachine interface.
func (s *Store) AppliedIndex(groupID roachpb.RangeID) (uint64, error) {
s.mu.RLock()
Expand Down