Skip to content

Commit

Permalink
fix: Add logic to ticker loop to ensure subscription always exists wi…
Browse files Browse the repository at this point in the history
…th an open connection (#861)
  • Loading branch information
terev authored and whynowy committed Sep 4, 2020
1 parent 697c9a0 commit 8961e46
Showing 1 changed file with 41 additions and 22 deletions.
63 changes: 41 additions & 22 deletions sensors/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Knetic/govaluate"
Expand All @@ -40,6 +41,15 @@ import (
sensortriggers "github.com/argoproj/argo-events/sensors/triggers"
)

func subscribeOnce(subLock *uint32, subscribe func()) {
// acquire subLock if not already held
if !atomic.CompareAndSwapUint32(subLock, 0, 1) {
return
}

subscribe()
}

// ListenEvents watches and handles events received from the gateway.
func (sensorCtx *SensorContext) ListenEvents(ctx context.Context, stopCh <-chan struct{}) error {
logger := logging.FromContext(ctx).Desugar()
Expand Down Expand Up @@ -142,18 +152,28 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context, stopCh <-chan
}
}

var subLock uint32
wg1 := &sync.WaitGroup{}
closeSubCh := make(chan struct{})
wg1.Add(1)
go func() {
defer wg1.Done()
logger.Sugar().Infof("started to subscribe events for triggers %s with client %s", fmt.Sprintf("[%s]", strings.Join(triggerNames, " ")), clientID)
err = ebDriver.SubscribeEventSources(cctx, conn, closeSubCh, depExpression, deps, filterFunc, actionFunc)
if err != nil {
logger.Error("failed to subscribe to event bus", zap.Any("clientID", clientID), zap.Error(err))
return
}
}()

subscribeFunc := func() {
wg1.Add(1)
go func() {
defer wg1.Done()
// release the lock when goroutine exits
defer atomic.StoreUint32(&subLock, 0)

logger.Sugar().Infof("started subscribing to events for triggers %s with client %s", fmt.Sprintf("[%s]", strings.Join(triggerNames, " ")), clientID)

err = ebDriver.SubscribeEventSources(cctx, conn, closeSubCh, depExpression, deps, filterFunc, actionFunc)
if err != nil {
logger.Error("failed to subscribe to event bus", zap.Any("clientID", clientID), zap.Error(err))
return
}
}()
}

subscribeOnce(&subLock, subscribeFunc)

logger.Sugar().Infof("starting eventbus connection daemon for client %s...", clientID)
ticker := time.NewTicker(5 * time.Second)
Expand All @@ -173,18 +193,17 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context, stopCh <-chan
continue
}
logger.Info("reconnected to NATS streaming server.", zap.Any("clientID", clientID))
closeSubCh <- struct{}{}
time.Sleep(2 * time.Second)
wg1.Add(1)
go func() {
defer wg1.Done()
logger.Sugar().Infof("started to re-subscribe events for triggers %s with client %s", fmt.Sprintf("[%s]", strings.Join(triggerNames, " ")), clientID)
err = ebDriver.SubscribeEventSources(cctx, conn, closeSubCh, depExpression, deps, filterFunc, actionFunc)
if err != nil {
logger.Error("failed to re-subscribe to eventbus", zap.Any("clientID", clientID), zap.Error(err))
return
}
}()

if atomic.LoadUint32(&subLock) == 1 {
closeSubCh <- struct{}{}
// give subscription time to close
time.Sleep(2 * time.Second)
}
}

// create subscription if conn is alive and no subscription is currently held
if conn != nil && !conn.IsClosed() {
subscribeOnce(&subLock, subscribeFunc)
}
}
}
Expand Down

0 comments on commit 8961e46

Please sign in to comment.