Add experimental locking options (#130)
* Add nested and map lockers

* Add constructors

* Wire in session manager

* Update session logic in locker

* Race condition patch

* Add options to nested locker

* Add cool-off locker

* Change to TryLock

* Switch to TryLock

* Remove unused interface

* Refactor map locker

* Refactor session manager

* Linter

* Add options

* Linter

* Add options

* Wire in useTryLock
jswarren4 authored Feb 24, 2022
1 parent 66e8c20 commit 800301d
Showing 12 changed files with 695 additions and 236 deletions.
134 changes: 20 additions & 114 deletions concurrency/session_manager.go
@@ -2,7 +2,6 @@ package concurrency
 
 import (
 	"context"
-	"fmt"
 	"sync"
 	"time"

@@ -15,18 +14,11 @@ const (
 )
 
 type SessionManager struct {
-	// These fields must not be accessed by more than one
-	// goroutine.
-	// Session singleton that is refreshed if it closes.
-	session *Session
-	// Channel used by the session to communicate that it is closed.
-	sessionDone <-chan struct{}
+	// Session singleton that is cleared if it closes.
+	session      *Session
+	sessionMutex sync.Mutex
 
 	logger     *zap.Logger
 	retryDelay time.Duration
-	get        chan sessionManagerGetRequest
-	close      chan struct{}
-	closeOnce  sync.Once
 	newSession func() (*Session, error)
 }

@@ -38,118 +30,32 @@ func NewSessionManager(client *clientv3.Client, logger *zap.Logger) *SessionManager {
 func newSessionManager(client *clientv3.Client, retryDelay time.Duration, logger *zap.Logger) *SessionManager {
 	sm := &SessionManager{
 		logger:     logger,
 		retryDelay: retryDelay,
-		get:        make(chan sessionManagerGetRequest),
-		close:      make(chan struct{}),
 		newSession: func() (*Session, error) { return NewSession(client) },
 	}
-	go sm.run()
 	return sm
 }
 
 // GetSession provides the singleton session or times out if a session
 // cannot be obtained. The context needs to have a timeout, otherwise it
 // is possible for the calling goroutine to hang.
 func (sm *SessionManager) GetSession(ctx context.Context) (*Session, error) {
-	request := sessionManagerGetRequest{
-		resp: make(chan *Session),
-	}
-	go func() {
-		sm.get <- request
-	}()
-	select {
-	case <-ctx.Done():
-		return nil, ctx.Err()
-	case session := <-request.resp:
-		return session, nil
-	}
-}
-
-// Close closes the manager, causing the current session to be closed
-// and no new ones to be created.
-func (sm *SessionManager) Close() {
-	sm.closeOnce.Do(func() {
-		close(sm.close)
-	})
-}
-
-func (sm *SessionManager) resetSession() {
-	sm.logger.Info("Initializing session")
-	session, err := sm.newSession()
-	for err != nil {
-		sm.logger.Error("Error getting session", zap.Error(err))
-		stopRetry := false
-		func() {
-			ctx, cancel := context.WithTimeout(context.Background(), sm.retryDelay)
-			defer cancel()
-			select {
-			case <-ctx.Done():
-				// Let pass so retry can be attempted.
-			case <-sm.close:
-				stopRetry = true
-			}
-		}()
-		if stopRetry {
-			return
-		}
-		session, err = sm.newSession()
-	}
-	sm.session = session
-	sm.sessionDone = session.Done()
-	sm.logger.Info("new session initialized", zap.String("lease_id", fmt.Sprintf("%x", sm.session.Lease())))
-}
-
-func (sm *SessionManager) run() {
-	// Thread safety is handled by controlling all activity
-	// through a single goroutine that interacts with other
-	// goroutines via channels.
-	sm.logger.Info("Starting session manager")
-run:
-	for {
-		// If the session manager should be closed, give
-		// that the highest priority.
-		select {
-		case <-sm.close:
-			sm.logger.Info("Closing session manager")
-			if sm.session != nil {
-				// This may fail if the session was already closed
-				// due to some external cause, like etcd connectivity
-				// issues. The result is just a log message.
-				sm.session.Close()
-			}
-			break run
-		default:
-		}
-		switch {
-		case sm.sessionDone == nil:
-			sm.resetSession()
-			continue
-		}
-		// If the current session has closed,
-		// prioritize creating a new one ahead
-		// of remaining concerns.
-		select {
-		case <-sm.sessionDone:
-			// Create new session
-			sm.resetSession()
-			continue
-		default:
-		}
-		select {
-		case <-sm.close:
-			// Let the check above take care of cleanup
-			continue
-		case <-sm.sessionDone:
-			// Let the check above take care of creating a new session
-			continue
-		case req := <-sm.get:
-			// Get the current session
-			req.resp <- sm.session
-		}
-	}
-	sm.logger.Info("Session manager closed")
-}
-
-type sessionManagerGetRequest struct {
-	resp chan *Session
+	sm.sessionMutex.Lock()
+	defer sm.sessionMutex.Unlock()
+	if sm.session == nil {
+		var err error
+		sm.session, err = sm.newSession()
+		if err != nil {
+			return nil, err
+		}
+		sessionDone := sm.session.Done()
+		// Start goroutine to check for closed session.
+		go func() {
+			<-sessionDone
+			// Clear out dead session
+			sm.sessionMutex.Lock()
+			defer sm.sessionMutex.Unlock()
+			sm.session = nil
+		}()
+	}
+	return sm.session, nil
 }
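
The refactored manager drops the background goroutine, the request channels, and the Close method in favor of a mutex-guarded lazy singleton: GetSession creates the session on first use, and a small watcher goroutine clears the cached session once etcd reports it done, so the next caller transparently gets a fresh one. A minimal usage sketch (the endpoint, logger, and timeout here are illustrative assumptions, not part of this diff):

	client, err := clientv3.New(clientv3.Config{Endpoints: []string{"http://127.0.0.1:2379"}})
	if err != nil {
		panic(err)
	}
	logger, _ := zap.NewDevelopment()
	mgr := concurrency.NewSessionManager(client, logger)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// Returns the cached session, creating one only if none exists
	// or the previous session's lease expired.
	session, err := mgr.GetSession(ctx)
	if err != nil {
		panic(err)
	}
	_ = session // hand the session to lockers, etcd mutexes, etc.
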
81 changes: 5 additions & 76 deletions concurrency/session_manager_test.go
@@ -2,21 +2,19 @@ package concurrency
 
 import (
 	"context"
-	"errors"
 	"sync"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"go.etcd.io/etcd/clientv3"
 	"go.uber.org/zap"
 
 	"github.com/IBM-Cloud/go-etcd-rules/rules/teststore"
 )
 
 func Test_SessionManager(t *testing.T) {
 	_, client := teststore.InitV3Etcd(t)
 	defer client.Close()
 	lgr, err := zap.NewDevelopment()
 	require.NoError(t, err)
 	mgr := newSessionManager(client, 0, lgr)
@@ -41,84 +39,15 @@ func Test_SessionManager(t *testing.T) {
 	wg.Wait()
 }
 
-func Test_SessionManager_Close(t *testing.T) {
-	lgr, err := zap.NewDevelopment()
-	require.NoError(t, err)
-	_, goodClient := teststore.InitV3Etcd(t)
-	badClient, _ := clientv3.New(clientv3.Config{
-		Endpoints: []string{"http://127.0.0.1:2377"},
-	})
-	testCases := []struct {
-		name string
-
-		client     *clientv3.Client
-		newSession func() (*Session, error)
-	}{
-		{
-			name:   "ok",
-			client: goodClient,
-			newSession: func() (*Session, error) {
-				return NewSession(goodClient)
-			},
-		},
-		{
-			name:   "bad",
-			client: badClient,
-			newSession: func() (*Session, error) {
-				return nil, errors.New("bad")
-			},
-		},
-	}
-	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			mgr := &SessionManager{
-				logger:     lgr,
-				retryDelay: time.Millisecond,
-				get:        make(chan sessionManagerGetRequest),
-				close:      make(chan struct{}),
-				newSession: tc.newSession,
-			}
-			go mgr.run()
-			var wg sync.WaitGroup
-			// Use a lot of goroutines to ensure any concurrency
-			// issues are caught by race condition checks.
-			for i := 0; i < 1000; i++ {
-				// Make a copy for the goroutine
-				localI := i
-				wg.Add(1)
-				go func() {
-					ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-					defer cancel()
-					session, _ := mgr.GetSession(ctx)
-					if localI%10 == 0 {
-						// Disrupt things by closing sessions, forcing
-						// the manager to create new ones.
-						if session != nil {
-							_ = session.Close()
-						}
-					}
-					if localI%25 == 0 {
-						mgr.Close()
-					}
-					wg.Done()
-				}()
-			}
-			wg.Wait()
-		})
-	}
-}
-
 func Test_NewSessionManager(t *testing.T) {
 	_, client := teststore.InitV3Etcd(t)
 	lgr, err := zap.NewDevelopment()
 	require.NoError(t, err)
 	mgr := NewSessionManager(client, lgr)
 	assert.Equal(t, lgr, mgr.logger)
 	assert.Equal(t, sessionManagerRetryDelay, mgr.retryDelay)
-	assert.NotNil(t, mgr.get)
-	assert.NotNil(t, mgr.close)
 	session, err := mgr.newSession()
-	require.NoError(t, err)
-	session.Close()
-	mgr.Close()
+	if assert.NoError(t, err) {
+		session.Close()
+	}
+	assert.NoError(t, client.Close())
 }
27 changes: 25 additions & 2 deletions rules/engine.go
@@ -10,6 +10,7 @@ import (
 	"go.uber.org/zap"
 	"golang.org/x/net/context"
 
+	"github.com/IBM-Cloud/go-etcd-rules/concurrency"
 	"github.com/IBM-Cloud/go-etcd-rules/rules/lock"
 )

@@ -112,8 +113,30 @@ func newV3Engine(logger *zap.Logger, cl *clientv3.Client, options ...EngineOption)
 			logger: logger,
 		}
 	}
-	baseEtcdLocker := lock.NewV3Locker(cl, opts.lockAcquisitionTimeout)
+	var baseEtcdLocker lock.RuleLocker
+	if opts.useSharedLockSession {
+		sessionManager := concurrency.NewSessionManager(cl, logger)
+		baseEtcdLocker = lock.NewSessionLocker(sessionManager.GetSession, opts.lockAcquisitionTimeout, false, opts.useTryLock)
+	} else {
+		baseEtcdLocker = lock.NewV3Locker(cl, opts.lockAcquisitionTimeout, opts.useTryLock)
+	}
 	metricsEtcdLocker := lock.WithMetrics(baseEtcdLocker, "etcd")
+	var baseLocker lock.RuleLocker
+	if opts.useSharedLockSession {
+		baseMapLocker := lock.NewMapLocker()
+		metricsMapLocker := lock.WithMetrics(baseMapLocker, "map")
+		baseLocker = lock.NewNestedLocker(metricsMapLocker, metricsEtcdLocker)
+	} else {
+		baseLocker = metricsEtcdLocker
+	}
+	var finalLocker lock.RuleLocker
+	if opts.lockCoolOff == 0 {
+		finalLocker = baseLocker
+	} else {
+		coolOffLocker := lock.NewCoolOffLocker(opts.lockCoolOff)
+		metricsCoolOffLocker := lock.WithMetrics(coolOffLocker, "cooloff")
+		finalLocker = lock.NewNestedLocker(metricsCoolOffLocker, baseLocker)
+	}
 	eng := v3Engine{
 		baseEngine: baseEngine{
 			keyProc: &keyProc,
@@ -122,7 +145,7 @@ func newV3Engine(logger *zap.Logger, cl *clientv3.Client, options ...EngineOption)
 			options:          opts,
 			ruleLockTTLs:     map[int]int{},
 			ruleMgr:          ruleMgr,
-			locker:           metricsEtcdLocker,
+			locker:           finalLocker,
 			callbackListener: cbListener,
 		},
 		keyProc: keyProc,
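
Net effect of the wiring above: with every option enabled, a rule lock must pass through three layers, the in-memory cool-off check first, then an in-process map lock, then the etcd lock, and each layer reports metrics under its own label ("cooloff", "map", "etcd"). A rough sketch of the fully-enabled chain, using only constructors that appear in this diff (the concrete durations and TryLock flag are illustrative assumptions):

	etcdLocker := lock.WithMetrics(lock.NewSessionLocker(sessionManager.GetSession, 5*time.Second, false, true), "etcd")
	mapLocker := lock.WithMetrics(lock.NewMapLocker(), "map")
	coolOffLocker := lock.WithMetrics(lock.NewCoolOffLocker(time.Minute), "cooloff")
	// The cheap, in-memory lockers sit in front: a caller must clear the
	// cool-off window and the map lock before etcd is ever contacted.
	finalLocker := lock.NewNestedLocker(coolOffLocker, lock.NewNestedLocker(mapLocker, etcdLocker))
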
69 changes: 69 additions & 0 deletions rules/lock/cooloff_lock.go
@@ -0,0 +1,69 @@
+package lock
+
+import (
+	"fmt"
+	"sync"
+	"time"
+)
+
+const (
+	coolOffErrFormat = "cooloff expires in %s"
+)
+
+// NewCoolOffLocker creates a simple locker that will prevent a lock from
+// being obtained if a previous attempt (successful or not) was made within
+// the specified expiration period. It is intended to be used with other lockers
+// to prevent excessive locking using more expensive resources (e.g. etcd). It is
+// theoretically possible for two callers to obtain the same lock, if the cooloff
+// period expires before the first caller releases the lock; therefore this locker
+// needs to be used with a nested locker to prevent two callers from accessing the
+// same protected resource.
+func NewCoolOffLocker(expiration time.Duration) RuleLocker {
+	locker := coolOffLocker{
+		coolOffDuration: expiration,
+		locks:           make(map[string]time.Time),
+		mutex:           &sync.Mutex{},
+	}
+	return locker
+}
+
+type coolOffLocker struct {
+	locks           map[string]time.Time
+	mutex           *sync.Mutex
+	coolOffDuration time.Duration
+}
+
+func (col coolOffLocker) Lock(key string, options ...Option) (RuleLock, error) {
+	col.mutex.Lock()
+	defer col.mutex.Unlock()
+	now := time.Now()
+	// Remove any expired keys
+	var toDelete []string
+	for k, v := range col.locks {
+		if now.After(v) {
+			toDelete = append(toDelete, k)
+		}
+	}
+	for _, key := range toDelete {
+		delete(col.locks, key)
+	}
+	var err error
+	if _, ok := col.locks[key]; ok {
+		err = fmt.Errorf(coolOffErrFormat, col.coolOffDuration)
+	}
+	// Failed attempts to get the lock should also update the cooloff,
+	// so always add the key regardless of success or failure.
+	col.locks[key] = now.Add(col.coolOffDuration)
+
+	if err != nil {
+		return nil, err
+	}
+	return coolOffLock{}, nil
+}
+
+type coolOffLock struct {
+}
+
+func (coolOffLock) Unlock() error {
+	return nil
+}
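
A quick sketch of the cool-off behavior from a caller's perspective (key name and duration are arbitrary):

	locker := lock.NewCoolOffLocker(500 * time.Millisecond)
	l, err := locker.Lock("/locks/example") // succeeds and starts the cool-off window
	if err == nil {
		_ = l.Unlock()
	}
	_, err = locker.Lock("/locks/example") // fails: "cooloff expires in 500ms"
	// The failed attempt refreshes the window, so a retry loop tighter
	// than the cool-off period will never acquire this lock.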
