Skip to content

Commit

Permalink
[azservicebus] Create a programmatically useful code for when a sessi…
Browse files Browse the repository at this point in the history
…on is not available (#19113)

Adding a new error Code (CodeTimeout) for when we get back a com.microsoft:timeout error from Service Bus. The primary use is if we are doing an AcceptNextSessionFor<Entity>() and there aren't any available sessions.

This should allow customers to figure out if there's a genuine failure or if their session-enabled queue/subscription is "empty".

Fixes #19039
  • Loading branch information
richardpark-msft authored Sep 14, 2022
1 parent efe7e47 commit 523337c
Show file tree
Hide file tree
Showing 12 changed files with 510 additions and 117 deletions.
4 changes: 4 additions & 0 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

### Bugs Fixed

- AcceptNextSessionForQueue and AcceptNextSessionForSubscription now return an azservicebus.Error with
Code set to CodeTimeout when they fail due to no sessions being available. Examples for this have
been added for `AcceptNextSessionForQueue`. PR#TBD.

### Other Changes

## 1.1.0 (2022-08-09)
Expand Down
90 changes: 44 additions & 46 deletions sdk/messaging/azservicebus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net"
"sync"
"sync/atomic"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
Expand Down Expand Up @@ -37,6 +38,10 @@ type Client struct {
internal.NamespaceForAMQPLinks
}
retryOptions RetryOptions

// acceptNextTimeout controls how long the session accept can take before
// the server stops waiting.
acceptNextTimeout time.Duration
}

// ClientOptions contains options for the `NewClient` and `NewClientFromConnectionString`
Expand Down Expand Up @@ -262,11 +267,11 @@ func (client *Client) AcceptSessionForSubscription(ctx context.Context, topicNam
toReceiverOptions(options))

if err != nil {
return nil, err
return nil, internal.TransformError(err)
}

if err := sessionReceiver.init(ctx); err != nil {
return nil, err
return nil, internal.TransformError(err)
}

client.addCloseable(id, sessionReceiver)
Expand All @@ -275,56 +280,24 @@ func (client *Client) AcceptSessionForSubscription(ctx context.Context, topicNam

// AcceptNextSessionForQueue accepts the next available session from a queue.
// NOTE: this receiver is initialized immediately, not lazily.
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
//
// If the operation fails and the failure is actionable this function will return
// an *azservicebus.Error. If, for example, the operation times out because there
// are no available sessions it will return an *azservicebus.Error where the
// Code is CodeTimeout.
func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queueName string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
ctx,
newSessionReceiverArgs{
sessionID: nil,
ns: client.namespace,
entity: entity{Queue: queueName},
cleanupOnClose: cleanupOnClose,
retryOptions: client.retryOptions,
}, toReceiverOptions(options))

if err != nil {
return nil, err
}

if err := sessionReceiver.init(ctx); err != nil {
return nil, err
}

client.addCloseable(id, sessionReceiver)
return sessionReceiver, nil
return client.acceptNextSessionForEntity(ctx, entity{Queue: queueName}, options)
}

// AcceptNextSessionForSubscription accepts the next available session from a subscription.
// NOTE: this receiver is initialized immediately, not lazily.
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
//
// If the operation fails and the failure is actionable this function will return
// an *azservicebus.Error. If, for example, the operation times out because there
// are no available sessions it will return an *azservicebus.Error where the
// Code is CodeTimeout.
func (client *Client) AcceptNextSessionForSubscription(ctx context.Context, topicName string, subscriptionName string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
ctx,
newSessionReceiverArgs{
sessionID: nil,
ns: client.namespace,
entity: entity{Topic: topicName, Subscription: subscriptionName},
cleanupOnClose: cleanupOnClose,
retryOptions: client.retryOptions,
}, toReceiverOptions(options))

if err != nil {
return nil, err
}

if err := sessionReceiver.init(ctx); err != nil {
return nil, err
}

client.addCloseable(id, sessionReceiver)
return sessionReceiver, nil
return client.acceptNextSessionForEntity(ctx, entity{Topic: topicName, Subscription: subscriptionName}, options)
}

// Close closes the current connection Service Bus as well as any Senders or Receivers created
Expand All @@ -349,6 +322,31 @@ func (client *Client) Close(ctx context.Context) error {
return client.namespace.Close(ctx, true)
}

func (client *Client) acceptNextSessionForEntity(ctx context.Context, entity entity, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
ctx,
newSessionReceiverArgs{
sessionID: nil,
ns: client.namespace,
entity: entity,
cleanupOnClose: cleanupOnClose,
retryOptions: client.retryOptions,
acceptNextTimeout: client.acceptNextTimeout,
}, toReceiverOptions(options))

if err != nil {
return nil, internal.TransformError(err)
}

if err := sessionReceiver.init(ctx); err != nil {
return nil, internal.TransformError(err)
}

client.addCloseable(id, sessionReceiver)
return sessionReceiver, nil
}

func (client *Client) addCloseable(id uint64, closeable internal.Closeable) {
client.linksMu.Lock()
client.links[id] = closeable
Expand Down
6 changes: 6 additions & 0 deletions sdk/messaging/azservicebus/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ const (
// This message will be available again after the lock period expires, or, potentially
// go to the dead letter queue if delivery attempts have been exceeded.
CodeLockLost = exported.CodeLockLost

// CodeTimeout means the service timed out during an operation.
// For instance, if you use ServiceBusClient.AcceptNextSessionForQueue() and there aren't
// any available sessions it will eventually time out and return an *azservicebus.Error
// with this code.
CodeTimeout = exported.CodeTimeout
)

// Error represents a Service Bus specific error.
Expand Down
99 changes: 91 additions & 8 deletions sdk/messaging/azservicebus/example_session_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,111 @@ package azservicebus_test

import (
"context"
"errors"
"fmt"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func ExampleClient_AcceptSessionForQueue() {
sessionReceiver, err := client.AcceptSessionForQueue(context.TODO(), "exampleSessionQueue", "Example Session ID", nil)
exitOnError("Failed to create session receiver", err)
sessionReceiver, err := client.AcceptSessionForQueue(context.TODO(), "exampleSessionQueue", "exampleSessionId", nil)

if err != nil {
panic(err)
}

defer sessionReceiver.Close(context.TODO())

// session receivers work just like non-session receivers
// with one difference - instead of a lock per message there is a lock
// for the session itself.

// Like a message lock, you'll want to periodically renew your session lock.
err = sessionReceiver.RenewSessionLock(context.TODO(), nil)

if err != nil {
panic(err)
}

// session receivers function the same as any other receiver
messages, err := sessionReceiver.ReceiveMessages(context.TODO(), 5, nil)
exitOnError("Failed to receive a message", err)

if err != nil {
panic(err)
}

for _, message := range messages {
err = sessionReceiver.CompleteMessage(context.TODO(), message, nil)
exitOnError("Failed to complete message", err)

if err != nil {
panic(err)
}

fmt.Printf("Received message from session ID \"%s\" and completed it", *message.SessionID)
}
}

func ExampleClient_AcceptSessionForSubscription() {
sessionReceiver, err := client.AcceptSessionForSubscription(context.TODO(), "exampleTopic", "exampleSubscription", "exampleSessionId", nil)

if err != nil {
panic(err)
}

defer sessionReceiver.Close(context.TODO())

// see ExampleClient_AcceptSessionForQueue() for usage of the SessionReceiver.
}

func ExampleClient_AcceptNextSessionForQueue() {
sessionReceiver, err := client.AcceptNextSessionForQueue(context.TODO(), "exampleSessionQueue", nil)
exitOnError("Failed to create session receiver", err)
for {
func() {
sessionReceiver, err := client.AcceptNextSessionForQueue(context.TODO(), "exampleSessionQueue", nil)

if err != nil {
var sbErr *azservicebus.Error

if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeTimeout {
// there are no sessions available. This isn't fatal - we can use the client and
// try to AcceptNextSessionForQueue() again.
fmt.Printf("No session available\n")
return
} else {
panic(err)
}
}

fmt.Printf("Session receiver was assigned session ID \"%s\"", sessionReceiver.SessionID())
defer sessionReceiver.Close(context.TODO())

fmt.Printf("Session receiver was assigned session ID \"%s\"", sessionReceiver.SessionID())

// see ExampleClient_AcceptSessionForQueue() for usage of the SessionReceiver.
}()
}
}

func ExampleClient_AcceptNextSessionForSubscription() {
for {
func() {
sessionReceiver, err := client.AcceptNextSessionForSubscription(context.TODO(), "exampleTopicName", "exampleSubscriptionName", nil)

if err != nil {
var sbErr *azservicebus.Error

if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeTimeout {
// there are no sessions available. This isn't fatal - we can use the client and
// try to AcceptNextSessionForSubscription() again.
fmt.Printf("No session available\n")
return
} else {
panic(err)
}
}

defer sessionReceiver.Close(context.TODO())

fmt.Printf("Session receiver was assigned session ID \"%s\"", sessionReceiver.SessionID())

// see AcceptSessionForSubscription() for some usage of the SessionReceiver itself.
}()
}
}
73 changes: 73 additions & 0 deletions sdk/messaging/azservicebus/example_session_roundrobin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package azservicebus_test

import (
"context"
"errors"
"fmt"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func ExampleClient_AcceptNextSessionForQueue_roundrobin() {
var sessionEnabledQueueName = "exampleSessionQueue"

for {
// You can have multiple active session receivers, provided they're each receiving
// from different sessions.
//
// AcceptNextSessionForQueue (or AcceptNextSessionForSubscription) makes it simple to implement
// this pattern, consuming multiple session receivers in parallel.
sessionReceiver, err := client.AcceptNextSessionForQueue(context.TODO(), sessionEnabledQueueName, nil)

if err != nil {
var sbErr *azservicebus.Error

if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeTimeout {
fmt.Printf("No sessions available\n")

// NOTE: you could also continue here, which will block and wait again for a
// session to become available.
break
}

panic(err)
}

fmt.Printf("Got receiving for session '%s'\n", sessionReceiver.SessionID())

// consume the session
go func() {
defer func() {
fmt.Printf("Closing receiver for session '%s'\n", sessionReceiver.SessionID())
err := sessionReceiver.Close(context.TODO())

if err != nil {
panic(err)
}
}()

// we're only reading a few messages here, but you can also receive in a loop
messages, err := sessionReceiver.ReceiveMessages(context.TODO(), 10, nil)

if err != nil {
panic(err)
}

fmt.Printf("Received %d messages from session '%s'\n", len(messages), sessionReceiver.SessionID())

for _, m := range messages {
if err := processMessageFromSession(context.TODO(), sessionReceiver, m); err != nil {
panic(err)
}
}
}()
}
}

func processMessageFromSession(ctx context.Context, receiver *azservicebus.SessionReceiver, receivedMsg *azservicebus.ReceivedMessage) error {
fmt.Printf("Received message for session %s, message ID %s\n", *receivedMsg.SessionID, receivedMsg.MessageID)
return receiver.CompleteMessage(ctx, receivedMsg, nil)
}
17 changes: 17 additions & 0 deletions sdk/messaging/azservicebus/internal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ func TransformError(err error) error {
return exported.NewError(exported.CodeLockLost, err)
}

if isMicrosoftTimeoutError(err) {
// one scenario where this error pops up is if you're waiting for an available
// session and there are none available. It waits, up to one minute, and then
// returns this error.
return exported.NewError(exported.CodeTimeout, err)
}

rk := GetRecoveryKind(err)

switch rk {
Expand All @@ -79,6 +86,16 @@ func TransformError(err error) error {
}
}

func isMicrosoftTimeoutError(err error) bool {
var amqpErr *amqp.Error

if errors.As(err, &amqpErr) && amqpErr.Condition == amqp.ErrorCondition("com.microsoft:timeout") {
return true
}

return false
}

func IsDetachError(err error) bool {
var de *amqp.DetachError
return errors.As(err, &de)
Expand Down
6 changes: 6 additions & 0 deletions sdk/messaging/azservicebus/internal/exported/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ const (
// This message will be available again after the lock period expires, or, potentially
// go to the dead letter queue if delivery attempts have been exceeded.
CodeLockLost Code = "locklost"

// CodeTimeout means the service timed out during an operation.
// For instance, if you use ServiceBusClient.AcceptNextSessionForQueue() and there aren't
// any available sessions it will eventually time out and return an *azservicebus.Error
// with this code.
CodeTimeout Code = "timeout"
)

// Error represents a Service Bus specific error.
Expand Down
Loading

0 comments on commit 523337c

Please sign in to comment.