Skip to content

Commit

Permalink
Fix amqp resource leak
Browse files Browse the repository at this point in the history
Signed-off-by: Jack Ding <jackding@gmail.com>
  • Loading branch information
jzding committed Dec 14, 2022
1 parent 6d25e1b commit ebd6308
Showing 1 changed file with 2 additions and 1 deletion.
3 changes: 2 additions & 1 deletion pkg/protocol/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ func (q *Router) QDRRouter(wg *sync.WaitGroup) {
localmetrics.UpdateStatusCheckCount(statusAddress, localmetrics.FAILED, 1)
}
}

sendToStatusChannel := func(d *channel.DataChan, e *cloudevents.Event) {
if d.StatusChan == nil || e == nil {
return
Expand Down Expand Up @@ -461,6 +460,7 @@ func (q *Router) ReceiveMsg(d *channel.DataChan, f func(d *channel.DataChan, e *
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
defer p.Close(ctx)
msg, errRec := p.Receive(ctx)
if errRec != nil {
log.Errorf("error in waiting for current state %s = %s", errRec, *d.ReturnAddress)
Expand Down Expand Up @@ -592,6 +592,7 @@ func (q *Router) setReceiver(wg *sync.WaitGroup, d *channel.DataChan) error {
if sender, senderErr = q.NewSenderObject(*out.ReturnAddress); senderErr == nil {
ctx, cancel := context.WithTimeout(context.Background(), q.cancelTimeout)
defer cancel()
defer sender.Protocol.Close(ctx)
if result := sender.Client.Send(ctx, *out.Data); cloudevents.IsUndelivered(result) {
log.Errorf("failed to send(TO): %s result %v ", *out.ReturnAddress, result)
out.Status = channel.FAILED
Expand Down

0 comments on commit ebd6308

Please sign in to comment.