Skip to content

Commit

Permalink
[azservicebus/azeventhubs] Fix "$cbs node has already been opened" is…
Browse files Browse the repository at this point in the history
…sue (#19492)

When looking for leaks in negotiateClaim I missed the most "obvious" one - if you cancel the function we still need to close the link we opened. If we don't we leak it, and the service will then start failing future negotiations to open it.

This showed up in #19347, originally, but a fix I made recently in SB to test attaching/detaching links made it even more obvious.

This has also been extended to any time we create and need to clean up a link - we ignore the cancellation context in that case since leaking the leak can lead to things that look like hangs, but are actually stray links not being cleaned up.

Fixes #19347
  • Loading branch information
richardpark-msft authored Nov 7, 2022
1 parent a53adbe commit 8c8770f
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 32 deletions.
8 changes: 2 additions & 6 deletions sdk/messaging/azeventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
# Release History

## 0.2.1 (Unreleased)

### Features Added

### Breaking Changes
## 0.2.1 (2022-11-08)

### Bugs Fixed

### Other Changes
- $cbs link is properly closed, even on cancellation (#19492)

## 0.2.0 (2022-10-17)

Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/eh/stress/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ annotations:
namespace: 'go'
dependencies:
- name: stress-test-addons
version: ~0.1.20
version: ~0.2.0
repository: https://stresstestcharts.blob.core.windows.net/helm/
6 changes: 5 additions & 1 deletion sdk/messaging/azeventhubs/internal/links.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,11 @@ func (l *Links[LinkT]) Close(ctx context.Context) error {
return l.closeLinks(ctx, true)
}

func (l *Links[LinkT]) closeLinks(ctx context.Context, permanent bool) error {
func (l *Links[LinkT]) closeLinks(_ context.Context, permanent bool) error {
// we're finding, in practice, that allowing cancellations when cleaning up state
// just results in inconsistencies. We'll cut cancellation off here for now.
ctx := context.Background()

if err := l.closeManagementLink(ctx); err != nil {
azlog.Writef(exported.EventConn, "Error while cleaning up management link while doing connection recovery: %s", err.Error())
}
Expand Down
21 changes: 11 additions & 10 deletions sdk/messaging/azeventhubs/internal/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,10 @@ type RPCLinkArgs struct {
LogEvent azlog.Event
}

func closeOrLog(name string, limit time.Duration, closeable interface {
func closeOrLog(name string, closeable interface {
Close(ctx context.Context) error
}) {
ctx, cancel := context.WithTimeout(context.Background(), limit)
defer cancel()

if err := closeable.Close(ctx); err != nil {
if err := closeable.Close(context.Background()); err != nil {
log.Writef(exported.EventAuth, "Failed closing %s for RPC Link: %s", name, err.Error())
}
}
Expand All @@ -121,7 +118,7 @@ func NewRPCLink(ctx context.Context, args RPCLinkArgs) (*rpcLink, error) {

linkID, err := uuid.New()
if err != nil {
closeOrLog("session", 5*time.Second, session)
closeOrLog("session", session)
return nil, err
}

Expand All @@ -143,7 +140,7 @@ func NewRPCLink(ctx context.Context, args RPCLinkArgs) (*rpcLink, error) {
nil,
)
if err != nil {
closeOrLog("session", 5*time.Second, session)
closeOrLog("session", session)
return nil, err
}

Expand All @@ -164,8 +161,8 @@ func NewRPCLink(ctx context.Context, args RPCLinkArgs) (*rpcLink, error) {

receiver, err := session.NewReceiver(ctx, args.Address, receiverOpts)
if err != nil {
closeOrLog("sender", 5*time.Second, sender)
closeOrLog("session", 5*time.Second, session)
closeOrLog("sender", sender)
closeOrLog("session", session)
return nil, err
}

Expand Down Expand Up @@ -334,7 +331,11 @@ func (l *rpcLink) RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, err
}

// Close the link receiver, sender and session
func (l *rpcLink) Close(ctx context.Context) error {
func (l *rpcLink) Close(_ context.Context) error {
// we're finding, in practice, that allowing cancellations when cleaning up state
// just results in inconsistencies. We'll cut cancellation off here for now.
ctx := context.Background()

l.rpcLinkCtxCancel()

if err := l.closeReceiver(ctx); err != nil {
Expand Down
8 changes: 2 additions & 6 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
# Release History

## 1.1.2 (Unreleased)

### Features Added

### Breaking Changes
## 1.1.2 (2022-11-08)

### Bugs Fixed

### Other Changes
- $cbs link is properly closed, even on cancellation (#19492)

## 1.1.1 (2022-10-11)

Expand Down
4 changes: 3 additions & 1 deletion sdk/messaging/azservicebus/internal/amqpLinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,9 @@ func (l *AMQPLinksImpl) initWithoutLocking(ctx context.Context) error {

// close closes the link.
// NOTE: No locking is done in this function, call `Close` if you require locking.
func (l *AMQPLinksImpl) closeWithoutLocking(ctx context.Context, permanent bool) error {
func (l *AMQPLinksImpl) closeWithoutLocking(_ context.Context, permanent bool) error {
ctx := context.Background()

if l.closedPermanently {
return nil
}
Expand Down
25 changes: 19 additions & 6 deletions sdk/messaging/azservicebus/internal/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/internal/uuid"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/go-amqp"
)

Expand Down Expand Up @@ -98,6 +100,14 @@ type RPCLinkArgs struct {
LogEvent azlog.Event
}

func closeOrLog(name string, closeable interface {
Close(ctx context.Context) error
}) {
if err := closeable.Close(context.Background()); err != nil {
log.Writef(exported.EventAuth, "Failed closing %s for RPC Link: %s", name, err.Error())
}
}

// NewRPCLink will build a new request response link
func NewRPCLink(ctx context.Context, args RPCLinkArgs) (*rpcLink, error) {
session, err := args.Client.NewSession(ctx, nil)
Expand All @@ -108,6 +118,7 @@ func NewRPCLink(ctx context.Context, args RPCLinkArgs) (*rpcLink, error) {

linkID, err := uuid.New()
if err != nil {
closeOrLog("session", session)
return nil, err
}

Expand All @@ -129,6 +140,7 @@ func NewRPCLink(ctx context.Context, args RPCLinkArgs) (*rpcLink, error) {
nil,
)
if err != nil {
closeOrLog("session", session)
return nil, err
}

Expand All @@ -149,11 +161,8 @@ func NewRPCLink(ctx context.Context, args RPCLinkArgs) (*rpcLink, error) {

receiver, err := session.NewReceiver(ctx, args.Address, receiverOpts)
if err != nil {
// make sure we close the sender
clsCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

_ = sender.Close(clsCtx)
closeOrLog("sender", sender)
closeOrLog("session", session)
return nil, err
}

Expand Down Expand Up @@ -322,7 +331,11 @@ func (l *rpcLink) RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, err
}

// Close the link receiver, sender and session
func (l *rpcLink) Close(ctx context.Context) error {
func (l *rpcLink) Close(_ context.Context) error {
// we're finding, in practice, that allowing cancellations when cleaning up state
// just results in inconsistencies. We'll cut cancellation off here for now.
ctx := context.Background()

l.rpcLinkCtxCancel()

if err := l.closeReceiver(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/internal/stress/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ annotations:
namespace: 'go'
dependencies:
- name: stress-test-addons
version: ~0.1.20
version: ~0.2.0
repository: https://stresstestcharts.blob.core.windows.net/helm/

0 comments on commit 8c8770f

Please sign in to comment.