Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature - Support for delayed exchange #3

Open
wants to merge 8 commits into
base: dev_master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 0 additions & 40 deletions Gopkg.lock

This file was deleted.

30 changes: 0 additions & 30 deletions Gopkg.toml

This file was deleted.

10 changes: 10 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module github.com/verloop/hedwig-go

go 1.19

require (
github.com/sirupsen/logrus v1.7.0
github.com/streadway/amqp v1.0.0
)

require golang.org/x/sys v0.0.0-20220927170352-d9d178bc13c6 // indirect
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220927170352-d9d178bc13c6 h1:cy1ko5847T/lJ45eyg/7uLprIE/amW5IXxGtEnQdYMI=
golang.org/x/sys v0.0.0-20220927170352-d9d178bc13c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
154 changes: 135 additions & 19 deletions hedwig.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package hedwig

import (
"context"
"errors"
"fmt"
"strconv"
"strings"
Expand All @@ -12,8 +14,9 @@ import (
)

const (
PublishChannel = "publish"
SubscribeChannel = "subscribe"
PublishChannel = "publish"
DelayedPublishChannel = "publish_delay"
SubscribeChannel = "subscribe"
)

type Callback func(<-chan amqp.Delivery, *sync.WaitGroup)
Expand All @@ -32,6 +35,20 @@ func DefaultSettings() *Settings {
}
}

func DefaultDelayedSettings() *Settings {
return &Settings{
Exchange: "hedwig-delayed",
ExchangeType: ExchangeTypeDelayed,
ExchangeArgs: amqp.Table{DelayedExchangeArgKey: amqp.ExchangeTopic},
HeartBeatInterval: 5 * time.Second,
SocketTimeout: 1 * time.Second,
Host: "localhost",
Port: 5672,
Consumer: &ConsumerSetting{
Queues: make(map[string]*QueueSetting)},
}
}

func DefaultQueueSetting(callback Callback, bindings ...string) *QueueSetting {
return &QueueSetting{
Bindings: bindings,
Expand All @@ -40,11 +57,14 @@ func DefaultQueueSetting(callback Callback, bindings ...string) *QueueSetting {
}
}

func New(settings *Settings) *Hedwig {
if settings == nil {
settings = DefaultSettings()
func New(exchangeSettings *Settings, delayedExchangeSettings *Settings) *Hedwig {
if exchangeSettings == nil {
exchangeSettings = DefaultSettings()
}
if delayedExchangeSettings == nil {
delayedExchangeSettings = DefaultDelayedSettings()
}
return &Hedwig{Settings: settings, wg: &sync.WaitGroup{}, channels: make(map[string]*amqp.Channel), consumeTags: make(map[string]bool)}
return &Hedwig{Settings: exchangeSettings, DelayedSettings: delayedExchangeSettings, wg: &sync.WaitGroup{}, channels: make(map[string]*amqp.Channel), consumeTags: make(map[string]bool)}
}

type QueueSetting struct {
Expand All @@ -54,7 +74,9 @@ type QueueSetting struct {
AutoDelete bool
Exclusive bool
NoAck bool
QueueArgs amqp.Table
}

type ConsumerSetting struct {
tag string
Queues map[string]*QueueSetting
Expand All @@ -76,13 +98,14 @@ type Settings struct {

type Hedwig struct {
sync.Mutex
wg *sync.WaitGroup
Settings *Settings
Error error
conn *amqp.Connection
channels map[string]*amqp.Channel
consumeTags map[string]bool
closedChan chan *amqp.Error
wg *sync.WaitGroup
Settings *Settings
DelayedSettings *Settings
Error error
conn *amqp.Connection
channels map[string]*amqp.Channel
consumeTags map[string]bool
closedChan chan *amqp.Error
}

func (h *Hedwig) AddQueue(qSetting *QueueSetting, qName string) error {
Expand All @@ -109,10 +132,50 @@ func (h *Hedwig) AddQueue(qSetting *QueueSetting, qName string) error {
return nil
}

func (h *Hedwig) PublishWithContext(ctx context.Context, key string, body []byte) (err error) {
return h.DoPublish(ctx, h.Settings.Exchange, PublishChannel, key, body, nil)
}

func (h *Hedwig) DelayedPublishWithContext(ctx context.Context, key string, body []byte, delay time.Duration) (err error) {
headers := amqp.Table{DelayHeader: delay.Milliseconds()}
return h.DoPublish(ctx, h.DelayedSettings.Exchange, DelayedPublishChannel, key, body, headers)
}

func (h *Hedwig) DoPublish(ctx context.Context, exchange, channel, key string, body []byte, headers map[string]interface{}) error {
h.Lock()
defer h.Unlock()

c, err := h.getChannel(channel)
if err != nil {
return err
}

if err := c.Publish(exchange, key, false, false, amqp.Publishing{
Body: body,
Headers: headers,
}); err != nil {
// We already listen to closedChan [ref connect()] when connections are dropped.
// In most cases github.com/streadway/amqp reports it.
// We have observed some cases where this is not reported and we end with stale connections.
// Only way to resolve this to restart the service to reconnect.

// We manually check for error while publishing and if we get an error which says connection has been closed, we
// notify on closedChan so that hedwig reconnects to RMQ
if errors.Is(err, amqp.ErrClosed) {
logrus.WithError(err).Error("Publish failed, reconnecting")
h.closedChan <- amqp.ErrClosed
}
return err
}
return nil
}

// Deprecated, use PublishWithContext
func (h *Hedwig) Publish(key string, body []byte) (err error) {
return h.PublishWithHeaders(key, body, nil)
}

// Deprecated, use DelayedPublishWithContext
func (h *Hedwig) PublishWithDelay(key string, body []byte, delay time.Duration) (err error) {
// from: https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/
// To delay a message a user must publish the message with the special header called x-delay which takes an integer
Expand All @@ -122,6 +185,7 @@ func (h *Hedwig) PublishWithDelay(key string, body []byte, delay time.Duration)
return h.PublishWithHeaders(key, body, headers)
}

// Deprecated, use DoPublish
func (h *Hedwig) PublishWithHeaders(key string, body []byte, headers map[string]interface{}) (err error) {
h.Lock()
defer h.Unlock()
Expand All @@ -131,12 +195,27 @@ func (h *Hedwig) PublishWithHeaders(key string, body []byte, headers map[string]
return err
}

return c.Publish(h.Settings.Exchange, key, false, false, amqp.Publishing{
if err := c.Publish(h.Settings.Exchange, key, false, false, amqp.Publishing{
Body: body,
Headers: headers,
})
}); err != nil {
// We already listen to closedChan [ref connect()] when connections are dropped.
// In most cases github.com/streadway/amqp reports it.
// We have observed some cases where this is not reported and we end with stale connections.
// Only way to resolve this to restart the service to reconnect.

// We manually check for error while publishing and if we get an error which says connection has been closed, we
// notify on closedChan so that hedwig reconnects to RMQ
if errors.Is(err, amqp.ErrClosed) {
logrus.WithError(err).Error("Publish failed, reconnecting")
h.closedChan <- amqp.ErrClosed
}
return err
}
return nil
}

// We continue to consume from the original hedwig exchange
func (h *Hedwig) Consume() error {
if h == nil {
return ErrNilHedwig
Expand Down Expand Up @@ -175,11 +254,12 @@ func (h *Hedwig) setupListeners() (err error) {
qSetting.Durable = false
qSetting.Exclusive = true
}
q, err := c.QueueDeclare(qName, qSetting.Durable, qSetting.AutoDelete, qSetting.Exclusive, false, nil)
q, err := c.QueueDeclare(qName, qSetting.Durable, qSetting.AutoDelete, qSetting.Exclusive, false, qSetting.QueueArgs)
if err != nil {
return err
}
for _, binding := range qSetting.Bindings {
// We only bind to the hedwig exchange
err := c.QueueBind(q.Name, binding, h.Settings.Exchange, false, nil)
if err != nil {
return err
Expand Down Expand Up @@ -222,15 +302,49 @@ func (h *Hedwig) getChannel(name string) (ch *amqp.Channel, err error) {
if err != nil {
return nil, err
}
err = h.channels[name].ExchangeDeclare(
h.Settings.Exchange, h.Settings.ExchangeType, true,
false, false, false, h.Settings.ExchangeArgs)

if name == PublishChannel || name == SubscribeChannel {
err = h.channels[name].ExchangeDeclare(
h.Settings.Exchange, h.Settings.ExchangeType, true,
false, false, false, h.Settings.ExchangeArgs,
)
if err != nil {
return nil, err
}
} else if name == DelayedPublishChannel && h.DelayedSettings != nil {
err = h.channels[name].ExchangeDeclare(
h.DelayedSettings.Exchange, h.DelayedSettings.ExchangeType, true,
false, false, false, h.DelayedSettings.ExchangeArgs,
)

if err != nil {
return nil, err
}

// Make sure we have an normal exchange declared
_, err := h.getChannel(PublishChannel)
if err != nil {
return nil, err
}

// Bind our normal exchange to delayed exchange
h.channels[name].ExchangeBind(
h.Settings.Exchange,
"#",
h.DelayedSettings.Exchange,
false,
nil,
)
}

if err != nil {
return nil, err
}
return h.channels[name], nil
}

// Both hedwig exchange and delayed exchange will use the same broker
// hence no need to update here. The same connection object can be used.
func (h *Hedwig) Disconnect() error {
h.Lock()
defer h.Unlock()
Expand Down Expand Up @@ -261,6 +375,8 @@ func (h *Hedwig) Disconnect() error {
return nil
}

// Both hedwig exchange and delayed exchange will use the same broker
// hence no need to update here. The same connection object can be used.
func (h *Hedwig) connect() (err error) {
if h == nil {
return ErrNilHedwig
Expand Down
3 changes: 2 additions & 1 deletion types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ const (
DelayedExchangeArgKey = "x-delayed-type"
// Following header should be used when you are publishing to an Delay exchange. The header value will be
// delay in milliseconds
DelayHeader = "x-delay"
DelayHeader = "x-delay"
MessageTTLArgKey = "x-message-ttl"
)