Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azservicebus] Idle timer #19465

Merged
merged 3 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/messaging/azservicebus/internal/amqp_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type FakeAMQPLinks struct {
Closed int
CloseIfNeededCalled int

GetFn func(ctx context.Context) (*LinksWithID, error)

// values to be returned for each `Get` call
Revision LinkID
Receiver AMQPReceiver
Expand Down Expand Up @@ -188,6 +190,10 @@ func (l *FakeAMQPLinks) Get(ctx context.Context) (*LinksWithID, error) {
case <-ctx.Done():
return nil, ctx.Err()
default:
if l.GetFn != nil {
return l.GetFn(ctx)
}

return &LinksWithID{
Sender: l.Sender,
Receiver: l.Receiver,
Expand Down
6 changes: 6 additions & 0 deletions sdk/messaging/azservicebus/internal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/go-amqp"
)

var IdleError = errors.New("link was idle, detaching (will be reattached).")

type errNonRetriable struct {
Message string
}
Expand Down Expand Up @@ -169,6 +171,10 @@ func GetRecoveryKind(err error) RecoveryKind {
return RecoveryKindFatal
}

if errors.Is(err, IdleError) {
return RecoveryKindLink
}

var netErr net.Error

// these are errors that can flow from the go-amqp connection to
Expand Down
4 changes: 4 additions & 0 deletions sdk/messaging/azservicebus/internal/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,7 @@ func Test_TransformError(t *testing.T) {
// and it's okay, for convenience, to pass a nil.
require.Nil(t, TransformError(nil))
}

func Test_IdleError(t *testing.T) {
require.Equal(t, RecoveryKindLink, GetRecoveryKind(IdleError))
}
36 changes: 30 additions & 6 deletions sdk/messaging/azservicebus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,13 @@ type Receiver struct {
mu sync.Mutex
receiving bool

defaultDrainTimeout time.Duration
defaultTimeAfterFirstMsg time.Duration

// idleDuration controls how long we'll wait, on the client side, for the first message. This is an "internal"
// timeout in that it doesn't result in the user seeing an early exit, but more prevents us from waiting on a
// link that might have been closed, service-side, without our knowledge.
idleDuration time.Duration

cancelReleaser *atomic.Value
}

Expand Down Expand Up @@ -134,8 +138,8 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err
receiver := &Receiver{
lastPeekedSequenceNumber: 0,
cleanupOnClose: args.cleanupOnClose,
defaultDrainTimeout: time.Second,
defaultTimeAfterFirstMsg: 20 * time.Millisecond,
idleDuration: 5 * time.Minute,
retryOptions: args.retryOptions,
cancelReleaser: &atomic.Value{},
}
Expand Down Expand Up @@ -204,8 +208,13 @@ func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options
return nil, errors.New("receiver is already receiving messages. ReceiveMessages() cannot be called concurrently")
}

messages, err := r.receiveMessagesImpl(ctx, maxMessages, options)
return messages, internal.TransformError(err)
for {
messages, err := r.receiveMessagesImpl(ctx, maxMessages, options)

if !errors.Is(err, internal.IdleError) {
return messages, internal.TransformError(err)
}
}
}

// ReceiveDeferredMessagesOptions contains optional parameters for the ReceiveDeferredMessages function.
Expand Down Expand Up @@ -419,7 +428,9 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt
// function on Receiver).will have the same issue and will return the relevant error
// at that time
if len(result.Messages) == 0 {
if internal.IsCancelError(result.Error) || rk == internal.RecoveryKindFatal {
if internal.IsCancelError(result.Error) ||
rk == internal.RecoveryKindFatal ||
errors.Is(result.Error, internal.IdleError) {
return nil, result.Error
}

Expand Down Expand Up @@ -516,13 +527,26 @@ type fetchMessagesResult struct {
// Note, if you want to only receive prefetched messages send the parentCtx in
// pre-cancelled. This will cause us to only flush the prefetch buffer.
func (r *Receiver) fetchMessages(parentCtx context.Context, receiver amqpwrap.AMQPReceiver, count int, timeAfterFirstMessage time.Duration) fetchMessagesResult {
// This idle timer prevents us from getting into a situation where the remote service has
// invalidated a link but we didn't get notified and are waiting on an eternally dead link.
firstReceiveCtx, cancelFirstReceive := context.WithTimeout(parentCtx, r.idleDuration)
defer cancelFirstReceive()

// The first receive is a bit special - we activate a short timer after this
// so the user doesn't end up in a situation where we're holding onto a bunch
// of messages but never return because they never cancelled and we never
// received all 'count' number of messages.
firstMsg, err := receiver.Receive(parentCtx)
firstMsg, err := receiver.Receive(firstReceiveCtx)

if err != nil {
if errors.Is(err, context.DeadlineExceeded) && parentCtx.Err() == nil {
log.Writef(EventReceiver, "Detaching link due to local idle timeout. Will reattach automatically.")

return fetchMessagesResult{
Error: internal.IdleError,
}
}

// drain the prefetch buffer - we're stopping because of a
// failure on the link/connection _or_ the user cancelled the
// operation.
Expand Down
90 changes: 89 additions & 1 deletion sdk/messaging/azservicebus/receiver_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package azservicebus
import (
"context"
"fmt"
"strings"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -215,8 +216,8 @@ func TestReceiverCancellationUnitTests(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

r := &Receiver{
idleDuration: 5 * time.Minute,
defaultTimeAfterFirstMsg: time.Second,
defaultDrainTimeout: time.Second,
amqpLinks: &internal.FakeAMQPLinks{
Receiver: &internal.FakeAMQPReceiver{
ReceiveFn: func(ctx context.Context) (*amqp.Message, error) {
Expand All @@ -236,6 +237,93 @@ func TestReceiverCancellationUnitTests(t *testing.T) {
})
}

func TestIdleTimer(t *testing.T) {
for i := 0; i < 1; i++ {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logs := test.CaptureLogsForTest()
// test.EnableStdoutLogging()

receiveCalled := 0

fakeAMQPLinks := &internal.FakeAMQPLinks{
GetFn: func(ctx context.Context) (*internal.LinksWithID, error) {
return &internal.LinksWithID{
Receiver: &internal.FakeAMQPReceiver{
ReceiveFn: func(ctx context.Context) (*amqp.Message, error) {
receiveCalled++

if receiveCalled == 2 {
return &amqp.Message{}, nil
}

<-ctx.Done()
return nil, ctx.Err()
},
},
}, nil
},
}

r := &Receiver{
receiveMode: ReceiveModeReceiveAndDelete,
idleDuration: time.Second,
defaultTimeAfterFirstMsg: time.Second,
amqpLinks: fakeAMQPLinks,
cancelReleaser: &atomic.Value{},
retryOptions: exported.RetryOptions{
// retries aren't used when we're the ones doing the disconnecting
// due to idleness. It's normal for the user to just sit there forever
// waiting for a message.
MaxRetries: 1,
},
cleanupOnClose: func() {},
}

defer func() { _ = r.Close(context.Background()) }()

r.cancelReleaser.Store(emptyCancelFn)

msgs, err := r.ReceiveMessages(ctx, 100, nil)

require.Equal(t, 1+1+1, receiveCalled, "receive called for idled, receiving first message and then attempting to receive a second message")
require.NotEmpty(t, msgs)
require.Nil(t, err)
require.Equal(t, 2, fakeAMQPLinks.CloseIfNeededCalled)

expectedLogs := []string{
// (all of these retries are internally visible only, the user sees one long ReceiveMessages() call until they
// cancel the context or (as in this test) they eventually receive a message.

// receiveMessagesImpl #1
"[azsb.Receiver] Asking for 100 credits",
"[azsb.Receiver] Only need to issue 100 additional credits",

// idle timeout occurs, which causes us to restart the entire receive (including link acquisition)
"[azsb.Receiver] Detaching link due to local idle timeout. Will reattach automatically.",
"[azsb.Receiver] Received 0/100 messages",
"[azsb.Receiver] Failure when receiving messages: link was idle, detaching (will be reattached).",

// receiveMessagesImpl #2 (no idle timeout this time)
"[azsb.Receiver] Asking for 100 credits",
"[azsb.Receiver] Only need to issue 100 additional credits",
"[azsb.Receiver] Received 1/100 messages",
}

// we'll filter out some other logging that's not part of this test.
var actualLogs []string

for _, msg := range logs() {
if !strings.HasPrefix(msg, "[azsb.Receiver] [fakelink] Message releaser") {
actualLogs = append(actualLogs, msg)
}
}

require.Equal(t, expectedLogs, actualLogs)
}
}

func TestReceiverOptions(t *testing.T) {
// defaults
receiver := &Receiver{}
Expand Down