Skip to content

Commit

Permalink
[FAB-8945] Fix lazyref bugs
Browse files Browse the repository at this point in the history
Fix the following issues:
- Close() didn't wait for Go routine to
  shut down
- Idle expiration time wasn't calculated
  correctly

Change-Id: Iabacfad37b9d57bd53491735099f451483dcdc6f
Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Mar 17, 2018
1 parent cf7cd8a commit ff9763c
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 66 deletions.
167 changes: 135 additions & 32 deletions pkg/util/concurrent/lazyref/lazyref.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package lazyref

import (
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -49,6 +50,9 @@ const (
// LastInitialized specifies that the expiration time is calculated
// from the time the reference was initialized
LastInitialized

// Refreshing indicates that the reference should be periodically refreshed
Refreshing
)

// Reference holds a value that is initialized on first access using the provided
Expand All @@ -63,7 +67,7 @@ const (
// is closed (via a call to Close) or if it expires. (Note: The Finalizer function
// is not called every time the value is refreshed with the periodic refresh feature.)
type Reference struct {
sync.RWMutex
lock sync.RWMutex
ref unsafe.Pointer
lastTimeAccessed unsafe.Pointer
initializer Initializer
Expand All @@ -72,15 +76,17 @@ type Reference struct {
expirationProvider ExpirationProvider
initialInit time.Duration
expiryType ExpirationType
closed chan bool
closed bool
closech chan bool
running bool
wg sync.WaitGroup
}

// New creates a new reference
func New(initializer Initializer, opts ...Opt) *Reference {
lazyRef := &Reference{
initializer: initializer,
initialInit: InitOnFirstAccess,
closed: make(chan bool, 1),
}

for _, opt := range opts {
Expand All @@ -91,19 +97,23 @@ func New(initializer Initializer, opts ...Opt) *Reference {
// This is an expiring reference. After the initializer is
// called, set a timer that will call the expiration handler.
initializer := lazyRef.initializer
initialExpiration := lazyRef.expirationProvider()
lazyRef.initializer = func() (interface{}, error) {
value, err := initializer()
if err == nil {
lazyRef.startTimer(lazyRef.expirationProvider())
lazyRef.ensureTimerStarted(initialExpiration)
}
return value, err
}

lazyRef.closech = make(chan bool, 1)

if lazyRef.expirationHandler == nil {
// Set a default expiration handler
lazyRef.expirationHandler = lazyRef.resetValue
}
if lazyRef.initialInit >= 0 {
lazyRef.startTimer(lazyRef.initialInit)
lazyRef.ensureTimerStarted(lazyRef.initialInit)
}
}

Expand All @@ -117,8 +127,12 @@ func (r *Reference) Get() (interface{}, error) {
return value, nil
}

r.Lock()
defer r.Unlock()
r.lock.Lock()
defer r.lock.Unlock()

if r.closed {
return nil, errors.New("reference is already closed")
}

// Try again inside the lock
if value, ok := r.get(); ok {
Expand Down Expand Up @@ -151,16 +165,34 @@ func (r *Reference) MustGet() interface{} {
// Close should be called for expiring references and
// rerences that specify finalizers.
func (r *Reference) Close() {
r.Lock()
defer r.Unlock()
if !r.setClosed() {
// Already closed
return
}

logger.Info("Closing reference")

logger.Debug("Closing reference")
r.notifyClosing()
r.wg.Wait()
r.finalize()
}

if r.expirationHandler != nil {
r.closed <- true
func (r *Reference) setClosed() bool {
r.lock.Lock()
defer r.lock.Unlock()
if r.closed {
return false
}
if r.finalizer != nil {
r.finalizer()
r.closed = true
return true
}

func (r *Reference) notifyClosing() {
r.lock.Lock()
defer r.lock.Unlock()
if r.running {
logger.Debugf("Sending closed event...")
r.closech <- true
}
}

Expand All @@ -173,6 +205,10 @@ func (r *Reference) get() (interface{}, bool) {
return (*valueHolder)(p).value, true
}

func (r *Reference) isSet() bool {
return atomic.LoadPointer(&r.ref) != nil
}

func (r *Reference) set(value interface{}) {
atomic.StorePointer(&r.ref, unsafe.Pointer(&valueHolder{value: value}))
}
Expand All @@ -187,36 +223,106 @@ func (r *Reference) lastAccessed() time.Time {
return *(*time.Time)(p)
}

func (r *Reference) startTimer(expiration time.Duration) {
func (r *Reference) timerRunning() bool {
r.lock.RLock()
defer r.lock.RUnlock()
return r.running
}

func (r *Reference) setTimerRunning() bool {
r.lock.Lock()
defer r.lock.Unlock()

if r.running || r.closed {
logger.Debugf("Cannot start timer since timer is either already running or it is closed")
return false
}

r.running = true
r.wg.Add(1)
logger.Debugf("Timer started")
return true
}

func (r *Reference) setTimerStopped() {
r.lock.Lock()
defer r.lock.Unlock()
logger.Debugf("Timer stopped")
r.running = false
r.wg.Done()
}

func (r *Reference) ensureTimerStarted(initialExpiration time.Duration) {
if r.running {
logger.Debugf("Timer is already running")
return
}

r.setLastAccessed()

go func() {
expiry := expiration
if !r.setTimerRunning() {
logger.Debugf("Timer is already running")
return
}
defer r.setTimerStopped()

logger.Debugf("Starting timer")

expiry := initialExpiration
for {
select {
case <-r.closed:
case <-r.closech:
logger.Debugf("Got closed event. Exiting timer.")
return

case <-time.After(expiry):
if r.expiryType == LastInitialized {
r.handleExpiration()
return
expiration := r.expirationProvider()

if !r.isSet() && r.expiryType != Refreshing {
expiry = expiration
logger.Debugf("Reference is not set. Will expire again in %s", expiry)
continue
}

// Check how long it's been since last access
durSinceLastAccess := time.Now().Sub(r.lastAccessed())
if durSinceLastAccess > expiration {
if r.expiryType == LastInitialized || r.expiryType == Refreshing {
logger.Debugf("Handling expiration...")
r.handleExpiration()
return
expiry = expiration
logger.Debugf("... finished handling expiration. Setting expiration to %s", expiry)
} else {
// Check how long it's been since last access
durSinceLastAccess := time.Now().Sub(r.lastAccessed())
logger.Debugf("Duration since last access is %s", durSinceLastAccess)
if durSinceLastAccess > expiration {
logger.Debugf("... handling expiration...")
r.handleExpiration()
expiry = expiration
logger.Debugf("... finished handling expiration. Setting expiration to %s", expiry)
} else {
// Set another expiry for the remainder of the time
expiry = expiration - durSinceLastAccess
logger.Debugf("Not expired yet. Will check again in %s", expiry)
}
}
// Set another expiry for the remainder of the time
expiry = expiration - durSinceLastAccess
}
}
}()
}

func (r *Reference) finalize() {
if r.finalizer == nil {
return
}

r.lock.Lock()
r.finalizer()
r.lock.Unlock()
}

func (r *Reference) handleExpiration() {
r.Lock()
defer r.Unlock()
r.lock.Lock()
defer r.lock.Unlock()

logger.Debug("Invoking expiration handler")
r.expirationHandler()
Expand All @@ -240,10 +346,7 @@ func (r *Reference) resetValue() {
// lock so there's no need to lock
func (r *Reference) refreshValue() {
if value, err := r.initializer(); err != nil {
expiration := r.expirationProvider()
logger.Warnf("Error - initializer returned error: %s. Will retry in %s", err, expiration)
// Start the timer so that we can retry
r.startTimer(expiration)
logger.Warnf("Error - initializer returned error: %s. Will retry again later", err)
} else {
r.set(value)
}
Expand Down
Loading

0 comments on commit ff9763c

Please sign in to comment.