From ff9763c2e5817a35d17ca9a25c2c78fb23fde5ec Mon Sep 17 00:00:00 2001 From: Bob Stasyszyn Date: Sat, 17 Mar 2018 11:39:35 -0400 Subject: [PATCH] [FAB-8945] Fix lazyref bugs Fix the following issues: - Close() didn't wait for Go routine to shut down - Idle expiration time wasn't calculated correctly Change-Id: Iabacfad37b9d57bd53491735099f451483dcdc6f Signed-off-by: Bob Stasyszyn --- pkg/util/concurrent/lazyref/lazyref.go | 167 ++++++++++++++++---- pkg/util/concurrent/lazyref/lazyref_test.go | 57 +++---- pkg/util/concurrent/lazyref/options.go | 2 +- 3 files changed, 160 insertions(+), 66 deletions(-) diff --git a/pkg/util/concurrent/lazyref/lazyref.go b/pkg/util/concurrent/lazyref/lazyref.go index ab44a028cf..d6c97ea4c0 100644 --- a/pkg/util/concurrent/lazyref/lazyref.go +++ b/pkg/util/concurrent/lazyref/lazyref.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package lazyref import ( + "errors" "fmt" "sync" "sync/atomic" @@ -49,6 +50,9 @@ const ( // LastInitialized specifies that the expiration time is calculated // from the time the reference was initialized LastInitialized + + // Refreshing indicates that the reference should be periodically refreshed + Refreshing ) // Reference holds a value that is initialized on first access using the provided @@ -63,7 +67,7 @@ const ( // is closed (via a call to Close) or if it expires. (Note: The Finalizer function // is not called every time the value is refreshed with the periodic refresh feature.) type Reference struct { - sync.RWMutex + lock sync.RWMutex ref unsafe.Pointer lastTimeAccessed unsafe.Pointer initializer Initializer @@ -72,7 +76,10 @@ type Reference struct { expirationProvider ExpirationProvider initialInit time.Duration expiryType ExpirationType - closed chan bool + closed bool + closech chan bool + running bool + wg sync.WaitGroup } // New creates a new reference @@ -80,7 +87,6 @@ func New(initializer Initializer, opts ...Opt) *Reference { lazyRef := &Reference{ initializer: initializer, initialInit: InitOnFirstAccess, - closed: make(chan bool, 1), } for _, opt := range opts { @@ -91,19 +97,23 @@ func New(initializer Initializer, opts ...Opt) *Reference { // This is an expiring reference. After the initializer is // called, set a timer that will call the expiration handler. initializer := lazyRef.initializer + initialExpiration := lazyRef.expirationProvider() lazyRef.initializer = func() (interface{}, error) { value, err := initializer() if err == nil { - lazyRef.startTimer(lazyRef.expirationProvider()) + lazyRef.ensureTimerStarted(initialExpiration) } return value, err } + + lazyRef.closech = make(chan bool, 1) + if lazyRef.expirationHandler == nil { // Set a default expiration handler lazyRef.expirationHandler = lazyRef.resetValue } if lazyRef.initialInit >= 0 { - lazyRef.startTimer(lazyRef.initialInit) + lazyRef.ensureTimerStarted(lazyRef.initialInit) } } @@ -117,8 +127,12 @@ func (r *Reference) Get() (interface{}, error) { return value, nil } - r.Lock() - defer r.Unlock() + r.lock.Lock() + defer r.lock.Unlock() + + if r.closed { + return nil, errors.New("reference is already closed") + } // Try again inside the lock if value, ok := r.get(); ok { @@ -151,16 +165,34 @@ func (r *Reference) MustGet() interface{} { // Close should be called for expiring references and // rerences that specify finalizers. func (r *Reference) Close() { - r.Lock() - defer r.Unlock() + if !r.setClosed() { + // Already closed + return + } + + logger.Info("Closing reference") - logger.Debug("Closing reference") + r.notifyClosing() + r.wg.Wait() + r.finalize() +} - if r.expirationHandler != nil { - r.closed <- true +func (r *Reference) setClosed() bool { + r.lock.Lock() + defer r.lock.Unlock() + if r.closed { + return false } - if r.finalizer != nil { - r.finalizer() + r.closed = true + return true +} + +func (r *Reference) notifyClosing() { + r.lock.Lock() + defer r.lock.Unlock() + if r.running { + logger.Debugf("Sending closed event...") + r.closech <- true } } @@ -173,6 +205,10 @@ func (r *Reference) get() (interface{}, bool) { return (*valueHolder)(p).value, true } +func (r *Reference) isSet() bool { + return atomic.LoadPointer(&r.ref) != nil +} + func (r *Reference) set(value interface{}) { atomic.StorePointer(&r.ref, unsafe.Pointer(&valueHolder{value: value})) } @@ -187,36 +223,106 @@ func (r *Reference) lastAccessed() time.Time { return *(*time.Time)(p) } -func (r *Reference) startTimer(expiration time.Duration) { +func (r *Reference) timerRunning() bool { + r.lock.RLock() + defer r.lock.RUnlock() + return r.running +} + +func (r *Reference) setTimerRunning() bool { + r.lock.Lock() + defer r.lock.Unlock() + + if r.running || r.closed { + logger.Debugf("Cannot start timer since timer is either already running or it is closed") + return false + } + + r.running = true + r.wg.Add(1) + logger.Debugf("Timer started") + return true +} + +func (r *Reference) setTimerStopped() { + r.lock.Lock() + defer r.lock.Unlock() + logger.Debugf("Timer stopped") + r.running = false + r.wg.Done() +} + +func (r *Reference) ensureTimerStarted(initialExpiration time.Duration) { + if r.running { + logger.Debugf("Timer is already running") + return + } + r.setLastAccessed() go func() { - expiry := expiration + if !r.setTimerRunning() { + logger.Debugf("Timer is already running") + return + } + defer r.setTimerStopped() + + logger.Debugf("Starting timer") + + expiry := initialExpiration for { select { - case <-r.closed: + case <-r.closech: + logger.Debugf("Got closed event. Exiting timer.") + return + case <-time.After(expiry): - if r.expiryType == LastInitialized { - r.handleExpiration() - return + expiration := r.expirationProvider() + + if !r.isSet() && r.expiryType != Refreshing { + expiry = expiration + logger.Debugf("Reference is not set. Will expire again in %s", expiry) + continue } - // Check how long it's been since last access - durSinceLastAccess := time.Now().Sub(r.lastAccessed()) - if durSinceLastAccess > expiration { + if r.expiryType == LastInitialized || r.expiryType == Refreshing { + logger.Debugf("Handling expiration...") r.handleExpiration() - return + expiry = expiration + logger.Debugf("... finished handling expiration. Setting expiration to %s", expiry) + } else { + // Check how long it's been since last access + durSinceLastAccess := time.Now().Sub(r.lastAccessed()) + logger.Debugf("Duration since last access is %s", durSinceLastAccess) + if durSinceLastAccess > expiration { + logger.Debugf("... handling expiration...") + r.handleExpiration() + expiry = expiration + logger.Debugf("... finished handling expiration. Setting expiration to %s", expiry) + } else { + // Set another expiry for the remainder of the time + expiry = expiration - durSinceLastAccess + logger.Debugf("Not expired yet. Will check again in %s", expiry) + } } - // Set another expiry for the remainder of the time - expiry = expiration - durSinceLastAccess } } }() } +func (r *Reference) finalize() { + if r.finalizer == nil { + return + } + + r.lock.Lock() + r.finalizer() + r.lock.Unlock() +} + func (r *Reference) handleExpiration() { - r.Lock() - defer r.Unlock() + r.lock.Lock() + defer r.lock.Unlock() logger.Debug("Invoking expiration handler") r.expirationHandler() @@ -240,10 +346,7 @@ func (r *Reference) resetValue() { // lock so there's no need to lock func (r *Reference) refreshValue() { if value, err := r.initializer(); err != nil { - expiration := r.expirationProvider() - logger.Warnf("Error - initializer returned error: %s. Will retry in %s", err, expiration) - // Start the timer so that we can retry - r.startTimer(expiration) + logger.Warnf("Error - initializer returned error: %s. Will retry again later", err) } else { r.set(value) } diff --git a/pkg/util/concurrent/lazyref/lazyref_test.go b/pkg/util/concurrent/lazyref/lazyref_test.go index 26d139b46d..843ee594e3 100644 --- a/pkg/util/concurrent/lazyref/lazyref_test.go +++ b/pkg/util/concurrent/lazyref/lazyref_test.go @@ -216,9 +216,6 @@ func TestGetWithFinalizer(t *testing.T) { func TestExpiring(t *testing.T) { var numTimesInitialized int32 var numTimesFinalized int32 - expectedTimesInitialized := 3 - expectedTimesFinalized := expectedTimesInitialized - expectedTimesValueChanged := expectedTimesInitialized concurrency := 20 iterations := 100 @@ -261,9 +258,9 @@ func TestExpiring(t *testing.T) { } time.Sleep(5 * time.Millisecond) } - if timesValueChanged != expectedTimesValueChanged { + if timesValueChanged <= 1 { mutex.Lock() - errors = append(errors, fmt.Errorf("expecting value to have changed %d time(s) but it changed %d time(s)", expectedTimesValueChanged, timesValueChanged)) + errors = append(errors, fmt.Errorf("expecting value to have changed multiple times but it changed %d time(s)", timesValueChanged)) mutex.Unlock() } }() @@ -272,23 +269,20 @@ func TestExpiring(t *testing.T) { wg.Wait() ref.Close() - if num := atomic.LoadInt32(&numTimesInitialized); num != int32(expectedTimesInitialized) { - t.Fatalf("expecting initializer to be called %d time(s) but was called %d time(s)", expectedTimesInitialized, num) - } - if num := atomic.LoadInt32(&numTimesFinalized); num != int32(expectedTimesFinalized) { - t.Fatalf("expecting finalizer to be called %d time(s) but was called %d time(s)", expectedTimesFinalized, num) - } if len(errors) > 0 { t.Fatalf(errors[0].Error()) } + if num := atomic.LoadInt32(&numTimesInitialized); num <= 1 { + t.Fatalf("expecting initializer to be called multiple times but was called %d time(s)", num) + } + if num := atomic.LoadInt32(&numTimesFinalized); num <= 1 { + t.Fatalf("expecting finalizer to be called multiple times but was called %d time(s)", num) + } } func TestExpiringWithErr(t *testing.T) { var numTimesInitialized int32 var numTimesFinalized int32 - expectedTimesInitialized := 5 - expectedTimesFinalized := expectedTimesInitialized - 1 - expectedTimesValueChanged := expectedTimesInitialized - 1 concurrency := 20 iterations := 100 @@ -337,9 +331,9 @@ func TestExpiringWithErr(t *testing.T) { } time.Sleep(50 * time.Millisecond) } - if timesValueChanged != expectedTimesValueChanged { + if timesValueChanged <= 1 { mutex.Lock() - errors = append(errors, fmt.Errorf("expecting value to have changed %d time(s) but it changed %d time(s)", expectedTimesValueChanged, timesValueChanged)) + errors = append(errors, fmt.Errorf("expecting value to have changed multiple times but it changed %d time(s)", timesValueChanged)) mutex.Unlock() } }() @@ -351,20 +345,17 @@ func TestExpiringWithErr(t *testing.T) { if len(errors) > 0 { t.Fatalf(errors[0].Error()) } - if num := atomic.LoadInt32(&numTimesInitialized); num != int32(expectedTimesInitialized) { - t.Fatalf("expecting initializer to be called %d time(s) but was called %d time(s)", expectedTimesInitialized, num) + if num := atomic.LoadInt32(&numTimesInitialized); num <= 1 { + t.Fatalf("expecting initializer to be called multiple times but was called %d time(s)", num) } - if num := atomic.LoadInt32(&numTimesFinalized); num != int32(expectedTimesFinalized) { - t.Fatalf("expecting finalizer to be called %d time(s) but was called %d time(s)", expectedTimesFinalized, num) + if num := atomic.LoadInt32(&numTimesFinalized); num <= 1 { + t.Fatalf("expecting finalizer to be called multiple times but was called %d time(s)", num) } } func TestExpiringOnIdle(t *testing.T) { var numTimesInitialized int32 var numTimesFinalized int32 - expectedTimesInitialized := 2 - expectedTimesFinalized := expectedTimesInitialized - expectedTimesValueChanged := 2 iterations := 20 seq := 0 @@ -381,7 +372,7 @@ func TestExpiringOnIdle(t *testing.T) { atomic.AddInt32(&numTimesFinalized, 1) }, ), - WithIdleExpiration(time.Second), + WithIdleExpiration(100*time.Millisecond), ) previousValue := "" @@ -392,7 +383,7 @@ func TestExpiringOnIdle(t *testing.T) { previousValue = value.(string) timesValueChanged++ } - time.Sleep(10 * time.Millisecond) + time.Sleep(time.Duration(20*j) * time.Millisecond) } // Wait for the ref to expire @@ -403,17 +394,17 @@ func TestExpiringOnIdle(t *testing.T) { timesValueChanged++ } - if timesValueChanged != expectedTimesValueChanged { - t.Fatalf("expecting value to have changed %d time(s) but it changed %d time(s)", expectedTimesValueChanged, timesValueChanged) + if timesValueChanged <= 1 { + t.Fatalf("expecting value to have changed multiple times but it changed %d time(s)", timesValueChanged) } ref.Close() - if num := atomic.LoadInt32(&numTimesInitialized); num != int32(expectedTimesInitialized) { - t.Fatalf("expecting initializer to be called %d time(s) but was called %d time(s)", expectedTimesInitialized, num) + if num := atomic.LoadInt32(&numTimesInitialized); num <= 1 { + t.Fatalf("expecting initializer to be called multiple times but was called %d time(s)", num) } - if num := atomic.LoadInt32(&numTimesFinalized); num != int32(expectedTimesFinalized) { - t.Fatalf("expecting finalizer to be called %d time(s) but was called %d time(s)", expectedTimesFinalized, num) + if num := atomic.LoadInt32(&numTimesFinalized); num <= 1 { + t.Fatalf("expecting finalizer to be called multiple times but was called %d time(s)", num) } } @@ -423,7 +414,7 @@ func TestProactiveRefresh(t *testing.T) { expectedTimesFinalized := 1 concurrency := 20 - iterations := 100 + iterations := 50 seq := 0 ref := New( @@ -443,7 +434,7 @@ func TestProactiveRefresh(t *testing.T) { t.Logf("Finalizer called") }, ), - WithRefreshInterval(InitOnFirstAccess, 500*time.Millisecond), + WithRefreshInterval(InitImmediately, 500*time.Millisecond), ) var wg sync.WaitGroup diff --git a/pkg/util/concurrent/lazyref/options.go b/pkg/util/concurrent/lazyref/options.go index c73db46e79..bd79fa8183 100644 --- a/pkg/util/concurrent/lazyref/options.go +++ b/pkg/util/concurrent/lazyref/options.go @@ -66,7 +66,7 @@ const ( func WithRefreshInterval(initialInit, refreshPeriod time.Duration) Opt { return func(ref *Reference) { ref.expirationHandler = ref.refreshValue - ref.expiryType = LastInitialized + ref.expiryType = Refreshing ref.expirationProvider = NewSimpleExpirationProvider(refreshPeriod) ref.initialInit = initialInit }