Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Update to latest go-amqp (#254)
Browse files Browse the repository at this point in the history
Includes updates due to API breaking changes.
Cleaned up various linter bugs.
Removed some dead code.
  • Loading branch information
jhendrixMSFT authored Oct 1, 2021
1 parent 0f63768 commit 4974e2f
Show file tree
Hide file tree
Showing 17 changed files with 53 additions and 106 deletions.
6 changes: 3 additions & 3 deletions amqphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

type amqpHandler interface {
Handle(ctx context.Context, msg *amqp.Message) error
Handle(ctx context.Context, msg *amqp.Message, r *amqp.Receiver) error
}

// amqpAdapterHandler is a middleware handler that translates amqp messages into servicebus messages
Expand All @@ -46,10 +46,10 @@ func newAmqpAdapterHandler(receiver *Receiver, next Handler) *amqpAdapterHandler
}
}

func (h *amqpAdapterHandler) Handle(ctx context.Context, msg *amqp.Message) error {
func (h *amqpAdapterHandler) Handle(ctx context.Context, msg *amqp.Message, r *amqp.Receiver) error {
const optName = "sb.amqpHandler.Handle"

event, err := messageFromAMQPMessage(msg)
event, err := messageFromAMQPMessage(msg, r)
if err != nil {
_, span := h.receiver.startConsumerSpanFromContext(ctx, optName)
span.Logger().Error(err)
Expand Down
6 changes: 3 additions & 3 deletions auto_forward_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"os"
"time"

"github.com/Azure/azure-service-bus-go"
servicebus "github.com/Azure/azure-service-bus-go"
)

type MessagePrinter struct{}
Expand Down Expand Up @@ -79,12 +79,12 @@ func Example_autoForward() {
}

func ensureQueue(ctx context.Context, qm *servicebus.QueueManager, name string, opts ...servicebus.QueueManagementOption) (*servicebus.QueueEntity, error) {
qe, err := qm.Get(ctx, name)
_, err := qm.Get(ctx, name)
if err == nil {
_ = qm.Delete(ctx, name)
}

qe, err = qm.Put(ctx, name, opts...)
qe, err := qm.Put(ctx, name, opts...)
if err != nil {
fmt.Println(err)
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion batch_disposition.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (bdi *BatchDispositionIterator) Done() bool {

// Next iterates to the next LockToken
func (bdi *BatchDispositionIterator) Next() (uuid *uuid.UUID) {
if done := bdi.Done(); done == false {
if done := bdi.Done(); !done {
uuid = bdi.LockTokenIDs[bdi.cursor]
bdi.cursor++
}
Expand Down
2 changes: 1 addition & 1 deletion entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (e *entity) getEntity() *entity {
// unable to complete the operation, or an empty slice of messages and an instance of "ErrNoMessages" signifying that
// there are currently no messages in the queue with a sequence ID larger than previously viewed ones.
func (re *receivingEntity) Peek(ctx context.Context, options ...PeekOption) (MessageIterator, error) {
ctx, span := re.entity.startSpanFromContext(ctx, "sb.entity.Peek")
_, span := re.entity.startSpanFromContext(ctx, "sb.entity.Peek")
defer span.End()

return newPeekIterator(re.entity, options...)
Expand Down
2 changes: 1 addition & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (e ErrMissingField) Error() string {
}

func (e ErrMalformedMessage) Error() string {
return fmt.Sprintf("message was expected in the form of []byte was not a []byte")
return "message was expected in the form of []byte was not a []byte"
}

// NewErrIncorrectType lets you skip using the `reflect` package. Just provide a variable of the desired type as
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ module github.com/Azure/azure-service-bus-go
go 1.12

require (
github.com/Azure/azure-amqp-common-go/v3 v3.2.0
github.com/Azure/azure-amqp-common-go/v3 v3.2.1
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible
github.com/Azure/go-amqp v0.13.13
github.com/Azure/go-amqp v0.16.0
github.com/Azure/go-autorest/autorest v0.11.18
github.com/Azure/go-autorest/autorest/adal v0.9.13
github.com/Azure/go-autorest/autorest/date v0.3.0
github.com/Azure/go-autorest/autorest/to v0.4.0
github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect
github.com/devigned/tab v0.1.1
github.com/joho/godotenv v1.3.0
github.com/mitchellh/mapstructure v1.3.3
Expand Down
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
github.com/Azure/azure-amqp-common-go/v3 v3.2.0 h1:BK/3P4TW4z2HLD6G5tMlHRvptOxxi4s9ee5r8sdHBUs=
github.com/Azure/azure-amqp-common-go/v3 v3.2.0/go.mod h1:zN7QL/vfCsq3XQxQaTkg4ScO786CA2rQnZ1LXX7QryE=
github.com/Azure/azure-amqp-common-go/v3 v3.2.1 h1:uQyDk81yn5hTP1pW4Za+zHzy97/f4vDz9o1d/exI4j4=
github.com/Azure/azure-amqp-common-go/v3 v3.2.1/go.mod h1:O6X1iYHP7s2x7NjUKsXVhkwWrQhxrd+d8/3rRadj4CI=
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w=
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/go-amqp v0.13.13 h1:OBPwCO50EzniOyZR0M4VbGJYDxceIy3SFOnKVMJktdY=
github.com/Azure/go-amqp v0.13.13/go.mod h1:D5ZrjQqB1dyp1A+G73xeL/kNn7D5qHJIIsNNps7YNmk=
github.com/Azure/go-amqp v0.16.0 h1:6mhxUxaKLjMtHlGqzeih/LKqjUPLZxbM6zwfz5/C4NQ=
github.com/Azure/go-amqp v0.16.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM=
Expand Down Expand Up @@ -74,8 +74,6 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OH
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
Expand Down
24 changes: 13 additions & 11 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type (
ec entityConnector // if an entityConnector is present, a message should send disposition via mgmt
useSession bool
sessionID *string
receiver *amqp.Receiver
}

// DispositionAction represents the action to notify Azure Service Bus of the Message's disposition
Expand Down Expand Up @@ -133,7 +134,7 @@ func NewMessage(data []byte) *Message {

// getLinkName returns associated link name or empty string if receiver or link is not defined,
func (m *Message) getLinkName() string {
return m.message.GetLinkName()
return m.message.LinkName()
}

// CompleteAction will notify Azure Service Bus that the message was successfully handled and should be deleted from the
Expand Down Expand Up @@ -187,7 +188,7 @@ func (m *Message) Complete(ctx context.Context) error {
return sendMgmtDisposition(ctx, m, disposition{Status: completedDisposition})
}

return m.message.Accept(ctx)
return m.receiver.AcceptMessage(ctx, m.message)
}

// Abandon will notify Azure Service Bus the message failed but should be re-queued for delivery.
Expand All @@ -202,7 +203,7 @@ func (m *Message) Abandon(ctx context.Context) error {
return sendMgmtDisposition(ctx, m, d)
}

return m.message.Modify(ctx, false, false, nil)
return m.receiver.ModifyMessage(ctx, m.message, false, false, nil)
}

// Defer will set aside the message for later processing
Expand All @@ -228,7 +229,7 @@ func (m *Message) Defer(ctx context.Context) error {
_, span := m.startSpanFromContext(ctx, "sb.Message.Defer")
defer span.End()

return m.message.Modify(ctx, true, true, nil)
return m.receiver.ModifyMessage(ctx, m.message, true, true, nil)
}

// Release will notify Azure Service Bus the message should be re-queued without failure.
Expand Down Expand Up @@ -259,7 +260,7 @@ func (m *Message) DeadLetter(ctx context.Context, err error) error {
Condition: amqp.ErrorCondition(ErrorInternalError),
Description: err.Error(),
}
return m.message.Reject(ctx, &amqpErr)
return m.receiver.RejectMessage(ctx, m.message, &amqpErr)

}

Expand Down Expand Up @@ -291,7 +292,7 @@ func (m *Message) DeadLetterWithInfo(ctx context.Context, err error, condition M
Description: err.Error(),
Info: info,
}
return m.message.Reject(ctx, &amqpErr)
return m.receiver.RejectMessage(ctx, m.message, &amqpErr)
}

// ScheduleAt will ensure Azure Service Bus delivers the message after the time specified
Expand Down Expand Up @@ -402,14 +403,15 @@ func addMapToAnnotations(a amqp.Annotations, m map[string]interface{}) amqp.Anno
return a
}

func messageFromAMQPMessage(msg *amqp.Message) (*Message, error) {
return newMessage(msg.GetData(), msg)
func messageFromAMQPMessage(msg *amqp.Message, r *amqp.Receiver) (*Message, error) {
return newMessage(msg.GetData(), msg, r)
}

func newMessage(data []byte, amqpMsg *amqp.Message) (*Message, error) {
func newMessage(data []byte, amqpMsg *amqp.Message, r *amqp.Receiver) (*Message, error) {
msg := &Message{
Data: data,
message: amqpMsg,
Data: data,
message: amqpMsg,
receiver: r,
}

if amqpMsg == nil {
Expand Down
2 changes: 1 addition & 1 deletion message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (suite *serviceBusSuite) TestAMQPMessageToMessage() {
Data: [][]byte{[]byte("foo")},
}

msg, err := messageFromAMQPMessage(aMsg)
msg, err := messageFromAMQPMessage(aMsg, nil)
if suite.NoError(err) {
suite.Equal(msg.ID, aMsg.Properties.MessageID, "messageID")
suite.Equal(*msg.GroupSequence, aMsg.Properties.GroupSequence, "groupSequence")
Expand Down
2 changes: 1 addition & 1 deletion namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (ns *Namespace) negotiateClaim(ctx context.Context, client *amqp.Client, en
tab.For(refreshCtx).Error(err)
select {
case <-refreshCtx.Done():
break
return
case <-time.After(5 * time.Second):
// retry
}
Expand Down
6 changes: 3 additions & 3 deletions prefetch_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"os"
"time"

"github.com/Azure/azure-service-bus-go"
servicebus "github.com/Azure/azure-service-bus-go"
)

func Example_prefetch() {
Expand Down Expand Up @@ -84,7 +84,7 @@ func Example_prefetch() {
fmt.Println(err)
return
}
totalPrefetch1 <- time.Now().Sub(start)
totalPrefetch1 <- time.Since(start)
}()

totalPrefetch1000 := make(chan time.Duration)
Expand All @@ -94,7 +94,7 @@ func Example_prefetch() {
fmt.Println(err)
return
}
totalPrefetch1000 <- time.Now().Sub(start)
totalPrefetch1000 <- time.Since(start)
}()

tp1 := <-totalPrefetch1
Expand Down
10 changes: 5 additions & 5 deletions priority_subscription_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"strings"
"time"

"github.com/Azure/azure-service-bus-go"
servicebus "github.com/Azure/azure-service-bus-go"
)

type PrioritySubscription struct {
Expand Down Expand Up @@ -171,12 +171,12 @@ func Example_prioritySubscriptions() {
}

func ensureTopic(ctx context.Context, tm *servicebus.TopicManager, name string, opts ...servicebus.TopicManagementOption) (*servicebus.TopicEntity, error) {
te, err := tm.Get(ctx, name)
_, err := tm.Get(ctx, name)
if err == nil {
_ = tm.Delete(ctx, name)
}

te, err = tm.Put(ctx, name, opts...)
te, err := tm.Put(ctx, name, opts...)
if err != nil {
fmt.Println(err)
return nil, err
Expand All @@ -186,12 +186,12 @@ func ensureTopic(ctx context.Context, tm *servicebus.TopicManager, name string,
}

func ensureSubscription(ctx context.Context, sm *servicebus.SubscriptionManager, name string, opts ...servicebus.SubscriptionManagementOption) (*servicebus.SubscriptionEntity, error) {
subEntity, err := sm.Get(ctx, name)
_, err := sm.Get(ctx, name)
if err == nil {
_ = sm.Delete(ctx, name)
}

subEntity, err = sm.Put(ctx, name, opts...)
subEntity, err := sm.Put(ctx, name, opts...)
if err != nil {
fmt.Println(err)
return nil, err
Expand Down
61 changes: 2 additions & 59 deletions receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,62 +226,6 @@ func (r *Receiver) Listen(ctx context.Context, handler Handler) *ListenerHandle
}
}

func (r *Receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler Handler) {
const optName = "sb.Receiver.handleMessage"

event, err := messageFromAMQPMessage(msg)
if err != nil {
_, span := r.startConsumerSpanFromContext(ctx, optName)
span.Logger().Error(err)
r.setLastError(err)
if r.doneListening != nil {
r.doneListening()
}
return
}

ctx, span := tab.StartSpanWithRemoteParent(ctx, optName, event)
defer span.End()

id := messageID(msg)
if idStr, ok := id.(string); ok {
span.AddAttributes(tab.StringAttribute("amqp.message.id", idStr))
}

if err := handler.Handle(ctx, event); err != nil {
// stop handling messages since the message consumer ran into an unexpected error
r.setLastError(err)
if r.doneListening != nil {
r.doneListening()
}
return
}

// nothing more to be done. The message was settled when it was accepted by the Receiver
if r.mode == ReceiveAndDeleteMode {
return
}

// nothing more to be done. The Receiver has no default disposition, so the handler is solely responsible for
// disposition
if r.DefaultDisposition == nil {
return
}

// default disposition is set, so try to send the disposition. If the message disposition has already been set, the
// underlying AMQP library will ignore the second disposition respecting the disposition of the handler func.
if err := r.DefaultDisposition(ctx); err != nil {
// if an error is returned by the default disposition, then we must alert the message consumer as we can't
// be sure the final message disposition.
tab.For(ctx).Error(err)
r.setLastError(err)
if r.doneListening != nil {
r.doneListening()
}
return
}
}

func (r *Receiver) listenForMessages(ctx context.Context, handler amqpHandler) {
ctx, span := r.startConsumerSpanFromContext(ctx, "sb.Receiver.listenForMessages")
defer span.End()
Expand Down Expand Up @@ -346,13 +290,12 @@ func (r *Receiver) listenForMessage(ctx context.Context, handler amqpHandler) er
}
receiver = r.receiver
r.clientMu.RUnlock()
err := receiver.HandleMessage(ctx, func(message *amqp.Message) error {
return handler.Handle(ctx, message)
})
msg, err := receiver.Receive(ctx)
if err != nil {
tab.For(ctx).Debug(err.Error())
return err
}
handler.Handle(ctx, msg, r.receiver)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (r *rpcClient) ReceiveDeferred(ctx context.Context, mode ReceiveMode, seque
return nil, err
}

transformedMessages[i], err = messageFromAMQPMessage(&rehydrated)
transformedMessages[i], err = messageFromAMQPMessage(&rehydrated, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -476,7 +476,7 @@ func (r *rpcClient) GetNextPage(ctx context.Context, fromSequenceNumber int64, m
return nil, err
}

transformedMessages[i], err = messageFromAMQPMessage(&rehydrated)
transformedMessages[i], err = messageFromAMQPMessage(&rehydrated, nil)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
Expand Down
11 changes: 5 additions & 6 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@ type (
//Messages are received from a subscription identically to the way they are received from a queue.
Subscription struct {
*receivingEntity
Topic *Topic
receiver *Receiver
receiverMu sync.Mutex
receiveMode ReceiveMode
requiredSessionID *string
prefetchCount *uint32
Topic *Topic
receiver *Receiver
receiverMu sync.Mutex
receiveMode ReceiveMode
prefetchCount *uint32
}

// SubscriptionOption configures the Subscription Azure Service Bus client
Expand Down
Loading

0 comments on commit 4974e2f

Please sign in to comment.