diff --git a/pkg/protocol/amqp/amqp.go b/pkg/protocol/amqp/amqp.go index 5ffd712..082498f 100644 --- a/pkg/protocol/amqp/amqp.go +++ b/pkg/protocol/amqp/amqp.go @@ -286,7 +286,6 @@ func (q *Router) QDRRouter(wg *sync.WaitGroup) { localmetrics.UpdateStatusCheckCount(statusAddress, localmetrics.FAILED, 1) } } - sendToStatusChannel := func(d *channel.DataChan, e *cloudevents.Event) { if d.StatusChan == nil || e == nil { return @@ -461,6 +460,7 @@ func (q *Router) ReceiveMsg(d *channel.DataChan, f func(d *channel.DataChan, e * } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + defer p.Close(ctx) msg, errRec := p.Receive(ctx) if errRec != nil { log.Errorf("error in waiting for current state %s = %s", errRec, *d.ReturnAddress) @@ -592,6 +592,7 @@ func (q *Router) setReceiver(wg *sync.WaitGroup, d *channel.DataChan) error { if sender, senderErr = q.NewSenderObject(*out.ReturnAddress); senderErr == nil { ctx, cancel := context.WithTimeout(context.Background(), q.cancelTimeout) defer cancel() + defer sender.Protocol.Close(ctx) if result := sender.Client.Send(ctx, *out.Data); cloudevents.IsUndelivered(result) { log.Errorf("failed to send(TO): %s result %v ", *out.ReturnAddress, result) out.Status = channel.FAILED