Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not reuse a consul lock #4353

Merged
merged 3 commits into from
Dec 8, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 17 additions & 18 deletions go/vt/topo/consultopo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,8 @@ import (

// NewMasterParticipation is part of the topo.Server interface
func (s *Server) NewMasterParticipation(name, id string) (topo.MasterParticipation, error) {
// Create the lock here.
electionPath := path.Join(s.root, electionsPath, name)
l, err := s.client.LockOpts(&api.LockOptions{
Key: electionPath,
Value: []byte(id),
})
if err != nil {
return nil, err
}

return &consulMasterParticipation{
s: s,
lock: l,
name: name,
id: id,
stop: make(chan struct{}),
Expand All @@ -56,9 +45,6 @@ type consulMasterParticipation struct {
// s is our parent consul topo Server
s *Server

// lock is the *api.Lock structure we're going to use.
lock *api.Lock

// name is the name of this MasterParticipation
name string

Expand All @@ -74,6 +60,16 @@ type consulMasterParticipation struct {

// WaitForMastership is part of the topo.MasterParticipation interface.
func (mp *consulMasterParticipation) WaitForMastership() (context.Context, error) {

electionPath := path.Join(mp.s.root, electionsPath, mp.name)
l, err := mp.s.client.LockOpts(&api.LockOptions{
Key: electionPath,
Value: []byte(mp.id),
})
if err != nil {
return nil, err
}

// If Stop was already called, mp.done is closed, so we are interrupted.
select {
case <-mp.done:
Expand All @@ -82,7 +78,7 @@ func (mp *consulMasterParticipation) WaitForMastership() (context.Context, error
}

// Try to lock until mp.stop is closed.
lost, err := mp.lock.Lock(mp.stop)
lost, err := l.Lock(mp.stop)
if err != nil {
// We can't lock. See if it was because we got canceled.
select {
Expand All @@ -93,19 +89,22 @@ func (mp *consulMasterParticipation) WaitForMastership() (context.Context, error
return nil, err
}

// We have the lock, keep mastership until we loose it.
// We have the lock, keep mastership until we lose it.
lockCtx, lockCancel := context.WithCancel(context.Background())
go func() {
select {
case <-lost:
// We lost the lock, nothing to do but lockCancel().
lockCancel()
// We could have lost the lock. Per consul API, explicitly call Unlock to make sure that session will not be renewed.
if err := l.Unlock(); err != nil {
log.Errorf("master election(%v) Unlock failed: %v", mp.name, err)
}
case <-mp.stop:
// Stop was called. We stop the context first,
// so the running process is not thinking it
// is the master any more, then we unlock.
lockCancel()
if err := mp.lock.Unlock(); err != nil {
if err := l.Unlock(); err != nil {
log.Errorf("master election(%v) Unlock failed: %v", mp.name, err)
}
close(mp.done)
Expand Down