Skip to content

Commit

Permalink
Properly distinguish between input and output link handles (#293)
Browse files Browse the repository at this point in the history
* Properly distinguish between input and output link handles

Per spec, peers can have different values for link handles.  When
sending a frame, its Handle is set to the output (our) handle.  When
receiving a frame, its Handle is set to the input (remote) handle.
Fixed session data structures to properly map an input handle to a link.
Renamed handle fields and supporting types to follow spec nomenclature.
Added a live test that reproduced the issue.

* set release date in changelog

* add debug logging message on link attach complete

* bump release date
  • Loading branch information
jhendrixMSFT authored Jun 8, 2023
1 parent aa7222e commit e17dc33
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 65 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release History

## 1.0.1 (2023-06-08)

### Bugs Fixed

* Fixed an issue that could cause links to terminate with error "received disposition frame with unknown link handle X".

## 1.0.0 (2023-05-04)

### Features Added
Expand Down
55 changes: 55 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,61 @@ func TestNewReceiverWithCancelledContext(t *testing.T) {
checkLeaks()
}

func TestMultipleSendersSharedSession(t *testing.T) {
if localBrokerAddr == "" {
t.Skip()
}

checkLeaks := leaktest.Check(t)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := amqp.Dial(ctx, localBrokerAddr, &amqp.ConnOptions{
SASLType: amqp.SASLTypeAnonymous(),
})
cancel()
require.NoError(t, err)

ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
session, err := client.NewSession(ctx, nil)
cancel()
require.NoError(t, err)

const msgSize = 1024 * 10
bigMsgData := make([]byte, msgSize)
for i := 0; i < msgSize; i++ {
bigMsgData[i] = byte(i % 256)
}

wg := &sync.WaitGroup{}
for senders := 0; senders < 100; senders++ {
wg.Add(1)
go func() {
defer wg.Done()
for msgCount := 0; msgCount < 50; msgCount++ {
sender, err := session.NewSender(context.Background(), "TestMultipleSendersSharedSession", nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = sender.Send(ctx, amqp.NewMessage(bigMsgData), nil)
cancel()
if err != nil {
panic(err)
}
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
err = sender.Close(ctx)
if err != nil {
panic(err)
}
cancel()
time.Sleep(time.Millisecond * 100)
}
}()
}

wg.Wait()
require.NoError(t, client.Close())
checkLeaks()
}

func repeatStrings(count int, strs ...string) []string {
var out []string
for i := 0; i < count; i += len(strs) {
Expand Down
24 changes: 15 additions & 9 deletions link.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ type linkKey struct {

// link contains the common state and methods for sending and receiving links
type link struct {
key linkKey // Name and direction
handle uint32 // our handle
remoteHandle uint32 // remote's handle
dynamicAddr bool // request a dynamic link address from the server
key linkKey // Name and direction

// NOTE: outputHandle and inputHandle might not have the same value

// our handle
outputHandle uint32

// remote's handle
inputHandle uint32

// frames destined for this link are added to this queue by Session.muxFrameToLink
rxQ *queue.Holder[frames.FrameBody]
Expand Down Expand Up @@ -66,6 +71,7 @@ type link struct {
maxMessageSize uint64

closeInProgress bool // indicates that the detach performative has been sent
dynamicAddr bool // request a dynamic link address from the server
}

func newLink(s *Session, r encoding.Role) link {
Expand Down Expand Up @@ -122,7 +128,7 @@ func (l *link) attach(ctx context.Context, beforeAttach func(*frames.PerformAtta

attach := &frames.PerformAttach{
Name: l.key.name,
Handle: l.handle,
Handle: l.outputHandle,
ReceiverSettleMode: l.receiverSettleMode,
SenderSettleMode: l.senderSettleMode,
MaxMessageSize: l.maxMessageSize,
Expand Down Expand Up @@ -183,7 +189,7 @@ func (l *link) attach(ctx context.Context, beforeAttach func(*frames.PerformAtta

// send return detach
fr = &frames.PerformDetach{
Handle: l.handle,
Handle: l.outputHandle,
Closed: true,
}
if err := l.txFrameAndWait(ctx, fr); err != nil {
Expand All @@ -206,7 +212,7 @@ func (l *link) attach(ctx context.Context, beforeAttach func(*frames.PerformAtta
if err := l.setSettleModes(resp); err != nil {
// close the link as there's a mismatch on requested/supported settlement modes
dr := &frames.PerformDetach{
Handle: l.handle,
Handle: l.outputHandle,
Closed: true,
}
if err := l.txFrameAndWait(ctx, dr); err != nil {
Expand Down Expand Up @@ -268,7 +274,7 @@ func (l *link) muxHandleFrame(fr frames.FrameBody) error {
}

dr := &frames.PerformDetach{
Handle: l.handle,
Handle: l.outputHandle,
Closed: true,
}
l.txFrame(context.Background(), dr, nil)
Expand Down Expand Up @@ -329,7 +335,7 @@ func (l *link) closeWithError(cnd ErrCond, desc string) {
}

dr := &frames.PerformDetach{
Handle: l.handle,
Handle: l.outputHandle,
Closed: true,
Error: amqpErr,
}
Expand Down
10 changes: 5 additions & 5 deletions link_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,10 @@ func newTestLink(t *testing.T) *Receiver {
// debug(1, "FLOW Link Mux half: source: %s, inflight: %d, credit: %d, deliveryCount: %d, messages: %d, unsettled: %d, maxCredit : %d, settleMode: %s", l.source.Address, l.receiver.inFlight.len(), l.l.linkCredit, l.deliveryCount, len(l.messages), l.countUnsettled(), l.receiver.maxCredit, l.receiverSettleMode.String())
done: make(chan struct{}),
session: &Session{
tx: make(chan frameBodyEnvelope, 100),
done: make(chan struct{}),
conn: conn,
handles: bitmap.New(32),
tx: make(chan frameBodyEnvelope, 100),
done: make(chan struct{}),
conn: conn,
outputHandles: bitmap.New(32),
},
rxQ: queue.NewHolder(queue.New[frames.FrameBody](100)),
close: make(chan struct{}),
Expand All @@ -204,7 +204,7 @@ func newTestLink(t *testing.T) *Receiver {
func closeTestLink(l *link) {
close(l.close)
q := l.rxQ.Acquire()
q.Enqueue(&frames.PerformDetach{Handle: l.handle, Closed: true})
q.Enqueue(&frames.PerformDetach{Handle: l.outputHandle, Closed: true})
l.rxQ.Release(q)
}

Expand Down
6 changes: 3 additions & 3 deletions receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func (r *Receiver) mux(hooks receiverTestHooks) {
// receiver is being closed by the client
r.l.closeInProgress = true
fr := &frames.PerformDetach{
Handle: r.l.handle,
Handle: r.l.outputHandle,
Closed: true,
}
r.l.txFrame(context.Background(), fr, nil)
Expand All @@ -615,7 +615,7 @@ func (r *Receiver) muxFlow(linkCredit uint32, drain bool) error {
)

fr := &frames.PerformFlow{
Handle: &r.l.handle,
Handle: &r.l.outputHandle,
DeliveryCount: &deliveryCount,
LinkCredit: &linkCredit, // max number of messages,
Drain: drain,
Expand Down Expand Up @@ -671,7 +671,7 @@ func (r *Receiver) muxHandleFrame(fr frames.FrameBody) error {

// send flow
resp := &frames.PerformFlow{
Handle: &r.l.handle,
Handle: &r.l.outputHandle,
DeliveryCount: &deliveryCount,
LinkCredit: &linkCredit, // max number of messages
}
Expand Down
8 changes: 4 additions & 4 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (cha
}

fr := frames.PerformTransfer{
Handle: s.l.handle,
Handle: s.l.outputHandle,
DeliveryID: &needsDeliveryID,
DeliveryTag: deliveryTag,
MessageFormat: &msg.Format,
Expand Down Expand Up @@ -170,7 +170,7 @@ func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (cha

sent := make(chan error, 1)
select {
case s.transfers <- transferEnvelope{Ctx: ctx, Frame: fr, Sent: sent}:
case s.transfers <- transferEnvelope{Ctx: ctx, InputHandle: s.l.inputHandle, Frame: fr, Sent: sent}:
// frame was sent to our mux
case <-s.l.done:
return nil, s.l.doneErr
Expand Down Expand Up @@ -389,7 +389,7 @@ Loop:
// sender is being closed by the client
s.l.closeInProgress = true
fr := &frames.PerformDetach{
Handle: s.l.handle,
Handle: s.l.outputHandle,
Closed: true,
}
s.l.txFrame(context.Background(), fr, nil)
Expand Down Expand Up @@ -431,7 +431,7 @@ func (s *Sender) muxHandleFrame(fr frames.FrameBody) error {

// send flow
resp := &frames.PerformFlow{
Handle: &s.l.handle,
Handle: &s.l.outputHandle,
DeliveryCount: &deliveryCount,
LinkCredit: &linkCredit, // max number of messages
}
Expand Down
2 changes: 1 addition & 1 deletion sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,7 @@ func TestSenderFlowFrameWithEcho(t *testing.T) {

nextIncomingID := uint32(1)
b, err := fake.EncodeFrame(frames.TypeAMQP, 0, &frames.PerformFlow{
Handle: &sender.l.handle,
Handle: &sender.l.outputHandle,
NextIncomingID: &nextIncomingID,
IncomingWindow: 100,
OutgoingWindow: 100,
Expand Down
Loading

0 comments on commit e17dc33

Please sign in to comment.