diff --git a/amqphandler.go b/amqphandler.go index b6aec41..39543d4 100644 --- a/amqphandler.go +++ b/amqphandler.go @@ -30,7 +30,7 @@ import ( ) type amqpHandler interface { - Handle(ctx context.Context, msg *amqp.Message) error + Handle(ctx context.Context, msg *amqp.Message, r *amqp.Receiver) error } // amqpAdapterHandler is a middleware handler that translates amqp messages into servicebus messages @@ -46,10 +46,10 @@ func newAmqpAdapterHandler(receiver *Receiver, next Handler) *amqpAdapterHandler } } -func (h *amqpAdapterHandler) Handle(ctx context.Context, msg *amqp.Message) error { +func (h *amqpAdapterHandler) Handle(ctx context.Context, msg *amqp.Message, r *amqp.Receiver) error { const optName = "sb.amqpHandler.Handle" - event, err := messageFromAMQPMessage(msg) + event, err := messageFromAMQPMessage(msg, r) if err != nil { _, span := h.receiver.startConsumerSpanFromContext(ctx, optName) span.Logger().Error(err) diff --git a/auto_forward_example_test.go b/auto_forward_example_test.go index 55f474c..a54b819 100644 --- a/auto_forward_example_test.go +++ b/auto_forward_example_test.go @@ -6,7 +6,7 @@ import ( "os" "time" - "github.com/Azure/azure-service-bus-go" + servicebus "github.com/Azure/azure-service-bus-go" ) type MessagePrinter struct{} @@ -79,12 +79,12 @@ func Example_autoForward() { } func ensureQueue(ctx context.Context, qm *servicebus.QueueManager, name string, opts ...servicebus.QueueManagementOption) (*servicebus.QueueEntity, error) { - qe, err := qm.Get(ctx, name) + _, err := qm.Get(ctx, name) if err == nil { _ = qm.Delete(ctx, name) } - qe, err = qm.Put(ctx, name, opts...) + qe, err := qm.Put(ctx, name, opts...) if err != nil { fmt.Println(err) return nil, err diff --git a/batch_disposition.go b/batch_disposition.go index 421409c..7d0b548 100644 --- a/batch_disposition.go +++ b/batch_disposition.go @@ -58,7 +58,7 @@ func (bdi *BatchDispositionIterator) Done() bool { // Next iterates to the next LockToken func (bdi *BatchDispositionIterator) Next() (uuid *uuid.UUID) { - if done := bdi.Done(); done == false { + if done := bdi.Done(); !done { uuid = bdi.LockTokenIDs[bdi.cursor] bdi.cursor++ } diff --git a/entity.go b/entity.go index 1de469b..88cffe5 100644 --- a/entity.go +++ b/entity.go @@ -92,7 +92,7 @@ func (e *entity) getEntity() *entity { // unable to complete the operation, or an empty slice of messages and an instance of "ErrNoMessages" signifying that // there are currently no messages in the queue with a sequence ID larger than previously viewed ones. func (re *receivingEntity) Peek(ctx context.Context, options ...PeekOption) (MessageIterator, error) { - ctx, span := re.entity.startSpanFromContext(ctx, "sb.entity.Peek") + _, span := re.entity.startSpanFromContext(ctx, "sb.entity.Peek") defer span.End() return newPeekIterator(re.entity, options...) diff --git a/errors.go b/errors.go index 79addaf..b812371 100644 --- a/errors.go +++ b/errors.go @@ -62,7 +62,7 @@ func (e ErrMissingField) Error() string { } func (e ErrMalformedMessage) Error() string { - return fmt.Sprintf("message was expected in the form of []byte was not a []byte") + return "message was expected in the form of []byte was not a []byte" } // NewErrIncorrectType lets you skip using the `reflect` package. Just provide a variable of the desired type as diff --git a/go.mod b/go.mod index 356457a..d4ac7f9 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,14 @@ module github.com/Azure/azure-service-bus-go go 1.12 require ( - github.com/Azure/azure-amqp-common-go/v3 v3.2.0 + github.com/Azure/azure-amqp-common-go/v3 v3.2.1 github.com/Azure/azure-sdk-for-go v51.1.0+incompatible - github.com/Azure/go-amqp v0.13.13 + github.com/Azure/go-amqp v0.16.0 github.com/Azure/go-autorest/autorest v0.11.18 github.com/Azure/go-autorest/autorest/adal v0.9.13 github.com/Azure/go-autorest/autorest/date v0.3.0 github.com/Azure/go-autorest/autorest/to v0.4.0 + github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect github.com/devigned/tab v0.1.1 github.com/joho/godotenv v1.3.0 github.com/mitchellh/mapstructure v1.3.3 diff --git a/go.sum b/go.sum index a586c30..5fe49e2 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,9 @@ -github.com/Azure/azure-amqp-common-go/v3 v3.2.0 h1:BK/3P4TW4z2HLD6G5tMlHRvptOxxi4s9ee5r8sdHBUs= -github.com/Azure/azure-amqp-common-go/v3 v3.2.0/go.mod h1:zN7QL/vfCsq3XQxQaTkg4ScO786CA2rQnZ1LXX7QryE= +github.com/Azure/azure-amqp-common-go/v3 v3.2.1 h1:uQyDk81yn5hTP1pW4Za+zHzy97/f4vDz9o1d/exI4j4= +github.com/Azure/azure-amqp-common-go/v3 v3.2.1/go.mod h1:O6X1iYHP7s2x7NjUKsXVhkwWrQhxrd+d8/3rRadj4CI= github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w= github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/go-amqp v0.13.13 h1:OBPwCO50EzniOyZR0M4VbGJYDxceIy3SFOnKVMJktdY= -github.com/Azure/go-amqp v0.13.13/go.mod h1:D5ZrjQqB1dyp1A+G73xeL/kNn7D5qHJIIsNNps7YNmk= +github.com/Azure/go-amqp v0.16.0 h1:6mhxUxaKLjMtHlGqzeih/LKqjUPLZxbM6zwfz5/C4NQ= +github.com/Azure/go-amqp v0.16.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM= @@ -74,8 +74,6 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OH github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= diff --git a/message.go b/message.go index e4328a7..397d2a5 100644 --- a/message.go +++ b/message.go @@ -58,6 +58,7 @@ type ( ec entityConnector // if an entityConnector is present, a message should send disposition via mgmt useSession bool sessionID *string + receiver *amqp.Receiver } // DispositionAction represents the action to notify Azure Service Bus of the Message's disposition @@ -133,7 +134,7 @@ func NewMessage(data []byte) *Message { // getLinkName returns associated link name or empty string if receiver or link is not defined, func (m *Message) getLinkName() string { - return m.message.GetLinkName() + return m.message.LinkName() } // CompleteAction will notify Azure Service Bus that the message was successfully handled and should be deleted from the @@ -187,7 +188,7 @@ func (m *Message) Complete(ctx context.Context) error { return sendMgmtDisposition(ctx, m, disposition{Status: completedDisposition}) } - return m.message.Accept(ctx) + return m.receiver.AcceptMessage(ctx, m.message) } // Abandon will notify Azure Service Bus the message failed but should be re-queued for delivery. @@ -202,7 +203,7 @@ func (m *Message) Abandon(ctx context.Context) error { return sendMgmtDisposition(ctx, m, d) } - return m.message.Modify(ctx, false, false, nil) + return m.receiver.ModifyMessage(ctx, m.message, false, false, nil) } // Defer will set aside the message for later processing @@ -228,7 +229,7 @@ func (m *Message) Defer(ctx context.Context) error { _, span := m.startSpanFromContext(ctx, "sb.Message.Defer") defer span.End() - return m.message.Modify(ctx, true, true, nil) + return m.receiver.ModifyMessage(ctx, m.message, true, true, nil) } // Release will notify Azure Service Bus the message should be re-queued without failure. @@ -259,7 +260,7 @@ func (m *Message) DeadLetter(ctx context.Context, err error) error { Condition: amqp.ErrorCondition(ErrorInternalError), Description: err.Error(), } - return m.message.Reject(ctx, &amqpErr) + return m.receiver.RejectMessage(ctx, m.message, &amqpErr) } @@ -291,7 +292,7 @@ func (m *Message) DeadLetterWithInfo(ctx context.Context, err error, condition M Description: err.Error(), Info: info, } - return m.message.Reject(ctx, &amqpErr) + return m.receiver.RejectMessage(ctx, m.message, &amqpErr) } // ScheduleAt will ensure Azure Service Bus delivers the message after the time specified @@ -402,14 +403,15 @@ func addMapToAnnotations(a amqp.Annotations, m map[string]interface{}) amqp.Anno return a } -func messageFromAMQPMessage(msg *amqp.Message) (*Message, error) { - return newMessage(msg.GetData(), msg) +func messageFromAMQPMessage(msg *amqp.Message, r *amqp.Receiver) (*Message, error) { + return newMessage(msg.GetData(), msg, r) } -func newMessage(data []byte, amqpMsg *amqp.Message) (*Message, error) { +func newMessage(data []byte, amqpMsg *amqp.Message, r *amqp.Receiver) (*Message, error) { msg := &Message{ - Data: data, - message: amqpMsg, + Data: data, + message: amqpMsg, + receiver: r, } if amqpMsg == nil { diff --git a/message_test.go b/message_test.go index 1a63da9..7f1fad7 100644 --- a/message_test.go +++ b/message_test.go @@ -173,7 +173,7 @@ func (suite *serviceBusSuite) TestAMQPMessageToMessage() { Data: [][]byte{[]byte("foo")}, } - msg, err := messageFromAMQPMessage(aMsg) + msg, err := messageFromAMQPMessage(aMsg, nil) if suite.NoError(err) { suite.Equal(msg.ID, aMsg.Properties.MessageID, "messageID") suite.Equal(*msg.GroupSequence, aMsg.Properties.GroupSequence, "groupSequence") diff --git a/namespace.go b/namespace.go index d83c8de..40c4433 100644 --- a/namespace.go +++ b/namespace.go @@ -282,7 +282,7 @@ func (ns *Namespace) negotiateClaim(ctx context.Context, client *amqp.Client, en tab.For(refreshCtx).Error(err) select { case <-refreshCtx.Done(): - break + return case <-time.After(5 * time.Second): // retry } diff --git a/prefetch_example_test.go b/prefetch_example_test.go index 66f9861..735f54a 100644 --- a/prefetch_example_test.go +++ b/prefetch_example_test.go @@ -7,7 +7,7 @@ import ( "os" "time" - "github.com/Azure/azure-service-bus-go" + servicebus "github.com/Azure/azure-service-bus-go" ) func Example_prefetch() { @@ -84,7 +84,7 @@ func Example_prefetch() { fmt.Println(err) return } - totalPrefetch1 <- time.Now().Sub(start) + totalPrefetch1 <- time.Since(start) }() totalPrefetch1000 := make(chan time.Duration) @@ -94,7 +94,7 @@ func Example_prefetch() { fmt.Println(err) return } - totalPrefetch1000 <- time.Now().Sub(start) + totalPrefetch1000 <- time.Since(start) }() tp1 := <-totalPrefetch1 diff --git a/priority_subscription_example_test.go b/priority_subscription_example_test.go index a4496e9..93b42bd 100644 --- a/priority_subscription_example_test.go +++ b/priority_subscription_example_test.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/Azure/azure-service-bus-go" + servicebus "github.com/Azure/azure-service-bus-go" ) type PrioritySubscription struct { @@ -171,12 +171,12 @@ func Example_prioritySubscriptions() { } func ensureTopic(ctx context.Context, tm *servicebus.TopicManager, name string, opts ...servicebus.TopicManagementOption) (*servicebus.TopicEntity, error) { - te, err := tm.Get(ctx, name) + _, err := tm.Get(ctx, name) if err == nil { _ = tm.Delete(ctx, name) } - te, err = tm.Put(ctx, name, opts...) + te, err := tm.Put(ctx, name, opts...) if err != nil { fmt.Println(err) return nil, err @@ -186,12 +186,12 @@ func ensureTopic(ctx context.Context, tm *servicebus.TopicManager, name string, } func ensureSubscription(ctx context.Context, sm *servicebus.SubscriptionManager, name string, opts ...servicebus.SubscriptionManagementOption) (*servicebus.SubscriptionEntity, error) { - subEntity, err := sm.Get(ctx, name) + _, err := sm.Get(ctx, name) if err == nil { _ = sm.Delete(ctx, name) } - subEntity, err = sm.Put(ctx, name, opts...) + subEntity, err := sm.Put(ctx, name, opts...) if err != nil { fmt.Println(err) return nil, err diff --git a/receiver.go b/receiver.go index d3503ef..996e5c1 100644 --- a/receiver.go +++ b/receiver.go @@ -226,62 +226,6 @@ func (r *Receiver) Listen(ctx context.Context, handler Handler) *ListenerHandle } } -func (r *Receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler Handler) { - const optName = "sb.Receiver.handleMessage" - - event, err := messageFromAMQPMessage(msg) - if err != nil { - _, span := r.startConsumerSpanFromContext(ctx, optName) - span.Logger().Error(err) - r.setLastError(err) - if r.doneListening != nil { - r.doneListening() - } - return - } - - ctx, span := tab.StartSpanWithRemoteParent(ctx, optName, event) - defer span.End() - - id := messageID(msg) - if idStr, ok := id.(string); ok { - span.AddAttributes(tab.StringAttribute("amqp.message.id", idStr)) - } - - if err := handler.Handle(ctx, event); err != nil { - // stop handling messages since the message consumer ran into an unexpected error - r.setLastError(err) - if r.doneListening != nil { - r.doneListening() - } - return - } - - // nothing more to be done. The message was settled when it was accepted by the Receiver - if r.mode == ReceiveAndDeleteMode { - return - } - - // nothing more to be done. The Receiver has no default disposition, so the handler is solely responsible for - // disposition - if r.DefaultDisposition == nil { - return - } - - // default disposition is set, so try to send the disposition. If the message disposition has already been set, the - // underlying AMQP library will ignore the second disposition respecting the disposition of the handler func. - if err := r.DefaultDisposition(ctx); err != nil { - // if an error is returned by the default disposition, then we must alert the message consumer as we can't - // be sure the final message disposition. - tab.For(ctx).Error(err) - r.setLastError(err) - if r.doneListening != nil { - r.doneListening() - } - return - } -} - func (r *Receiver) listenForMessages(ctx context.Context, handler amqpHandler) { ctx, span := r.startConsumerSpanFromContext(ctx, "sb.Receiver.listenForMessages") defer span.End() @@ -346,13 +290,12 @@ func (r *Receiver) listenForMessage(ctx context.Context, handler amqpHandler) er } receiver = r.receiver r.clientMu.RUnlock() - err := receiver.HandleMessage(ctx, func(message *amqp.Message) error { - return handler.Handle(ctx, message) - }) + msg, err := receiver.Receive(ctx) if err != nil { tab.For(ctx).Debug(err.Error()) return err } + handler.Handle(ctx, msg, r.receiver) return nil } diff --git a/rpc.go b/rpc.go index 5af4214..2b63910 100644 --- a/rpc.go +++ b/rpc.go @@ -370,7 +370,7 @@ func (r *rpcClient) ReceiveDeferred(ctx context.Context, mode ReceiveMode, seque return nil, err } - transformedMessages[i], err = messageFromAMQPMessage(&rehydrated) + transformedMessages[i], err = messageFromAMQPMessage(&rehydrated, nil) if err != nil { return nil, err } @@ -476,7 +476,7 @@ func (r *rpcClient) GetNextPage(ctx context.Context, fromSequenceNumber int64, m return nil, err } - transformedMessages[i], err = messageFromAMQPMessage(&rehydrated) + transformedMessages[i], err = messageFromAMQPMessage(&rehydrated, nil) if err != nil { tab.For(ctx).Error(err) return nil, err diff --git a/subscription.go b/subscription.go index 25b3e4d..cbb0cb3 100644 --- a/subscription.go +++ b/subscription.go @@ -36,12 +36,11 @@ type ( //Messages are received from a subscription identically to the way they are received from a queue. Subscription struct { *receivingEntity - Topic *Topic - receiver *Receiver - receiverMu sync.Mutex - receiveMode ReceiveMode - requiredSessionID *string - prefetchCount *uint32 + Topic *Topic + receiver *Receiver + receiverMu sync.Mutex + receiveMode ReceiveMode + prefetchCount *uint32 } // SubscriptionOption configures the Subscription Azure Service Bus client diff --git a/subscription_manager.go b/subscription_manager.go index 8e91451..2a293a6 100644 --- a/subscription_manager.go +++ b/subscription_manager.go @@ -553,7 +553,7 @@ func SubscriptionWithLockDuration(window *time.Duration) SubscriptionManagementO window = &duration } if *window > time.Duration(5*time.Minute) { - return fmt.Errorf("Lock duration must be shorter than 5 minutes got: %v", *window) + return fmt.Errorf("lock duration must be shorter than 5 minutes got: %v", *window) } s.LockDuration = ptrString(durationTo8601Seconds(*window)) diff --git a/subscription_test.go b/subscription_test.go index cda9bb6..055be91 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -347,6 +347,7 @@ func testSubscriptionWithFalseRule(ctx context.Context, t *testing.T, sm *Subscr _, err := sm.PutRule(ctx, s.Name, "falseRule", FalseFilter{}) require.NoError(t, err) rules, err := sm.ListRules(ctx, s.Name) + require.NoError(t, err) require.Len(t, rules, 2) rule := rules[1] assert.Equal(t, "falseRule", rule.Name) @@ -359,6 +360,7 @@ func testSubscriptionWithSQLFilterRule(ctx context.Context, t *testing.T, sm *Su _, err := sm.PutRule(ctx, s.Name, "sqlRuleNotNullLabel", SQLFilter{Expression: "label IS NOT NULL"}) require.NoError(t, err) rules, err := sm.ListRules(ctx, s.Name) + require.NoError(t, err) require.Len(t, rules, 2) rule := rules[1] assert.Equal(t, "sqlRuleNotNullLabel", rule.Name) @@ -377,6 +379,7 @@ func testSubscriptionWithCorrelationFilterRule(ctx context.Context, t *testing.T _, err := sm.PutRule(ctx, s.Name, "correlationRule", filter) require.NoError(t, err) rules, err := sm.ListRules(ctx, s.Name) + require.NoError(t, err) require.Len(t, rules, 2) rule := rules[1] assert.Equal(t, "correlationRule", rule.Name) @@ -659,6 +662,7 @@ func (suite *serviceBusSuite) TestSubscriptionSessionClient() { subName := suite.randEntityName() subCleanup := makeSubscription(ctx, t, topic, subName, SubscriptionWithRequiredSessions()) subscription, err := topic.NewSubscription(subName) + suite.Require().NoError(err) id, err := uuid.NewV4() suite.Require().NoError(err) sessionID := id.String()