diff --git a/sdk/messaging/azservicebus/client.go b/sdk/messaging/azservicebus/client.go index 5e3c8ddd68a5..3a157537d637 100644 --- a/sdk/messaging/azservicebus/client.go +++ b/sdk/messaging/azservicebus/client.go @@ -133,6 +133,8 @@ func newClientImpl(creds clientCreds, options *ClientOptions) (*Client, error) { } if options != nil { + client.retryOptions = options.RetryOptions + if options.TLSConfig != nil { nsOptions = append(nsOptions, internal.NamespaceWithTLSConfig(options.TLSConfig)) } @@ -160,6 +162,7 @@ func (client *Client) NewReceiverForQueue(queueName string, options *ReceiverOpt ns: client.namespace, entity: entity{Queue: queueName}, getRecoveryKindFunc: internal.GetRecoveryKind, + retryOptions: client.retryOptions, }, options) if err != nil { @@ -178,6 +181,7 @@ func (client *Client) NewReceiverForSubscription(topicName string, subscriptionN ns: client.namespace, entity: entity{Topic: topicName, Subscription: subscriptionName}, getRecoveryKindFunc: internal.GetRecoveryKind, + retryOptions: client.retryOptions, }, options) if err != nil { @@ -200,7 +204,8 @@ func (client *Client) NewSender(queueOrTopic string, options *NewSenderOptions) ns: client.namespace, queueOrTopic: queueOrTopic, cleanupOnClose: cleanupOnClose, - }, client.retryOptions) + retryOptions: client.retryOptions, + }) if err != nil { return nil, err @@ -216,11 +221,13 @@ func (client *Client) AcceptSessionForQueue(ctx context.Context, queueName strin id, cleanupOnClose := client.getCleanupForCloseable() sessionReceiver, err := newSessionReceiver( ctx, - &sessionID, - client.namespace, - entity{Queue: queueName}, - cleanupOnClose, - toReceiverOptions(options)) + newSessionReceiverArgs{ + sessionID: &sessionID, + ns: client.namespace, + entity: entity{Queue: queueName}, + cleanupOnClose: cleanupOnClose, + retryOptions: client.retryOptions, + }, toReceiverOptions(options)) if err != nil { return nil, err @@ -240,10 +247,13 @@ func (client *Client) AcceptSessionForSubscription(ctx context.Context, topicNam id, cleanupOnClose := client.getCleanupForCloseable() sessionReceiver, err := newSessionReceiver( ctx, - &sessionID, - client.namespace, - entity{Topic: topicName, Subscription: subscriptionName}, - cleanupOnClose, + newSessionReceiverArgs{ + sessionID: &sessionID, + ns: client.namespace, + entity: entity{Topic: topicName, Subscription: subscriptionName}, + cleanupOnClose: cleanupOnClose, + retryOptions: client.retryOptions, + }, toReceiverOptions(options)) if err != nil { @@ -264,11 +274,13 @@ func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queueName s id, cleanupOnClose := client.getCleanupForCloseable() sessionReceiver, err := newSessionReceiver( ctx, - nil, - client.namespace, - entity{Queue: queueName}, - cleanupOnClose, - toReceiverOptions(options)) + newSessionReceiverArgs{ + sessionID: nil, + ns: client.namespace, + entity: entity{Queue: queueName}, + cleanupOnClose: cleanupOnClose, + retryOptions: client.retryOptions, + }, toReceiverOptions(options)) if err != nil { return nil, err @@ -288,11 +300,13 @@ func (client *Client) AcceptNextSessionForSubscription(ctx context.Context, topi id, cleanupOnClose := client.getCleanupForCloseable() sessionReceiver, err := newSessionReceiver( ctx, - nil, - client.namespace, - entity{Topic: topicName, Subscription: subscriptionName}, - cleanupOnClose, - toReceiverOptions(options)) + newSessionReceiverArgs{ + sessionID: nil, + ns: client.namespace, + entity: entity{Topic: topicName, Subscription: subscriptionName}, + cleanupOnClose: cleanupOnClose, + retryOptions: client.retryOptions, + }, toReceiverOptions(options)) if err != nil { return nil, err diff --git a/sdk/messaging/azservicebus/client_test.go b/sdk/messaging/azservicebus/client_test.go index ede056561f31..7ce444d41baa 100644 --- a/sdk/messaging/azservicebus/client_test.go +++ b/sdk/messaging/azservicebus/client_test.go @@ -283,6 +283,76 @@ func TestClientNewSessionReceiverCancel(t *testing.T) { require.Nil(t, receiver) } +func TestClientPropagatesRetryOptionsForSessions(t *testing.T) { + connectionString := test.GetConnectionString(t) + + queue, cleanupQueue := createQueue(t, connectionString, &admin.QueueProperties{ + RequiresSession: to.Ptr(true), + }) + + defer cleanupQueue() + + topic, cleanupTopic := createSubscription(t, connectionString, nil, &admin.SubscriptionProperties{ + RequiresSession: to.Ptr(true), + }) + + defer cleanupTopic() + + expectedRetryOptions := RetryOptions{ + MaxRetries: 1, + RetryDelay: time.Second, + MaxRetryDelay: time.Millisecond, + } + + client, err := NewClientFromConnectionString(connectionString, &ClientOptions{ + RetryOptions: expectedRetryOptions, + }) + require.NoError(t, err) + + actualNS := client.namespace.(*internal.Namespace) + require.Equal(t, expectedRetryOptions, actualNS.RetryOptions) + + queueSender, err := client.NewSender(queue, nil) + require.NoError(t, err) + + topicSender, err := client.NewSender(topic, nil) + require.NoError(t, err) + + err = queueSender.SendMessage(context.Background(), &Message{ + SessionID: to.Ptr("hello"), + }, nil) + require.NoError(t, err) + + err = topicSender.SendMessage(context.Background(), &Message{ + SessionID: to.Ptr("hello"), + }, nil) + require.NoError(t, err) + + sessionReceiver, err := client.AcceptSessionForQueue(context.Background(), queue, "hello", nil) + require.NoError(t, err) + require.NoError(t, sessionReceiver.Close(context.Background())) + + require.Equal(t, expectedRetryOptions, sessionReceiver.inner.retryOptions) + + sessionReceiver, err = client.AcceptSessionForSubscription(context.Background(), topic, "sub", "hello", nil) + require.NoError(t, err) + require.NoError(t, sessionReceiver.Close(context.Background())) + + require.Equal(t, expectedRetryOptions, sessionReceiver.inner.retryOptions) + + sessionReceiver, err = client.AcceptNextSessionForQueue(context.Background(), queue, nil) + require.NoError(t, err) + require.NoError(t, sessionReceiver.Close(context.Background())) + + require.Equal(t, expectedRetryOptions, sessionReceiver.inner.retryOptions) + + sessionReceiver, err = client.AcceptNextSessionForSubscription(context.Background(), topic, "sub", nil) + require.NoError(t, err) + require.NoError(t, sessionReceiver.Close(context.Background())) + + require.Equal(t, expectedRetryOptions, sessionReceiver.inner.retryOptions) +} + func TestNewClientUnitTests(t *testing.T) { t.Run("WithTokenCredential", func(t *testing.T) { fakeTokenCredential := struct{ azcore.TokenCredential }{} @@ -361,6 +431,67 @@ func TestNewClientUnitTests(t *testing.T) { require.Empty(t, client.links) require.EqualValues(t, 1, ns.AMQPLinks.Closed) }) + + t.Run("RetryOptionsArePropagated", func(t *testing.T) { + // retry options are passed and copied along several routes, just make sure it's properly propagated. + // NOTE: session receivers are checked in a separate test because they require actual SB access. + client, err := NewClient("fake.something", struct{ azcore.TokenCredential }{}, &ClientOptions{ + RetryOptions: RetryOptions{ + MaxRetries: 101, + RetryDelay: 6 * time.Hour, + MaxRetryDelay: 12 * time.Hour, + }, + }) + + client.namespace = &internal.FakeNS{ + AMQPLinks: &internal.FakeAMQPLinks{ + Receiver: &internal.FakeAMQPReceiver{}, + }, + } + + require.NoError(t, err) + + require.Equal(t, RetryOptions{ + MaxRetries: 101, + RetryDelay: 6 * time.Hour, + MaxRetryDelay: 12 * time.Hour, + }, client.retryOptions) + + sender, err := client.NewSender("hello", nil) + require.NoError(t, err) + + require.Equal(t, RetryOptions{ + MaxRetries: 101, + RetryDelay: 6 * time.Hour, + MaxRetryDelay: 12 * time.Hour, + }, sender.retryOptions) + + receiver, err := client.NewReceiverForQueue("hello", nil) + require.NoError(t, err) + + require.Equal(t, RetryOptions{ + MaxRetries: 101, + RetryDelay: 6 * time.Hour, + MaxRetryDelay: 12 * time.Hour, + }, receiver.retryOptions) + + actualSettler := receiver.settler.(*messageSettler) + + require.Equal(t, RetryOptions{ + MaxRetries: 101, + RetryDelay: 6 * time.Hour, + MaxRetryDelay: 12 * time.Hour, + }, actualSettler.retryOptions) + + subscriptionReceiver, err := client.NewReceiverForSubscription("hello", "world", nil) + require.NoError(t, err) + + require.Equal(t, RetryOptions{ + MaxRetries: 101, + RetryDelay: 6 * time.Hour, + MaxRetryDelay: 12 * time.Hour, + }, subscriptionReceiver.retryOptions) + }) } func assertRPCNotFound(t *testing.T, err error) { diff --git a/sdk/messaging/azservicebus/internal/namespace.go b/sdk/messaging/azservicebus/internal/namespace.go index 1db53ebf30ec..45e86a23cb88 100644 --- a/sdk/messaging/azservicebus/internal/namespace.go +++ b/sdk/messaging/azservicebus/internal/namespace.go @@ -45,7 +45,8 @@ type ( newWebSocketConn func(ctx context.Context, args exported.NewWebSocketConnArgs) (net.Conn, error) - retryOptions exported.RetryOptions + // NOTE: exported only so it can be checked in a test + RetryOptions exported.RetryOptions clientMu sync.RWMutex client *amqp.Client @@ -133,7 +134,7 @@ func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredentia func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption { return func(ns *Namespace) error { - ns.retryOptions = retryOptions + ns.RetryOptions = retryOptions return nil } } @@ -407,7 +408,7 @@ func (ns *Namespace) startNegotiateClaimRenewer(ctx context.Context, expiresOn = tmpExpiresOn return nil - }, IsFatalSBError, ns.retryOptions) + }, IsFatalSBError, ns.RetryOptions) if err == nil { break diff --git a/sdk/messaging/azservicebus/internal/namespace_test.go b/sdk/messaging/azservicebus/internal/namespace_test.go index aba9818628e2..9f8ea9e8860d 100644 --- a/sdk/messaging/azservicebus/internal/namespace_test.go +++ b/sdk/messaging/azservicebus/internal/namespace_test.go @@ -33,7 +33,7 @@ func TestNamespaceNegotiateClaim(t *testing.T) { expires := time.Now().Add(24 * time.Hour) ns := &Namespace{ - retryOptions: retryOptionsOnlyOnce, + RetryOptions: retryOptionsOnlyOnce, TokenProvider: sbauth.NewTokenProvider(&fakeTokenCredential{expires: expires}), } @@ -78,7 +78,7 @@ func TestNamespaceNegotiateClaimRenewal(t *testing.T) { expires := time.Now().Add(24 * time.Hour) ns := &Namespace{ - retryOptions: retryOptionsOnlyOnce, + RetryOptions: retryOptionsOnlyOnce, TokenProvider: sbauth.NewTokenProvider(&fakeTokenCredential{expires: expires}), } @@ -156,7 +156,7 @@ func TestNamespaceNegotiateClaimFailsToGetClient(t *testing.T) { func TestNamespaceNegotiateClaimNonRenewableToken(t *testing.T) { ns := &Namespace{ - retryOptions: retryOptionsOnlyOnce, + RetryOptions: retryOptionsOnlyOnce, TokenProvider: sbauth.NewTokenProvider(&fakeTokenCredential{ // credentials that don't renew return a zero-initialized time. expires: time.Time{}, diff --git a/sdk/messaging/azservicebus/liveTestHelpers_test.go b/sdk/messaging/azservicebus/liveTestHelpers_test.go index 1b81aaf78645..51f775a1bbe8 100644 --- a/sdk/messaging/azservicebus/liveTestHelpers_test.go +++ b/sdk/messaging/azservicebus/liveTestHelpers_test.go @@ -62,6 +62,35 @@ func createQueue(t *testing.T, connectionString string, queueProperties *admin.Q } } +// createSubscription creates a queue, automatically setting it to delete on idle in 5 minutes. +func createSubscription(t *testing.T, connectionString string, topicProperties *admin.TopicProperties, subscriptionProperties *admin.SubscriptionProperties) (string, func()) { + nanoSeconds := time.Now().UnixNano() + topicName := fmt.Sprintf("topic-%X", nanoSeconds) + + adminClient, err := admin.NewClientFromConnectionString(connectionString, nil) + require.NoError(t, err) + + if topicProperties == nil { + topicProperties = &admin.TopicProperties{} + } + + autoDeleteOnIdle := "PT5M" + topicProperties.AutoDeleteOnIdle = &autoDeleteOnIdle + + _, err = adminClient.CreateTopic(context.Background(), topicName, &admin.CreateTopicOptions{ + Properties: topicProperties, + }) + require.NoError(t, err) + + _, err = adminClient.CreateSubscription(context.Background(), topicName, "sub", &admin.CreateSubscriptionOptions{Properties: subscriptionProperties}) + require.NoError(t, err) + + return topicName, func() { + _, err := adminClient.DeleteTopic(context.Background(), topicName, nil) + require.NoError(t, err) + } +} + func deleteQueue(t *testing.T, ac *admin.Client, queueName string) { _, err := ac.DeleteQueue(context.Background(), queueName, nil) require.NoError(t, err) diff --git a/sdk/messaging/azservicebus/receiver.go b/sdk/messaging/azservicebus/receiver.go index e413ecaf5dec..88b62d6624fd 100644 --- a/sdk/messaging/azservicebus/receiver.go +++ b/sdk/messaging/azservicebus/receiver.go @@ -81,14 +81,11 @@ type ReceiverOptions struct { // SubQueue should be set to connect to the sub queue (ex: dead letter queue) // of the queue or subscription. SubQueue SubQueue - - retryOptions RetryOptions } const defaultLinkRxBuffer = 2048 func applyReceiverOptions(receiver *Receiver, entity *entity, options *ReceiverOptions) error { - if options == nil { receiver.receiveMode = ReceiveModePeekLock } else { @@ -101,8 +98,6 @@ func applyReceiverOptions(receiver *Receiver, entity *entity, options *ReceiverO if err := entity.SetSubQueue(options.SubQueue); err != nil { return err } - - receiver.retryOptions = options.retryOptions } entityPath, err := entity.String() @@ -121,6 +116,7 @@ type newReceiverArgs struct { cleanupOnClose func() getRecoveryKindFunc func(err error) internal.RecoveryKind newLinkFn func(ctx context.Context, session amqpwrap.AMQPSession) (internal.AMQPSenderCloser, internal.AMQPReceiverCloser, error) + retryOptions RetryOptions } func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, error) { @@ -133,6 +129,7 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err cleanupOnClose: args.cleanupOnClose, defaultDrainTimeout: time.Second, defaultTimeAfterFirstMsg: 20 * time.Millisecond, + retryOptions: args.retryOptions, } if err := applyReceiverOptions(receiver, &args.entity, options); err != nil { diff --git a/sdk/messaging/azservicebus/receiver_unit_test.go b/sdk/messaging/azservicebus/receiver_unit_test.go index a4e7ef1af7ff..91660ca13810 100644 --- a/sdk/messaging/azservicebus/receiver_unit_test.go +++ b/sdk/messaging/azservicebus/receiver_unit_test.go @@ -386,16 +386,12 @@ func TestReceiverOptions(t *testing.T) { require.NoError(t, applyReceiverOptions(receiver, e, &ReceiverOptions{ ReceiveMode: ReceiveModeReceiveAndDelete, SubQueue: SubQueueTransfer, - retryOptions: RetryOptions{ - MaxRetries: 101, - }, })) require.EqualValues(t, ReceiveModeReceiveAndDelete, receiver.receiveMode) path, err = e.String() require.NoError(t, err) require.EqualValues(t, "topic/Subscriptions/subscription/$Transfer/$DeadLetterQueue", path) - require.EqualValues(t, 101, receiver.retryOptions.MaxRetries) } func TestReceiverDeferUnitTests(t *testing.T) { diff --git a/sdk/messaging/azservicebus/sender.go b/sdk/messaging/azservicebus/sender.go index 799b741af97c..8568be8b93d5 100644 --- a/sdk/messaging/azservicebus/sender.go +++ b/sdk/messaging/azservicebus/sender.go @@ -150,9 +150,10 @@ type newSenderArgs struct { ns internal.NamespaceWithNewAMQPLinks queueOrTopic string cleanupOnClose func() + retryOptions RetryOptions } -func newSender(args newSenderArgs, retryOptions RetryOptions) (*Sender, error) { +func newSender(args newSenderArgs) (*Sender, error) { if err := args.ns.Check(); err != nil { return nil, err } @@ -160,7 +161,7 @@ func newSender(args newSenderArgs, retryOptions RetryOptions) (*Sender, error) { sender := &Sender{ queueOrTopic: args.queueOrTopic, cleanupOnClose: args.cleanupOnClose, - retryOptions: RetryOptions(retryOptions), + retryOptions: args.retryOptions, } sender.links = args.ns.NewAMQPLinks(args.queueOrTopic, sender.createSenderLink, internal.GetRecoveryKind) diff --git a/sdk/messaging/azservicebus/session_receiver.go b/sdk/messaging/azservicebus/session_receiver.go index 2bdee141d539..80fbc6258486 100644 --- a/sdk/messaging/azservicebus/session_receiver.go +++ b/sdk/messaging/azservicebus/session_receiver.go @@ -49,18 +49,27 @@ func toReceiverOptions(sropts *SessionReceiverOptions) *ReceiverOptions { } } -func newSessionReceiver(ctx context.Context, sessionID *string, ns internal.NamespaceWithNewAMQPLinks, entity entity, cleanupOnClose func(), options *ReceiverOptions) (*SessionReceiver, error) { +type newSessionReceiverArgs struct { + sessionID *string + ns internal.NamespaceWithNewAMQPLinks + entity entity + cleanupOnClose func() + retryOptions RetryOptions +} + +func newSessionReceiver(ctx context.Context, args newSessionReceiverArgs, options *ReceiverOptions) (*SessionReceiver, error) { sessionReceiver := &SessionReceiver{ - sessionID: sessionID, + sessionID: args.sessionID, lockedUntil: time.Time{}, } r, err := newReceiver(newReceiverArgs{ - ns: ns, - entity: entity, - cleanupOnClose: cleanupOnClose, + ns: args.ns, + entity: args.entity, + cleanupOnClose: args.cleanupOnClose, newLinkFn: sessionReceiver.newLink, getRecoveryKindFunc: internal.GetRecoveryKindForSession, + retryOptions: args.retryOptions, }, options) if err != nil {