Skip to content

Commit

Permalink
[azservicebus] RetryOptions were not being propagated throughout all …
Browse files Browse the repository at this point in the history
…the clients Azure#17695

RetryOptions were not being propagated throughout all the clients (mismatch in old vs new system).

Fixed and added tests for the various spots that inherit these options to make sure they're properly getting copied over:
- Sender
- All the session receiver functions (AcceptNext, Accept)
- Receivers
- Namespace

Fixes Azure#17686
  • Loading branch information
richardpark-msft authored Apr 23, 2022
1 parent 16caf89 commit fd331bf
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 42 deletions.
54 changes: 34 additions & 20 deletions sdk/messaging/azservicebus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func newClientImpl(creds clientCreds, options *ClientOptions) (*Client, error) {
}

if options != nil {
client.retryOptions = options.RetryOptions

if options.TLSConfig != nil {
nsOptions = append(nsOptions, internal.NamespaceWithTLSConfig(options.TLSConfig))
}
Expand Down Expand Up @@ -160,6 +162,7 @@ func (client *Client) NewReceiverForQueue(queueName string, options *ReceiverOpt
ns: client.namespace,
entity: entity{Queue: queueName},
getRecoveryKindFunc: internal.GetRecoveryKind,
retryOptions: client.retryOptions,
}, options)

if err != nil {
Expand All @@ -178,6 +181,7 @@ func (client *Client) NewReceiverForSubscription(topicName string, subscriptionN
ns: client.namespace,
entity: entity{Topic: topicName, Subscription: subscriptionName},
getRecoveryKindFunc: internal.GetRecoveryKind,
retryOptions: client.retryOptions,
}, options)

if err != nil {
Expand All @@ -200,7 +204,8 @@ func (client *Client) NewSender(queueOrTopic string, options *NewSenderOptions)
ns: client.namespace,
queueOrTopic: queueOrTopic,
cleanupOnClose: cleanupOnClose,
}, client.retryOptions)
retryOptions: client.retryOptions,
})

if err != nil {
return nil, err
Expand All @@ -216,11 +221,13 @@ func (client *Client) AcceptSessionForQueue(ctx context.Context, queueName strin
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
ctx,
&sessionID,
client.namespace,
entity{Queue: queueName},
cleanupOnClose,
toReceiverOptions(options))
newSessionReceiverArgs{
sessionID: &sessionID,
ns: client.namespace,
entity: entity{Queue: queueName},
cleanupOnClose: cleanupOnClose,
retryOptions: client.retryOptions,
}, toReceiverOptions(options))

if err != nil {
return nil, err
Expand All @@ -240,10 +247,13 @@ func (client *Client) AcceptSessionForSubscription(ctx context.Context, topicNam
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
ctx,
&sessionID,
client.namespace,
entity{Topic: topicName, Subscription: subscriptionName},
cleanupOnClose,
newSessionReceiverArgs{
sessionID: &sessionID,
ns: client.namespace,
entity: entity{Topic: topicName, Subscription: subscriptionName},
cleanupOnClose: cleanupOnClose,
retryOptions: client.retryOptions,
},
toReceiverOptions(options))

if err != nil {
Expand All @@ -264,11 +274,13 @@ func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queueName s
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
ctx,
nil,
client.namespace,
entity{Queue: queueName},
cleanupOnClose,
toReceiverOptions(options))
newSessionReceiverArgs{
sessionID: nil,
ns: client.namespace,
entity: entity{Queue: queueName},
cleanupOnClose: cleanupOnClose,
retryOptions: client.retryOptions,
}, toReceiverOptions(options))

if err != nil {
return nil, err
Expand All @@ -288,11 +300,13 @@ func (client *Client) AcceptNextSessionForSubscription(ctx context.Context, topi
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
ctx,
nil,
client.namespace,
entity{Topic: topicName, Subscription: subscriptionName},
cleanupOnClose,
toReceiverOptions(options))
newSessionReceiverArgs{
sessionID: nil,
ns: client.namespace,
entity: entity{Topic: topicName, Subscription: subscriptionName},
cleanupOnClose: cleanupOnClose,
retryOptions: client.retryOptions,
}, toReceiverOptions(options))

if err != nil {
return nil, err
Expand Down
131 changes: 131 additions & 0 deletions sdk/messaging/azservicebus/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,76 @@ func TestClientNewSessionReceiverCancel(t *testing.T) {
require.Nil(t, receiver)
}

func TestClientPropagatesRetryOptionsForSessions(t *testing.T) {
connectionString := test.GetConnectionString(t)

queue, cleanupQueue := createQueue(t, connectionString, &admin.QueueProperties{
RequiresSession: to.Ptr(true),
})

defer cleanupQueue()

topic, cleanupTopic := createSubscription(t, connectionString, nil, &admin.SubscriptionProperties{
RequiresSession: to.Ptr(true),
})

defer cleanupTopic()

expectedRetryOptions := RetryOptions{
MaxRetries: 1,
RetryDelay: time.Second,
MaxRetryDelay: time.Millisecond,
}

client, err := NewClientFromConnectionString(connectionString, &ClientOptions{
RetryOptions: expectedRetryOptions,
})
require.NoError(t, err)

actualNS := client.namespace.(*internal.Namespace)
require.Equal(t, expectedRetryOptions, actualNS.RetryOptions)

queueSender, err := client.NewSender(queue, nil)
require.NoError(t, err)

topicSender, err := client.NewSender(topic, nil)
require.NoError(t, err)

err = queueSender.SendMessage(context.Background(), &Message{
SessionID: to.Ptr("hello"),
}, nil)
require.NoError(t, err)

err = topicSender.SendMessage(context.Background(), &Message{
SessionID: to.Ptr("hello"),
}, nil)
require.NoError(t, err)

sessionReceiver, err := client.AcceptSessionForQueue(context.Background(), queue, "hello", nil)
require.NoError(t, err)
require.NoError(t, sessionReceiver.Close(context.Background()))

require.Equal(t, expectedRetryOptions, sessionReceiver.inner.retryOptions)

sessionReceiver, err = client.AcceptSessionForSubscription(context.Background(), topic, "sub", "hello", nil)
require.NoError(t, err)
require.NoError(t, sessionReceiver.Close(context.Background()))

require.Equal(t, expectedRetryOptions, sessionReceiver.inner.retryOptions)

sessionReceiver, err = client.AcceptNextSessionForQueue(context.Background(), queue, nil)
require.NoError(t, err)
require.NoError(t, sessionReceiver.Close(context.Background()))

require.Equal(t, expectedRetryOptions, sessionReceiver.inner.retryOptions)

sessionReceiver, err = client.AcceptNextSessionForSubscription(context.Background(), topic, "sub", nil)
require.NoError(t, err)
require.NoError(t, sessionReceiver.Close(context.Background()))

require.Equal(t, expectedRetryOptions, sessionReceiver.inner.retryOptions)
}

func TestNewClientUnitTests(t *testing.T) {
t.Run("WithTokenCredential", func(t *testing.T) {
fakeTokenCredential := struct{ azcore.TokenCredential }{}
Expand Down Expand Up @@ -361,6 +431,67 @@ func TestNewClientUnitTests(t *testing.T) {
require.Empty(t, client.links)
require.EqualValues(t, 1, ns.AMQPLinks.Closed)
})

t.Run("RetryOptionsArePropagated", func(t *testing.T) {
// retry options are passed and copied along several routes, just make sure it's properly propagated.
// NOTE: session receivers are checked in a separate test because they require actual SB access.
client, err := NewClient("fake.something", struct{ azcore.TokenCredential }{}, &ClientOptions{
RetryOptions: RetryOptions{
MaxRetries: 101,
RetryDelay: 6 * time.Hour,
MaxRetryDelay: 12 * time.Hour,
},
})

client.namespace = &internal.FakeNS{
AMQPLinks: &internal.FakeAMQPLinks{
Receiver: &internal.FakeAMQPReceiver{},
},
}

require.NoError(t, err)

require.Equal(t, RetryOptions{
MaxRetries: 101,
RetryDelay: 6 * time.Hour,
MaxRetryDelay: 12 * time.Hour,
}, client.retryOptions)

sender, err := client.NewSender("hello", nil)
require.NoError(t, err)

require.Equal(t, RetryOptions{
MaxRetries: 101,
RetryDelay: 6 * time.Hour,
MaxRetryDelay: 12 * time.Hour,
}, sender.retryOptions)

receiver, err := client.NewReceiverForQueue("hello", nil)
require.NoError(t, err)

require.Equal(t, RetryOptions{
MaxRetries: 101,
RetryDelay: 6 * time.Hour,
MaxRetryDelay: 12 * time.Hour,
}, receiver.retryOptions)

actualSettler := receiver.settler.(*messageSettler)

require.Equal(t, RetryOptions{
MaxRetries: 101,
RetryDelay: 6 * time.Hour,
MaxRetryDelay: 12 * time.Hour,
}, actualSettler.retryOptions)

subscriptionReceiver, err := client.NewReceiverForSubscription("hello", "world", nil)
require.NoError(t, err)

require.Equal(t, RetryOptions{
MaxRetries: 101,
RetryDelay: 6 * time.Hour,
MaxRetryDelay: 12 * time.Hour,
}, subscriptionReceiver.retryOptions)
})
}

func assertRPCNotFound(t *testing.T, err error) {
Expand Down
7 changes: 4 additions & 3 deletions sdk/messaging/azservicebus/internal/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ type (

newWebSocketConn func(ctx context.Context, args exported.NewWebSocketConnArgs) (net.Conn, error)

retryOptions exported.RetryOptions
// NOTE: exported only so it can be checked in a test
RetryOptions exported.RetryOptions

clientMu sync.RWMutex
client *amqp.Client
Expand Down Expand Up @@ -133,7 +134,7 @@ func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredentia

func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption {
return func(ns *Namespace) error {
ns.retryOptions = retryOptions
ns.RetryOptions = retryOptions
return nil
}
}
Expand Down Expand Up @@ -407,7 +408,7 @@ func (ns *Namespace) startNegotiateClaimRenewer(ctx context.Context,

expiresOn = tmpExpiresOn
return nil
}, IsFatalSBError, ns.retryOptions)
}, IsFatalSBError, ns.RetryOptions)

if err == nil {
break
Expand Down
6 changes: 3 additions & 3 deletions sdk/messaging/azservicebus/internal/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestNamespaceNegotiateClaim(t *testing.T) {
expires := time.Now().Add(24 * time.Hour)

ns := &Namespace{
retryOptions: retryOptionsOnlyOnce,
RetryOptions: retryOptionsOnlyOnce,
TokenProvider: sbauth.NewTokenProvider(&fakeTokenCredential{expires: expires}),
}

Expand Down Expand Up @@ -78,7 +78,7 @@ func TestNamespaceNegotiateClaimRenewal(t *testing.T) {
expires := time.Now().Add(24 * time.Hour)

ns := &Namespace{
retryOptions: retryOptionsOnlyOnce,
RetryOptions: retryOptionsOnlyOnce,
TokenProvider: sbauth.NewTokenProvider(&fakeTokenCredential{expires: expires}),
}

Expand Down Expand Up @@ -156,7 +156,7 @@ func TestNamespaceNegotiateClaimFailsToGetClient(t *testing.T) {

func TestNamespaceNegotiateClaimNonRenewableToken(t *testing.T) {
ns := &Namespace{
retryOptions: retryOptionsOnlyOnce,
RetryOptions: retryOptionsOnlyOnce,
TokenProvider: sbauth.NewTokenProvider(&fakeTokenCredential{
// credentials that don't renew return a zero-initialized time.
expires: time.Time{},
Expand Down
29 changes: 29 additions & 0 deletions sdk/messaging/azservicebus/liveTestHelpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,35 @@ func createQueue(t *testing.T, connectionString string, queueProperties *admin.Q
}
}

// createSubscription creates a queue, automatically setting it to delete on idle in 5 minutes.
func createSubscription(t *testing.T, connectionString string, topicProperties *admin.TopicProperties, subscriptionProperties *admin.SubscriptionProperties) (string, func()) {
nanoSeconds := time.Now().UnixNano()
topicName := fmt.Sprintf("topic-%X", nanoSeconds)

adminClient, err := admin.NewClientFromConnectionString(connectionString, nil)
require.NoError(t, err)

if topicProperties == nil {
topicProperties = &admin.TopicProperties{}
}

autoDeleteOnIdle := "PT5M"
topicProperties.AutoDeleteOnIdle = &autoDeleteOnIdle

_, err = adminClient.CreateTopic(context.Background(), topicName, &admin.CreateTopicOptions{
Properties: topicProperties,
})
require.NoError(t, err)

_, err = adminClient.CreateSubscription(context.Background(), topicName, "sub", &admin.CreateSubscriptionOptions{Properties: subscriptionProperties})
require.NoError(t, err)

return topicName, func() {
_, err := adminClient.DeleteTopic(context.Background(), topicName, nil)
require.NoError(t, err)
}
}

func deleteQueue(t *testing.T, ac *admin.Client, queueName string) {
_, err := ac.DeleteQueue(context.Background(), queueName, nil)
require.NoError(t, err)
Expand Down
7 changes: 2 additions & 5 deletions sdk/messaging/azservicebus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,11 @@ type ReceiverOptions struct {
// SubQueue should be set to connect to the sub queue (ex: dead letter queue)
// of the queue or subscription.
SubQueue SubQueue

retryOptions RetryOptions
}

const defaultLinkRxBuffer = 2048

func applyReceiverOptions(receiver *Receiver, entity *entity, options *ReceiverOptions) error {

if options == nil {
receiver.receiveMode = ReceiveModePeekLock
} else {
Expand All @@ -101,8 +98,6 @@ func applyReceiverOptions(receiver *Receiver, entity *entity, options *ReceiverO
if err := entity.SetSubQueue(options.SubQueue); err != nil {
return err
}

receiver.retryOptions = options.retryOptions
}

entityPath, err := entity.String()
Expand All @@ -121,6 +116,7 @@ type newReceiverArgs struct {
cleanupOnClose func()
getRecoveryKindFunc func(err error) internal.RecoveryKind
newLinkFn func(ctx context.Context, session amqpwrap.AMQPSession) (internal.AMQPSenderCloser, internal.AMQPReceiverCloser, error)
retryOptions RetryOptions
}

func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, error) {
Expand All @@ -133,6 +129,7 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err
cleanupOnClose: args.cleanupOnClose,
defaultDrainTimeout: time.Second,
defaultTimeAfterFirstMsg: 20 * time.Millisecond,
retryOptions: args.retryOptions,
}

if err := applyReceiverOptions(receiver, &args.entity, options); err != nil {
Expand Down
Loading

0 comments on commit fd331bf

Please sign in to comment.