diff --git a/pubsub/azure/servicebus/servicebus.go b/pubsub/azure/servicebus/servicebus.go index 0e2448d5d8..e4cb6e8298 100644 --- a/pubsub/azure/servicebus/servicebus.go +++ b/pubsub/azure/servicebus/servicebus.go @@ -413,7 +413,14 @@ func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub. a.metadata.MaxActiveMessages, a.metadata.MaxActiveMessagesRecoveryInSec) if innerErr != nil { - a.logger.Error(innerErr) + var detachError *amqp.DetachError + var ampqError *amqp.Error + if errors.Is(innerErr, detachError) || + (errors.As(innerErr, &qError) && ampqError.Condition == amqp.ErrorDetachForced) { + a.logger.Debug(innerErr) + } else { + a.logger.Error(innerErr) + } } cancel() // Cancel receive context diff --git a/pubsub/azure/servicebus/subscription.go b/pubsub/azure/servicebus/subscription.go index 1d60e811eb..ae7f6bf837 100644 --- a/pubsub/azure/servicebus/subscription.go +++ b/pubsub/azure/servicebus/subscription.go @@ -3,7 +3,6 @@ package servicebus import ( "context" "fmt" - "strings" "sync" "time" @@ -203,11 +202,7 @@ func (s *subscription) tryRenewLocks() { func (s *subscription) receiveMessage(ctx context.Context, handler azservicebus.HandlerFunc) error { s.logger.Debugf("Waiting to receive message on topic %s", s.topic) if err := s.entity.ReceiveOne(ctx, handler); err != nil { - if strings.Contains(err.Error(), "force detached") { - return nil - } - - return fmt.Errorf("%s error receiving message on topic %s, %s", errorMessagePrefix, s.topic, err) + return fmt.Errorf("%s error receiving message on topic %s, %w", errorMessagePrefix, s.topic, err) } return nil