From 99e60ff831cdd14000af000263228a732b3a3054 Mon Sep 17 00:00:00 2001 From: ripark Date: Mon, 31 Oct 2022 15:57:01 -0700 Subject: [PATCH 1/3] Add in a watchdog timer (really, just a timeout on first message) to guard against conditions where our link has expired but our client doesn't know about it. We can't repro this condition locally but it might just be that we're missing the Detach frame from the service side due to network conditions. --- sdk/messaging/azservicebus/receiver.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdk/messaging/azservicebus/receiver.go b/sdk/messaging/azservicebus/receiver.go index 007652af2291..51bb7a2721ff 100644 --- a/sdk/messaging/azservicebus/receiver.go +++ b/sdk/messaging/azservicebus/receiver.go @@ -61,6 +61,8 @@ type Receiver struct { defaultDrainTimeout time.Duration defaultTimeAfterFirstMsg time.Duration + idleDuration time.Duration // controls the amount of time we'll wait without _any_ activity for receiving. + cancelReleaser *atomic.Value } @@ -136,6 +138,7 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err cleanupOnClose: args.cleanupOnClose, defaultDrainTimeout: time.Second, defaultTimeAfterFirstMsg: 20 * time.Millisecond, + idleDuration: 5 * time.Minute, retryOptions: args.retryOptions, cancelReleaser: &atomic.Value{}, } @@ -516,11 +519,14 @@ type fetchMessagesResult struct { // Note, if you want to only receive prefetched messages send the parentCtx in // pre-cancelled. This will cause us to only flush the prefetch buffer. func (r *Receiver) fetchMessages(parentCtx context.Context, receiver amqpwrap.AMQPReceiver, count int, timeAfterFirstMessage time.Duration) fetchMessagesResult { + firstReceiveCtx, cancelFirstReceive := context.WithTimeout(parentCtx, r.idleDuration) + defer cancelFirstReceive() + // The first receive is a bit special - we activate a short timer after this // so the user doesn't end up in a situation where we're holding onto a bunch // of messages but never return because they never cancelled and we never // received all 'count' number of messages. - firstMsg, err := receiver.Receive(parentCtx) + firstMsg, err := receiver.Receive(firstReceiveCtx) if err != nil { // drain the prefetch buffer - we're stopping because of a From 75e7e95cdb29339b15e904210396c1e6b71efdf8 Mon Sep 17 00:00:00 2001 From: ripark Date: Mon, 31 Oct 2022 16:17:47 -0700 Subject: [PATCH 2/3] Make sure the idle timeout doesn't interfere with the test --- sdk/messaging/azservicebus/receiver_unit_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/messaging/azservicebus/receiver_unit_test.go b/sdk/messaging/azservicebus/receiver_unit_test.go index 1dae1214d8eb..fba2e4b5fe04 100644 --- a/sdk/messaging/azservicebus/receiver_unit_test.go +++ b/sdk/messaging/azservicebus/receiver_unit_test.go @@ -215,6 +215,7 @@ func TestReceiverCancellationUnitTests(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) r := &Receiver{ + idleDuration: 5 * time.Minute, defaultTimeAfterFirstMsg: time.Second, defaultDrainTimeout: time.Second, amqpLinks: &internal.FakeAMQPLinks{ From b2ad3a16298bc29df92f6bf54dfa0a34a75c7cf2 Mon Sep 17 00:00:00 2001 From: ripark Date: Mon, 31 Oct 2022 18:03:20 -0700 Subject: [PATCH 3/3] Give a max 5 minutes of pure idle time when receiving. If we hit that transparently restart the link. --- .../azservicebus/internal/amqp_test_utils.go | 6 ++ sdk/messaging/azservicebus/internal/errors.go | 6 ++ .../azservicebus/internal/errors_test.go | 4 + sdk/messaging/azservicebus/receiver.go | 30 +++++-- .../azservicebus/receiver_unit_test.go | 89 ++++++++++++++++++- 5 files changed, 128 insertions(+), 7 deletions(-) diff --git a/sdk/messaging/azservicebus/internal/amqp_test_utils.go b/sdk/messaging/azservicebus/internal/amqp_test_utils.go index 0aaceab2f64d..b603ec0a5c3f 100644 --- a/sdk/messaging/azservicebus/internal/amqp_test_utils.go +++ b/sdk/messaging/azservicebus/internal/amqp_test_utils.go @@ -44,6 +44,8 @@ type FakeAMQPLinks struct { Closed int CloseIfNeededCalled int + GetFn func(ctx context.Context) (*LinksWithID, error) + // values to be returned for each `Get` call Revision LinkID Receiver AMQPReceiver @@ -188,6 +190,10 @@ func (l *FakeAMQPLinks) Get(ctx context.Context) (*LinksWithID, error) { case <-ctx.Done(): return nil, ctx.Err() default: + if l.GetFn != nil { + return l.GetFn(ctx) + } + return &LinksWithID{ Sender: l.Sender, Receiver: l.Receiver, diff --git a/sdk/messaging/azservicebus/internal/errors.go b/sdk/messaging/azservicebus/internal/errors.go index 245fadc1dc71..68d7784333c1 100644 --- a/sdk/messaging/azservicebus/internal/errors.go +++ b/sdk/messaging/azservicebus/internal/errors.go @@ -17,6 +17,8 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/go-amqp" ) +var IdleError = errors.New("link was idle, detaching (will be reattached).") + type errNonRetriable struct { Message string } @@ -169,6 +171,10 @@ func GetRecoveryKind(err error) RecoveryKind { return RecoveryKindFatal } + if errors.Is(err, IdleError) { + return RecoveryKindLink + } + var netErr net.Error // these are errors that can flow from the go-amqp connection to diff --git a/sdk/messaging/azservicebus/internal/errors_test.go b/sdk/messaging/azservicebus/internal/errors_test.go index bed3ce960462..6b57e8f90b8f 100644 --- a/sdk/messaging/azservicebus/internal/errors_test.go +++ b/sdk/messaging/azservicebus/internal/errors_test.go @@ -286,3 +286,7 @@ func Test_TransformError(t *testing.T) { // and it's okay, for convenience, to pass a nil. require.Nil(t, TransformError(nil)) } + +func Test_IdleError(t *testing.T) { + require.Equal(t, RecoveryKindLink, GetRecoveryKind(IdleError)) +} diff --git a/sdk/messaging/azservicebus/receiver.go b/sdk/messaging/azservicebus/receiver.go index 51bb7a2721ff..7548b6b5e124 100644 --- a/sdk/messaging/azservicebus/receiver.go +++ b/sdk/messaging/azservicebus/receiver.go @@ -58,10 +58,12 @@ type Receiver struct { mu sync.Mutex receiving bool - defaultDrainTimeout time.Duration defaultTimeAfterFirstMsg time.Duration - idleDuration time.Duration // controls the amount of time we'll wait without _any_ activity for receiving. + // idleDuration controls how long we'll wait, on the client side, for the first message. This is an "internal" + // timeout in that it doesn't result in the user seeing an early exit, but more prevents us from waiting on a + // link that might have been closed, service-side, without our knowledge. + idleDuration time.Duration cancelReleaser *atomic.Value } @@ -136,7 +138,6 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err receiver := &Receiver{ lastPeekedSequenceNumber: 0, cleanupOnClose: args.cleanupOnClose, - defaultDrainTimeout: time.Second, defaultTimeAfterFirstMsg: 20 * time.Millisecond, idleDuration: 5 * time.Minute, retryOptions: args.retryOptions, @@ -207,8 +208,13 @@ func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options return nil, errors.New("receiver is already receiving messages. ReceiveMessages() cannot be called concurrently") } - messages, err := r.receiveMessagesImpl(ctx, maxMessages, options) - return messages, internal.TransformError(err) + for { + messages, err := r.receiveMessagesImpl(ctx, maxMessages, options) + + if !errors.Is(err, internal.IdleError) { + return messages, internal.TransformError(err) + } + } } // ReceiveDeferredMessagesOptions contains optional parameters for the ReceiveDeferredMessages function. @@ -422,7 +428,9 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt // function on Receiver).will have the same issue and will return the relevant error // at that time if len(result.Messages) == 0 { - if internal.IsCancelError(result.Error) || rk == internal.RecoveryKindFatal { + if internal.IsCancelError(result.Error) || + rk == internal.RecoveryKindFatal || + errors.Is(result.Error, internal.IdleError) { return nil, result.Error } @@ -519,6 +527,8 @@ type fetchMessagesResult struct { // Note, if you want to only receive prefetched messages send the parentCtx in // pre-cancelled. This will cause us to only flush the prefetch buffer. func (r *Receiver) fetchMessages(parentCtx context.Context, receiver amqpwrap.AMQPReceiver, count int, timeAfterFirstMessage time.Duration) fetchMessagesResult { + // This idle timer prevents us from getting into a situation where the remote service has + // invalidated a link but we didn't get notified and are waiting on an eternally dead link. firstReceiveCtx, cancelFirstReceive := context.WithTimeout(parentCtx, r.idleDuration) defer cancelFirstReceive() @@ -529,6 +539,14 @@ func (r *Receiver) fetchMessages(parentCtx context.Context, receiver amqpwrap.AM firstMsg, err := receiver.Receive(firstReceiveCtx) if err != nil { + if errors.Is(err, context.DeadlineExceeded) && parentCtx.Err() == nil { + log.Writef(EventReceiver, "Detaching link due to local idle timeout. Will reattach automatically.") + + return fetchMessagesResult{ + Error: internal.IdleError, + } + } + // drain the prefetch buffer - we're stopping because of a // failure on the link/connection _or_ the user cancelled the // operation. diff --git a/sdk/messaging/azservicebus/receiver_unit_test.go b/sdk/messaging/azservicebus/receiver_unit_test.go index fba2e4b5fe04..9715017be455 100644 --- a/sdk/messaging/azservicebus/receiver_unit_test.go +++ b/sdk/messaging/azservicebus/receiver_unit_test.go @@ -6,6 +6,7 @@ package azservicebus import ( "context" "fmt" + "strings" "sync/atomic" "testing" "time" @@ -217,7 +218,6 @@ func TestReceiverCancellationUnitTests(t *testing.T) { r := &Receiver{ idleDuration: 5 * time.Minute, defaultTimeAfterFirstMsg: time.Second, - defaultDrainTimeout: time.Second, amqpLinks: &internal.FakeAMQPLinks{ Receiver: &internal.FakeAMQPReceiver{ ReceiveFn: func(ctx context.Context) (*amqp.Message, error) { @@ -237,6 +237,93 @@ func TestReceiverCancellationUnitTests(t *testing.T) { }) } +func TestIdleTimer(t *testing.T) { + for i := 0; i < 1; i++ { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + logs := test.CaptureLogsForTest() + // test.EnableStdoutLogging() + + receiveCalled := 0 + + fakeAMQPLinks := &internal.FakeAMQPLinks{ + GetFn: func(ctx context.Context) (*internal.LinksWithID, error) { + return &internal.LinksWithID{ + Receiver: &internal.FakeAMQPReceiver{ + ReceiveFn: func(ctx context.Context) (*amqp.Message, error) { + receiveCalled++ + + if receiveCalled == 2 { + return &amqp.Message{}, nil + } + + <-ctx.Done() + return nil, ctx.Err() + }, + }, + }, nil + }, + } + + r := &Receiver{ + receiveMode: ReceiveModeReceiveAndDelete, + idleDuration: time.Second, + defaultTimeAfterFirstMsg: time.Second, + amqpLinks: fakeAMQPLinks, + cancelReleaser: &atomic.Value{}, + retryOptions: exported.RetryOptions{ + // retries aren't used when we're the ones doing the disconnecting + // due to idleness. It's normal for the user to just sit there forever + // waiting for a message. + MaxRetries: 1, + }, + cleanupOnClose: func() {}, + } + + defer func() { _ = r.Close(context.Background()) }() + + r.cancelReleaser.Store(emptyCancelFn) + + msgs, err := r.ReceiveMessages(ctx, 100, nil) + + require.Equal(t, 1+1+1, receiveCalled, "receive called for idled, receiving first message and then attempting to receive a second message") + require.NotEmpty(t, msgs) + require.Nil(t, err) + require.Equal(t, 2, fakeAMQPLinks.CloseIfNeededCalled) + + expectedLogs := []string{ + // (all of these retries are internally visible only, the user sees one long ReceiveMessages() call until they + // cancel the context or (as in this test) they eventually receive a message. + + // receiveMessagesImpl #1 + "[azsb.Receiver] Asking for 100 credits", + "[azsb.Receiver] Only need to issue 100 additional credits", + + // idle timeout occurs, which causes us to restart the entire receive (including link acquisition) + "[azsb.Receiver] Detaching link due to local idle timeout. Will reattach automatically.", + "[azsb.Receiver] Received 0/100 messages", + "[azsb.Receiver] Failure when receiving messages: link was idle, detaching (will be reattached).", + + // receiveMessagesImpl #2 (no idle timeout this time) + "[azsb.Receiver] Asking for 100 credits", + "[azsb.Receiver] Only need to issue 100 additional credits", + "[azsb.Receiver] Received 1/100 messages", + } + + // we'll filter out some other logging that's not part of this test. + var actualLogs []string + + for _, msg := range logs() { + if !strings.HasPrefix(msg, "[azsb.Receiver] [fakelink] Message releaser") { + actualLogs = append(actualLogs, msg) + } + } + + require.Equal(t, expectedLogs, actualLogs) + } +} + func TestReceiverOptions(t *testing.T) { // defaults receiver := &Receiver{}