[azservicebus] Make the client side idle timer work across multiple calls (Azure#19506)

I added a simple idle timer in Azure#19465, which expires the link if our internal message receive goes longer than 5 minutes. This change extends it to track idleness across multiple consecutive calls as well, in case the user calls and cancels several times in a row, together eating up 5 minutes of wall-clock time.

This is similar to the workaround the customer applied in Azure#18517, but it takes multiple calls into account and recovers the link without exiting ReceiveMessages().
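In sketch form (simplified from the LocalIdleTracker type added below, not the shipped code; idleDeadlineCtx is a stand-in name): the receive deadline is anchored to the start of the current idle stretch rather than to the current call, so consecutive cancelled calls accumulate toward the limit.

// Sketch: idleStart stays zero until a user cancellation begins an idle stretch.
func idleDeadlineCtx(ctx context.Context, idleStart time.Time, max time.Duration) (context.Context, context.CancelFunc) {
	if idleStart.IsZero() {
		// fresh idle period: expire 'max' from now
		return context.WithTimeout(ctx, max)
	}
	// continuing an earlier idle period: expire 'max' from when it began
	return context.WithDeadline(ctx, idleStart.Add(max))
}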
richardpark-msft authored Nov 8, 2022
1 parent 8606a5b commit 719d2cb
Showing 8 changed files with 296 additions and 96 deletions.
5 changes: 5 additions & 0 deletions sdk/messaging/azservicebus/CHANGELOG.md
@@ -2,6 +2,11 @@

## 1.1.2 (2022-11-08)

### Features Added

- Added a client-side idle timer which will reset Receiver links, transparently, if the link is idle for 5 minutes.

### Bugs Fixed

- $cbs link is properly closed, even on cancellation (#19492)
2 changes: 2 additions & 0 deletions sdk/messaging/azservicebus/internal/amqp_test_utils.go
@@ -43,6 +43,7 @@ type FakeAMQPLinks struct {

Closed int
CloseIfNeededCalled int
CloseIfNeededArgs []error

GetFn func(ctx context.Context) (*LinksWithID, error)

@@ -224,6 +225,7 @@ func (l *FakeAMQPLinks) Close(ctx context.Context, permanently bool) error {

func (l *FakeAMQPLinks) CloseIfNeeded(ctx context.Context, err error) RecoveryKind {
l.CloseIfNeededCalled++
l.CloseIfNeededArgs = append(l.CloseIfNeededArgs, err)
return GetRecoveryKind(err)
}
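The recorded arguments let tests assert which error triggered a link recovery. A hypothetical assertion (not one of the tests in this commit) might read:

links := &FakeAMQPLinks{}
rk := links.CloseIfNeeded(context.Background(), localIdleError)
require.Equal(t, RecoveryKindLink, rk)
require.ErrorIs(t, links.CloseIfNeededArgs[0], localIdleError)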

4 changes: 1 addition & 3 deletions sdk/messaging/azservicebus/internal/errors.go
@@ -17,8 +17,6 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/go-amqp"
)

- var IdleError = errors.New("link was idle, detaching (will be reattached).")

type errNonRetriable struct {
Message string
}
@@ -171,7 +169,7 @@ func GetRecoveryKind(err error) RecoveryKind {
return RecoveryKindFatal
}

- if errors.Is(err, IdleError) {
+ if IsLocalIdleError(err) {
return RecoveryKindLink
}

2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/internal/errors_test.go
@@ -288,5 +288,5 @@ func Test_TransformError(t *testing.T) {
}

func Test_IdleError(t *testing.T) {
- require.Equal(t, RecoveryKindLink, GetRecoveryKind(IdleError))
+ require.Equal(t, RecoveryKindLink, GetRecoveryKind(localIdleError))
}
71 changes: 71 additions & 0 deletions sdk/messaging/azservicebus/internal/idle_check.go
@@ -0,0 +1,71 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package internal

import (
"context"
"errors"
"time"
)

type LocalIdleTracker struct {
// MaxDuration controls how long we'll wait, on the client side, for the first message
MaxDuration time.Duration

// IdleStart marks the first time the user cancelled in a string of cancellations.
// It gets reset any time there is a success or a non-cancel related failure.
// NOTE: this is public for some unit tests but isn't intended to be set by external code.
IdleStart time.Time
}

var localIdleError = errors.New("link was idle, detaching (will be reattached).")

func IsLocalIdleError(err error) bool {
return errors.Is(err, localIdleError)
}

// NewContextWithDeadline creates a context that has an appropriate deadline that will expire
// when the idle period has completed.
func (idle *LocalIdleTracker) NewContextWithDeadline(ctx context.Context) (context.Context, context.CancelFunc) {
if idle.IdleStart.IsZero() {
// we're not in the middle of an idle period, so we'll start from now.
return context.WithTimeout(ctx, idle.MaxDuration)
}

// we've already idled before.
return context.WithDeadline(ctx, idle.IdleStart.Add(idle.MaxDuration))
}

// Check checks if we are actually idle, taking into account when we initially
// started being idle ([idle.IdleStart]) vs the current time.
//
// If it turns out the link should be considered idle it'll return localIdleError.
// Otherwise, it'll return the err parameter.
func (idle *LocalIdleTracker) Check(parentCtx context.Context, operationStart time.Time, err error) error {
if err == nil || !IsCancelError(err) {
// either no error occurred (in which case the link is working)
// or a non-cancel error happened. The non-cancel error will just
// be handled by the normal recovery path.
idle.IdleStart = time.Time{}
return err
}

// okay, we're dealing with a cancellation error. Was it the user cancelling (ie, parentCtx) or
// was it our idle timer firing?
if parentCtx.Err() != nil {
// The user cancelled. These cancels come from a single Receive() call on the link, which means we
// didn't get a message back.
if idle.IdleStart.IsZero() {
idle.IdleStart = operationStart
}

return err
}

// It's our idle timeout that caused us to cancel, which means the idle interval has expired.
// We'll clear our internally stored time and indicate we're idle with the sentinel 'localIdleError'.
idle.IdleStart = time.Time{}

return localIdleError
}
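Taken together, the intended call pattern looks roughly like this (a minimal sketch modeled on how fetchMessages in receiver.go drives the tracker; receiveOne stands in for the real AMQP receive call):

func receiveWithIdleTracking(usersCtx context.Context, idle *LocalIdleTracker, receiveOne func(context.Context) error) error {
	// The deadline is based on the ongoing idle stretch, if any.
	ctx, cancel := idle.NewContextWithDeadline(usersCtx)
	defer cancel()

	start := time.Now()
	err := receiveOne(ctx) // blocks until a message, an error, or cancellation

	// Check records the idle start on user cancellations, converts our own
	// timer firing into localIdleError, and passes other errors through.
	return idle.Check(usersCtx, start, err)
}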
112 changes: 112 additions & 0 deletions sdk/messaging/azservicebus/internal/idle_check_test.go
@@ -0,0 +1,112 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package internal

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestLocalIdleTracker(t *testing.T) {
t.Run("is hierarchical", func(t *testing.T) {
idleTracker := &LocalIdleTracker{MaxDuration: time.Hour}

parentCtx, cancelParent := context.WithCancel(context.Background())

ctx, cancel := idleTracker.NewContextWithDeadline(parentCtx)
defer cancel()

require.NoError(t, ctx.Err())

cancelParent()

require.ErrorIs(t, ctx.Err(), context.Canceled)
})

t.Run("expires in MaxDuration if no previous cancel time exists", func(t *testing.T) {
maxDuration := 2 * time.Second

idleTracker := &LocalIdleTracker{
MaxDuration: maxDuration,
}

ctx, cancel := idleTracker.NewContextWithDeadline(context.Background())
defer cancel()
require.Nil(t, ctx.Err())

deadline, ok := ctx.Deadline()
require.True(t, ok)
require.GreaterOrEqual(t, deadline.Add(-maxDuration).UnixNano(), int64(0), "our deadline was set appropriately into the future")
})

t.Run("with a previous cancel", func(t *testing.T) {
maxWait := 2 * time.Second
idleStartTime := time.Now().Add(time.Hour)

idleTracker := &LocalIdleTracker{
MaxDuration: maxWait,
IdleStart: idleStartTime,
}

ctx, cancel := idleTracker.NewContextWithDeadline(context.Background())
defer cancel()
require.Nil(t, ctx.Err())

deadline, ok := ctx.Deadline()
require.True(t, ok)
require.Equal(t, deadline.Add(-maxWait), idleStartTime, "deadline used our idle start time as the base, not time.Now()")
})

t.Run("user cancels", func(t *testing.T) {
idleTracker := &LocalIdleTracker{
MaxDuration: 30 * time.Minute,
}

parentCtx, cancelParent := context.WithCancel(context.Background())
cancelParent()

// The user cancelled here - since that cancellation is specifically for the _first_
// message it means they didn't receive anything. If they do this for long enough the
// link will be considered idle.

twoHoursFromNow := time.Now().Add(2 * time.Hour)
require.Zero(t, idleTracker.IdleStart)
err := idleTracker.Check(parentCtx, twoHoursFromNow, context.Canceled)

require.ErrorIs(t, err, context.Canceled)
require.Equal(t, idleTracker.IdleStart, twoHoursFromNow, "time of first cancel is recorded (gets used as the base for future idle calculations)")

// now we have a successful call, and it resets the idle time back to zero (ie, we're no longer in danger of being idle)
err = idleTracker.Check(parentCtx, twoHoursFromNow, nil)
require.NoError(t, err)
require.Zero(t, idleTracker.IdleStart, "a successful call resets our idle tracking")

// we also reset the idle time back to zero if there's any other error since
// those errors will be dealt with by the recovery code.
idleTracker.IdleStart = time.Now()

err = idleTracker.Check(parentCtx, twoHoursFromNow, errors.New("some other error"))
require.EqualError(t, err, "some other error")
require.Zero(t, idleTracker.IdleStart, "an error is also considered as proof that the link is alive")
})

t.Run("idle deadline expires", func(t *testing.T) {
hourAndOneMinuteAgo := time.Now().Add(-time.Hour - time.Minute)
idleTracker := &LocalIdleTracker{
MaxDuration: time.Hour,
IdleStart: hourAndOneMinuteAgo,
}

parentCtx, cancelParent := context.WithCancel(context.Background())
defer cancelParent()

err := idleTracker.Check(parentCtx, time.Now(), context.DeadlineExceeded)
require.ErrorIs(t, err, localIdleError)
require.True(t, IsLocalIdleError(err))
})
}
48 changes: 23 additions & 25 deletions sdk/messaging/azservicebus/receiver.go
@@ -59,11 +59,7 @@ type Receiver struct
receiving bool

defaultTimeAfterFirstMsg time.Duration

- // idleDuration controls how long we'll wait, on the client side, for the first message. This is an "internal"
- // timeout in that it doesn't result in the user seeing an early exit, but more prevents us from waiting on a
- // link that might have been closed, service-side, without our knowledge.
- idleDuration time.Duration
+ idleTracker *internal.LocalIdleTracker

cancelReleaser *atomic.Value
}
@@ -139,7 +135,7 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err
lastPeekedSequenceNumber: 0,
cleanupOnClose: args.cleanupOnClose,
defaultTimeAfterFirstMsg: 20 * time.Millisecond,
- idleDuration: 5 * time.Minute,
+ idleTracker: &internal.LocalIdleTracker{MaxDuration: 5 * time.Minute},
retryOptions: args.retryOptions,
cancelReleaser: &atomic.Value{},
}
@@ -211,9 +207,16 @@ func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options
for {
messages, err := r.receiveMessagesImpl(ctx, maxMessages, options)

- if !errors.Is(err, internal.IdleError) {
- return messages, internal.TransformError(err)
+ // when we have a client-side idle error it just means we went too long without
+ // any activity. The user didn't make this happen (nor did the service) so we treat it
+ // as just an internal "state reset" and try the receive again.
+ //
+ // When this does happen there aren't any messages or anything to return.
+ if internal.IsLocalIdleError(err) {
+ continue
}

+ return messages, internal.TransformError(err)
}
}

@@ -418,6 +421,10 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt
log.Writef(EventReceiver, "Failure when receiving messages: %s", result.Error)
}

if internal.IsLocalIdleError(result.Error) {
return nil, result.Error
}

// If the user does get some messages we ignore 'error' and return only the messages.
//
// Doing otherwise would break the idiom that people are used to where people expected
@@ -429,8 +436,7 @@
// at that time
if len(result.Messages) == 0 {
if internal.IsCancelError(result.Error) ||
- rk == internal.RecoveryKindFatal ||
- errors.Is(result.Error, internal.IdleError) {
+ rk == internal.RecoveryKindFatal {
return nil, result.Error
}

@@ -526,27 +532,19 @@ type fetchMessagesResult struct {
//
// Note, if you want to only receive prefetched messages send the parentCtx in
// pre-cancelled. This will cause us to only flush the prefetch buffer.
- func (r *Receiver) fetchMessages(parentCtx context.Context, receiver amqpwrap.AMQPReceiver, count int, timeAfterFirstMessage time.Duration) fetchMessagesResult {
- // This idle timer prevents us from getting into a situation where the remote service has
- // invalidated a link but we didn't get notified and are waiting on an eternally dead link.
- firstReceiveCtx, cancelFirstReceive := context.WithTimeout(parentCtx, r.idleDuration)
+ func (r *Receiver) fetchMessages(usersCtx context.Context, receiver amqpwrap.AMQPReceiver, count int, timeAfterFirstMessage time.Duration) fetchMessagesResult {
+ firstReceiveCtx, cancelFirstReceive := r.idleTracker.NewContextWithDeadline(usersCtx)
defer cancelFirstReceive()

+ start := time.Now()

// The first receive is a bit special - we activate a short timer after this
// so the user doesn't end up in a situation where we're holding onto a bunch
// of messages but never return because they never cancelled and we never
// received all 'count' number of messages.
firstMsg, err := receiver.Receive(firstReceiveCtx)

- if err != nil {
- if errors.Is(err, context.DeadlineExceeded) && parentCtx.Err() == nil {
- log.Writef(EventReceiver, "Detaching link due to local idle timeout. Will reattach automatically.")

- return fetchMessagesResult{
- Error: internal.IdleError,
- }
- }

+ if err := r.idleTracker.Check(usersCtx, start, err); err != nil {
// drain the prefetch buffer - we're stopping because of a
// failure on the link/connection _or_ the user cancelled the
// operation.
@@ -566,7 +564,7 @@ func (r *Receiver) fetchMessages(parentCtx context.Context, receiver amqpwrap.AM

// after we get one message we will try to receive as much as we can
// during the `timeAfterFirstMessage` duration.
- ctx, cancel := context.WithTimeout(parentCtx, timeAfterFirstMessage)
+ ctx, cancel := context.WithTimeout(usersCtx, timeAfterFirstMessage)
defer cancel()

var lastErr error
@@ -595,7 +593,7 @@ func (r *Receiver) fetchMessages(parentCtx context.Context, receiver amqpwrap.AM
//
// If we cancel: we want a nil error since there's no failure. In that case parentCtx.Err() is nil
// If they cancel: we want to forward on their cancellation error.
- Error: parentCtx.Err(),
+ Error: usersCtx.Err(),
}
} else {
return fetchMessagesResult{
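From the caller's side nothing changes: ReceiveMessages still blocks until messages arrive or the caller's context ends, and the idle-driven link reset never surfaces as an error. A hedged usage sketch against the public API (receiver construction and imports elided; assumes the standard azservicebus client types):

func receiveExample(ctx context.Context, receiver *azservicebus.Receiver) error {
	// Blocks until at least one message arrives or ctx is done; client-side
	// idle link resets happen transparently inside this call.
	messages, err := receiver.ReceiveMessages(ctx, 10, nil)
	if err != nil {
		return err // user cancellation or a genuinely terminal error
	}
	for _, msg := range messages {
		fmt.Println(string(msg.Body))
	}
	return nil
}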