Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change how InstanceManager manages instance caching #251

Merged
merged 7 commits into from
Feb 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions backend/instancemgmt/instance_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,41 @@ func New(provider InstanceProvider) InstanceManager {

return &instanceManager{
provider: provider,
cache: map[interface{}]CachedInstance{},
cache: sync.Map{},
locker: newLocker(),
}
}

// instanceManager implements InstanceManager. Cached instances are stored in
// a sync.Map keyed by the provider-computed cache key; per-key mutual
// exclusion while (re)creating an instance is provided by locker, so slow
// creation for one key does not block access to other keys.
type instanceManager struct {
	locker   *locker
	provider InstanceProvider
	cache    sync.Map
}

func (im *instanceManager) Get(pluginContext backend.PluginContext) (Instance, error) {
cacheKey, err := im.provider.GetKey(pluginContext)
if err != nil {
return nil, err
}
im.rwMutex.RLock()
ci, ok := im.cache[cacheKey]
im.rwMutex.RUnlock()
// Double-checked locking for update/create criteria
im.locker.RLock(cacheKey)
item, ok := im.cache.Load(cacheKey)
im.locker.RUnlock(cacheKey)

if ok {
ci := item.(CachedInstance)
needsUpdate := im.provider.NeedsUpdate(pluginContext, ci)

if !needsUpdate {
return ci.instance, nil
}
}

im.locker.Lock(cacheKey)
defer im.locker.Unlock(cacheKey)

if item, ok := im.cache.Load(cacheKey); ok {
ci := item.(CachedInstance)
needsUpdate := im.provider.NeedsUpdate(pluginContext, ci)

if !needsUpdate {
Expand All @@ -96,17 +111,14 @@ func (im *instanceManager) Get(pluginContext backend.PluginContext) (Instance, e
}
}

im.rwMutex.Lock()
defer im.rwMutex.Unlock()

instance, err := im.provider.NewInstance(pluginContext)
if err != nil {
return nil, err
}
im.cache[cacheKey] = CachedInstance{
im.cache.Store(cacheKey, CachedInstance{
PluginContext: pluginContext,
instance: instance,
}
})

return instance, nil
}
Expand Down
149 changes: 146 additions & 3 deletions backend/instancemgmt/instance_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package instancemgmt

import (
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -59,17 +61,155 @@ func TestInstanceManager(t *testing.T) {
})
}

// TestInstanceManagerConcurrency verifies that concurrent Get calls neither
// leak un-disposed instances nor dispose a replaced instance more than once,
// and that slow creation of one instance does not delay access to another.
func TestInstanceManagerConcurrency(t *testing.T) {
	t.Run("Check possible race condition issues when initially creating instance", func(t *testing.T) {
		tip := &testInstanceProvider{}
		im := New(tip)
		ctx := backend.PluginContext{
			OrgID: 1,
			AppInstanceSettings: &backend.AppInstanceSettings{
				Updated: time.Now(),
			},
		}
		var wg sync.WaitGroup
		wg.Add(10)

		var createdInstances []*testInstance
		mutex := new(sync.Mutex)
		// Creating new instances because of updated context
		for i := 0; i < 10; i++ {
			go func() {
				instance, _ := im.Get(ctx)
				mutex.Lock()
				defer mutex.Unlock()
				// Collect all instances created
				createdInstances = append(createdInstances, instance.(*testInstance))
				wg.Done()
			}()
		}
		wg.Wait()

		t.Run("All created instances should be either disposed or exist in cache for later disposing", func(t *testing.T) {
			cachedInstance, _ := im.Get(ctx)
			for _, instance := range createdInstances {
				// Read disposedTimes atomically, matching the atomic writes in Dispose.
				if cachedInstance.(*testInstance) != instance && atomic.LoadInt64(&instance.disposedTimes) < 1 {
					require.FailNow(t, "Found lost reference to un-disposed instance")
				}
			}
		})
	})

	t.Run("Check possible race condition issues when re-creating instance on settings update", func(t *testing.T) {
		initialCtx := backend.PluginContext{
			OrgID: 1,
			AppInstanceSettings: &backend.AppInstanceSettings{
				Updated: time.Now(),
			},
		}
		tip := &testInstanceProvider{}
		im := New(tip)
		// Creating initial instance with old contexts
		instanceToDispose, _ := im.Get(initialCtx)

		updatedCtx := backend.PluginContext{
			OrgID: 1,
			AppInstanceSettings: &backend.AppInstanceSettings{
				Updated: time.Now(),
			},
		}

		var wg sync.WaitGroup
		wg.Add(10)

		var createdInstances []*testInstance
		mutex := new(sync.Mutex)
		// Creating new instances because of updated context
		for i := 0; i < 10; i++ {
			go func() {
				instance, _ := im.Get(updatedCtx)
				mutex.Lock()
				defer mutex.Unlock()
				// Collect all instances created during concurrent update
				createdInstances = append(createdInstances, instance.(*testInstance))
				wg.Done()
			}()
		}
		wg.Wait()

		t.Run("Initial instance should be disposed only once", func(t *testing.T) {
			require.Equal(t, int64(1), atomic.LoadInt64(&instanceToDispose.(*testInstance).disposedTimes), "Instance should be disposed only once")
		})
		t.Run("All created instances should be either disposed or exist in cache for later disposing", func(t *testing.T) {
			cachedInstance, _ := im.Get(updatedCtx)
			for _, instance := range createdInstances {
				if cachedInstance.(*testInstance) != instance && atomic.LoadInt64(&instance.disposedTimes) < 1 {
					require.FailNow(t, "Found lost reference to un-disposed instance")
				}
			}
		})
	})

	t.Run("Long recreation of instance should not affect datasources with different ID", func(t *testing.T) {
		const delay = time.Millisecond * 50
		ctx := backend.PluginContext{
			OrgID: 1,
			AppInstanceSettings: &backend.AppInstanceSettings{
				Updated: time.Now(),
			},
		}
		if testing.Short() {
			t.Skip("Tests with Sleep")
		}

		tip := &testInstanceProvider{delay: delay}
		im := New(tip)
		// Creating instance with id#1 in cache
		_, err := im.Get(ctx)
		require.NoError(t, err)
		var wg1, wg2 sync.WaitGroup
		wg1.Add(1)
		wg2.Add(1)
		go func() {
			// Creating instance with id#2 in cache
			wg1.Done()
			_, err := im.Get(backend.PluginContext{
				OrgID: 2,
				AppInstanceSettings: &backend.AppInstanceSettings{
					Updated: time.Now(),
				},
			})
			require.NoError(t, err)
			wg2.Done()
		}()
		// Waiting before thread 2 starts to get the instance, so thread 2 could acquire the lock before thread 1
		wg1.Wait()
		// Getting existing instance with id#1 from cache
		start := time.Now()
		_, err = im.Get(ctx)
		elapsed := time.Since(start)
		require.NoError(t, err)
		// Waiting until thread 2 has finished getting its instance
		wg2.Wait()
		if elapsed > delay {
			require.Fail(t, "Instance should be retrieved from cache without delay")
		}
	})
}

// testInstance is a fake Instance used by the tests.
// disposedTimes counts Dispose calls and must be accessed atomically,
// since Dispose may be invoked from concurrent goroutines.
type testInstance struct {
	orgID         int64
	updated       time.Time
	disposed      bool
	disposedTimes int64
}

// Dispose marks the instance as disposed and atomically increments the
// dispose counter so tests can assert how many times it was called.
func (ti *testInstance) Dispose() {
	ti.disposed = true
	atomic.AddInt64(&ti.disposedTimes, 1)
}

// testInstanceProvider is a stub InstanceProvider used by the tests.
// delay, when non-zero, makes NewInstance sleep to simulate slow creation.
type testInstanceProvider struct {
	delay time.Duration
}

func (tip *testInstanceProvider) GetKey(pluginContext backend.PluginContext) (interface{}, error) {
Expand All @@ -83,6 +223,9 @@ func (tip *testInstanceProvider) NeedsUpdate(pluginContext backend.PluginContext
}

func (tip *testInstanceProvider) NewInstance(pluginContext backend.PluginContext) (Instance, error) {
if tip.delay > 0 {
time.Sleep(tip.delay)
}
return &testInstance{
orgID: pluginContext.OrgID,
updated: pluginContext.AppInstanceSettings.Updated,
Expand Down
85 changes: 85 additions & 0 deletions backend/instancemgmt/locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package instancemgmt

import (
"fmt"
"sync"
)

// locker is a named reader/writer mutual exclusion lock.
// The lock for each particular key can be held by an arbitrary number of readers or a single writer.
type locker struct {
	locks   map[interface{}]*sync.RWMutex
	locksRW *sync.RWMutex
}

// newLocker returns an empty, ready-to-use locker.
func newLocker() *locker {
	return &locker{
		locks:   make(map[interface{}]*sync.RWMutex),
		locksRW: new(sync.RWMutex),
	}
}

// Lock locks named rw mutex with specified key for writing.
// If the lock with the same key is already locked for reading or writing,
// Lock blocks until the lock is available.
func (lkr *locker) Lock(key interface{}) {
	lk, ok := lkr.getLock(key)
	if !ok {
		lk = lkr.newLock(key)
	}
	lk.Lock()
}

// Unlock unlocks named rw mutex with specified key for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
func (lkr *locker) Unlock(key interface{}) {
	lk, ok := lkr.getLock(key)
	if !ok {
		// %v: key is an arbitrary interface{} value, not necessarily a string.
		panic(fmt.Errorf("lock for key '%v' not initialized", key))
	}
	lk.Unlock()
}

// RLock locks named rw mutex with specified key for reading.
//
// It should not be used for recursive read locking for the same key; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the golang RWMutex type.
func (lkr *locker) RLock(key interface{}) {
	lk, ok := lkr.getLock(key)
	if !ok {
		lk = lkr.newLock(key)
	}
	lk.RLock()
}

// RUnlock undoes a single RLock call for specified key;
// it does not affect other simultaneous readers of locker for specified key.
// It is a run-time error if locker for specified key is not locked for reading.
func (lkr *locker) RUnlock(key interface{}) {
	lk, ok := lkr.getLock(key)
	if !ok {
		// %v: key is an arbitrary interface{} value, not necessarily a string.
		panic(fmt.Errorf("lock for key '%v' not initialized", key))
	}
	lk.RUnlock()
}

// newLock creates the mutex for key, re-checking under the write lock so
// concurrent callers all end up sharing the same mutex.
func (lkr *locker) newLock(key interface{}) *sync.RWMutex {
	lkr.locksRW.Lock()
	defer lkr.locksRW.Unlock()

	if lk, ok := lkr.locks[key]; ok {
		return lk
	}
	lk := new(sync.RWMutex)
	lkr.locks[key] = lk
	return lk
}

// getLock returns the mutex registered for key, if any.
func (lkr *locker) getLock(key interface{}) (*sync.RWMutex, bool) {
	lkr.locksRW.RLock()
	defer lkr.locksRW.RUnlock()

	lock, ok := lkr.locks[key]
	return lock, ok
}
63 changes: 63 additions & 0 deletions backend/instancemgmt/locker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package instancemgmt

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestLocker(t *testing.T) {
if testing.Short() {
t.Skip("Tests with Sleep")
}
const notUpdated = "not_updated"
const atThread1 = "at_thread_1"
const atThread2 = "at_thread_2"
t.Run("Should lock for same keys", func(t *testing.T) {
updated := notUpdated
locker := newLocker()
locker.Lock(1)
var wg sync.WaitGroup
wg.Add(1)
defer func() {
locker.Unlock(1)
wg.Wait()
}()

go func() {
locker.RLock(1)
defer func() {
locker.RUnlock(1)
wg.Done()
}()
require.Equal(t, atThread1, updated, "Value should be updated in different thread")
updated = atThread2
}()
time.Sleep(time.Millisecond * 10)
marefr marked this conversation as resolved.
Show resolved Hide resolved
require.Equal(t, notUpdated, updated, "Value should not be updated in different thread")
updated = atThread1
})

t.Run("Should not lock for different keys", func(t *testing.T) {
updated := notUpdated
locker := newLocker()
locker.Lock(1)
defer locker.Unlock(1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
locker.RLock(2)
defer func() {
locker.RUnlock(2)
wg.Done()
}()
require.Equal(t, notUpdated, updated, "Value should not be updated in different thread")
updated = atThread2
}()
wg.Wait()
require.Equal(t, atThread2, updated, "Value should be updated in different thread")
updated = atThread1
})
}