Skip to content

Commit

Permalink
Merge pull request Azure#2 from devigned/receiverCancel
Browse files Browse the repository at this point in the history
Receiver cancel and partitioned queue FIFO
  • Loading branch information
devigned authored Jan 23, 2018
2 parents 8eefd6c + e43a9ab commit 9b4011f
Show file tree
Hide file tree
Showing 7 changed files with 332 additions and 154 deletions.
20 changes: 13 additions & 7 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

[[constraint]]
name = "pack.ag/amqp"
branch = "channels"
branch = "master"

[[constraint]]
name = "github.com/Azure/azure-sdk-for-go"
Expand All @@ -13,3 +13,7 @@
[[constraint]]
name = "github.com/sirupsen/logrus"
version = "1.0.4"

[[constraint]]
name = "github.com/satori/go.uuid"
version = "1.2.0"
148 changes: 148 additions & 0 deletions receiver.go
Original file line number Diff line number Diff line change
@@ -1 +1,149 @@
package servicebus

import (
"context"
log "github.com/sirupsen/logrus"
"net"
"pack.ag/amqp"
"time"
)

// Receiver provides session and link handling for a receiving entity path
type Receiver struct {
client *amqp.Client
session *Session
receiver *amqp.Receiver
entityPath string
done chan struct{}
}

// NewReceiver creates a new Service Bus message listener given an AMQP client and an entity path
func NewReceiver(client *amqp.Client, entityPath string) (*Receiver, error) {
receiver := &Receiver{
client: client,
entityPath: entityPath,
done: make(chan struct{}),
}
err := receiver.newSessionAndLink()
if err != nil {
return nil, err
}
return receiver, nil
}

// Close will close the AMQP session and link of the receiver
func (r *Receiver) Close() error {
close(r.done)

err := r.receiver.Close()
if err != nil {
return err
}

err = r.session.Close()
if err != nil {
return err
}

return nil
}

// Recover will attempt to close the current session and link, then rebuild them
func (r *Receiver) Recover() error {
err := r.Close()
if err != nil {
return err
}

err = r.newSessionAndLink()
if err != nil {
return err
}

return nil
}

// Listen start a listener for messages sent to the entity path
func (r *Receiver) Listen(handler Handler) {
messages := make(chan *amqp.Message)
go r.listenForMessages(messages)
go r.handleMessages(messages, handler)
}

func (r *Receiver) handleMessages(messages chan *amqp.Message, handler Handler) {
for {
select {
case <-r.done:
log.Debug("done handling messages")
return
case msg := <-messages:
ctx := context.Background()
id := interface{}("null")
if msg.Properties != nil {
id = msg.Properties.MessageID
}
log.Debugf("Message id: %s is being passed to handler", id)
err := handler(ctx, msg)

if err != nil {
msg.Reject()
log.Debugf("Message rejected: id: %s", id)
} else {
// Accept message
msg.Accept()
log.Debugf("Message accepted: id: %s", id)
}
}
}
}

func (r *Receiver) listenForMessages(msgChan chan *amqp.Message) {
for {
select {
case <-r.done:
log.Debug("done listenting for messages")
close(msgChan)
return
default:
log.Debug("attempting to receive messages")
waitCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
msg, err := r.receiver.Receive(waitCtx)
cancel()
if err, ok := err.(net.Error); ok && err.Timeout() {
log.Debug("attempting to receive messages timed out")
continue
} else if err != nil {
log.Fatalln(err)
}
if msg != nil {
id := interface{}("null")
if msg.Properties != nil {
id = msg.Properties.MessageID
}
log.Debugf("Message received: %s", id)
msgChan <- msg
}
}
}
}

// newSessionAndLink will replace the session and link on the receiver
func (r *Receiver) newSessionAndLink() error {
amqpSession, err := r.client.NewSession()
if err != nil {
return err
}

amqpReceiver, err := amqpSession.NewReceiver(
amqp.LinkAddress(r.entityPath),
amqp.LinkCredit(10),
amqp.LinkBatching(true))
if err != nil {
return err
}

r.session = NewSession(amqpSession)
r.receiver = amqpReceiver

return nil
}
96 changes: 96 additions & 0 deletions sender.go
Original file line number Diff line number Diff line change
@@ -1 +1,97 @@
package servicebus

import (
"context"
"pack.ag/amqp"
)

// Sender provides session and link handling for an sending entity path
type Sender struct {
client *amqp.Client
session *Session
sender *amqp.Sender
entityPath string
Name string
}

// NewSender creates a new Service Bus message sender given an AMQP client and entity path
func NewSender(client *amqp.Client, entityPath string) (*Sender, error) {
sender := &Sender{
client: client,
entityPath: entityPath,
}

err := sender.newSessionAndLink()
if err != nil {
return nil, err
}

return sender, nil
}

// Recover will attempt to close the current session and link, then rebuild them
func (s *Sender) Recover() error {
err := s.Close()
if err != nil {
return err
}

err = s.newSessionAndLink()
if err != nil {
return err
}

return nil
}

// Close will close the AMQP session and link of the sender
func (s *Sender) Close() error {
err := s.sender.Close()
if err != nil {
return err
}

err = s.session.Close()
if err != nil {
return err
}
return nil
}

// Send will send a message using the session and link
func (s *Sender) Send(ctx context.Context, msg *amqp.Message) error {
// TODO: Add in recovery logic in case the link / session has gone down
s.prepareMessage(msg)
err := s.sender.Send(ctx, msg)
if err != nil {
return err
}
return nil
}

func (s *Sender) prepareMessage(msg *amqp.Message) {
if msg.Properties == nil {
msg.Properties = &amqp.MessageProperties{}
}

if msg.Properties.GroupID == "" {
msg.Properties.GroupID = s.session.SessionID
}
}

// newSessionAndLink will replace the existing session and link
func (s *Sender) newSessionAndLink() error {
amqpSession, err := s.client.NewSession()
if err != nil {
return err
}

amqpSender, err := amqpSession.NewSender(amqp.LinkAddress(s.entityPath))
if err != nil {
return err
}

s.session = NewSession(amqpSession)
s.sender = amqpSender
return nil
}
Loading

0 comments on commit 9b4011f

Please sign in to comment.