From ddd80ff876bf43287d669abf75be610268e75f14 Mon Sep 17 00:00:00 2001 From: Dmytro Kasianenko Date: Wed, 3 Jan 2024 16:13:46 +0100 Subject: [PATCH] fix(publisher): move publish to publisher --- Makefile | 1 + examples/publisher/main.go | 11 ++- gorabbit.go | 4 +- publish/middleware.go | 36 --------- publish/publish.go => publisher/channel.go | 2 +- .../channel_test.go | 14 ++-- publisher/middleware.go | 76 +++++++++++++++++++ {publish => publisher}/middleware_test.go | 8 +- publisher/options.go | 61 --------------- publisher/publisher.go | 44 ++++------- publisher/publisher_test.go | 25 +++--- 11 files changed, 121 insertions(+), 161 deletions(-) delete mode 100644 publish/middleware.go rename publish/publish.go => publisher/channel.go (98%) rename publish/publish_test.go => publisher/channel_test.go (67%) create mode 100644 publisher/middleware.go rename {publish => publisher}/middleware_test.go (86%) delete mode 100644 publisher/options.go diff --git a/Makefile b/Makefile index 7c64628..d484697 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ current_dir = $(shell pwd) +netrc_file = "${HOME}/.netrc" # based on https://gist.github.com/prwhite/8168133 help: ## show this help diff --git a/examples/publisher/main.go b/examples/publisher/main.go index 7ba27c0..146d741 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -6,7 +6,6 @@ import ( "time" "github.com/heureka/gorabbit" - "github.com/heureka/gorabbit/publish" "github.com/heureka/gorabbit/publisher" amqp "github.com/rabbitmq/amqp091-go" ) @@ -15,7 +14,7 @@ func main() { pub, err := gorabbit.NewPublisher( "amqp://localhost:5672", "example-exchange", - publisher.WithConstHeaders(amqp.Table{"x-example-header": "example-value"}), + publisher.WithHeaders(amqp.Table{"x-example-header": "example-value"}), publisher.WithTransientDeliveryMode(), publisher.WithImmediate(), publisher.WithMandatory(), @@ -36,12 +35,12 @@ func main() { "example-key", []byte("hello again!"), // add headers - publish.WithHeaders(amqp.Table{"x-other-header": "only for thing publishing"}), + publisher.WithHeaders(amqp.Table{"x-other-header": "only for thing publishing"}), // set another expiration - publish.WithExpiration(time.Hour), + publisher.WithExpiration(time.Hour), // custom overriding - func(channel publish.Channel) publish.Channel { - return publish.ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { + func(channel publisher.Channel) publisher.Channel { + return publisher.ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { // set another exchange only for this publishing return channel.PublishWithContext(ctx, "other-exchange", key, mandatory, immediate, msg) }) diff --git a/gorabbit.go b/gorabbit.go index 18a953c..be73f3a 100644 --- a/gorabbit.go +++ b/gorabbit.go @@ -24,13 +24,13 @@ func NewConsumer(url string, queue string, ops ...consumer.Option) (*consumer.Co // NewPublisher creates a new published to RabbitMQ, which will publish to exchange. // Will automatically re-open channel on channel errors. // Reconnection is done with exponential backoff. -func NewPublisher(url string, exchange string, ops ...publisher.Option) (publisher.Publisher, error) { +func NewPublisher(url string, exchange string, mws ...publisher.Middleware) (publisher.Publisher, error) { ch, err := prepareChannel(url) if err != nil { return publisher.Publisher{}, err } - return publisher.New(ch, exchange, ops...), nil + return publisher.New(ch, exchange, mws...), nil } func prepareChannel(url string) (*channel.Reopener, error) { diff --git a/publish/middleware.go b/publish/middleware.go deleted file mode 100644 index ca950e8..0000000 --- a/publish/middleware.go +++ /dev/null @@ -1,36 +0,0 @@ -package publish - -import ( - "context" - "strconv" - "time" - - amqp "github.com/rabbitmq/amqp091-go" -) - -// WithHeaders adds headers to the published message. -func WithHeaders(table amqp.Table) Middleware { - return func(channel Channel) Channel { - return ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { - if msg.Headers == nil { - msg.Headers = make(amqp.Table) - } - for k, v := range table { - msg.Headers[k] = v - } - - return channel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg) - }) - } -} - -// WithExpiration sets publishing Expire property. -func WithExpiration(expire time.Duration) Middleware { - return func(channel Channel) Channel { - return ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { - msg.Expiration = strconv.FormatInt(expire.Milliseconds(), 10) - - return channel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg) - }) - } -} diff --git a/publish/publish.go b/publisher/channel.go similarity index 98% rename from publish/publish.go rename to publisher/channel.go index 71fd42d..d004ca1 100644 --- a/publish/publish.go +++ b/publisher/channel.go @@ -1,4 +1,4 @@ -package publish +package publisher import ( "context" diff --git a/publish/publish_test.go b/publisher/channel_test.go similarity index 67% rename from publish/publish_test.go rename to publisher/channel_test.go index d214d4b..8429e12 100644 --- a/publish/publish_test.go +++ b/publisher/channel_test.go @@ -1,33 +1,33 @@ -package publish_test +package publisher_test import ( "context" "testing" - "github.com/heureka/gorabbit/publish" + "github.com/heureka/gorabbit/publisher" amqp "github.com/rabbitmq/amqp091-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestWrap(t *testing.T) { - mw1 := func(channel publish.Channel) publish.Channel { - return publish.ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { + mw1 := func(channel publisher.Channel) publisher.Channel { + return publisher.ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { msg.Body = append(msg.Body, '1') return channel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg) }) } - mw2 := func(channel publish.Channel) publish.Channel { - return publish.ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { + mw2 := func(channel publisher.Channel) publisher.Channel { + return publisher.ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { msg.Body = append(msg.Body, '2') return channel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg) }) } ch := &fakeChannel{} - wrapped := publish.Wrap(ch, mw1, mw2) + wrapped := publisher.Wrap(ch, mw1, mw2) err := wrapped.PublishWithContext(context.TODO(), "test", "test", false, false, amqp.Publishing{}) require.NoError(t, err) diff --git a/publisher/middleware.go b/publisher/middleware.go new file mode 100644 index 0000000..ec0e0a1 --- /dev/null +++ b/publisher/middleware.go @@ -0,0 +1,76 @@ +package publisher + +import ( + "context" + "strconv" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// WithHeaders adds headers to the published message. +func WithHeaders(table amqp.Table) Middleware { + return func(channel Channel) Channel { + return ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { + if msg.Headers == nil { + msg.Headers = make(amqp.Table) + } + for k, v := range table { + msg.Headers[k] = v + } + + return channel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg) + }) + } +} + +// WithExpiration sets publishing Expire property. +func WithExpiration(expire time.Duration) Middleware { + return func(channel Channel) Channel { + return ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { + msg.Expiration = strconv.FormatInt(expire.Milliseconds(), 10) + + return channel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg) + }) + } +} + +// WithTransientDeliveryMode sets publishing to the Transient delivery mode. +// Transient means higher throughput but messages will not be +// restored on broker restart. +// See https://github.com/rabbitmq/amqp091-go/blob/main/types.go#L123. +func WithTransientDeliveryMode() Middleware { + return func(channel Channel) Channel { + return ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { + msg.DeliveryMode = amqp.Transient + + return channel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg) + }) + } +} + +// WithMandatory sets server to discard a message if no queue is +// bound that matches the routing key and server will return an +// undeliverable message with a Return method. +// See https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.mandatory. +func WithMandatory() Middleware { + return func(channel Channel) Channel { + return ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { + + return channel.PublishWithContext(ctx, exchange, key, true, immediate, msg) + }) + } +} + +// WithImmediate sets server to discard a message when +// no consumer on the matched queue is ready to accept the delivery +// and server will return an undeliverable message with a Return method. +// See https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.immediate. +func WithImmediate() Middleware { + return func(channel Channel) Channel { + return ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { + + return channel.PublishWithContext(ctx, exchange, key, mandatory, true, msg) + }) + } +} diff --git a/publish/middleware_test.go b/publisher/middleware_test.go similarity index 86% rename from publish/middleware_test.go rename to publisher/middleware_test.go index a79bf38..73ce01b 100644 --- a/publish/middleware_test.go +++ b/publisher/middleware_test.go @@ -1,11 +1,11 @@ -package publish_test +package publisher_test import ( "context" "testing" "time" - "github.com/heureka/gorabbit/publish" + "github.com/heureka/gorabbit/publisher" amqp "github.com/rabbitmq/amqp091-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -36,7 +36,7 @@ func TestPublishWithHeaders(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { channel := &fakeChannel{} - wrapped := publish.Wrap(channel, publish.WithHeaders(headers)) + wrapped := publisher.Wrap(channel, publisher.WithHeaders(headers)) err := wrapped.PublishWithContext(context.TODO(), "test", "test", false, false, tt.msg) require.NoError(t, err) @@ -50,7 +50,7 @@ func TestPublishWithHeaders(t *testing.T) { func TestPublishWithExpiration(t *testing.T) { channel := &fakeChannel{} - wrapped := publish.Wrap(channel, publish.WithExpiration(time.Second)) + wrapped := publisher.Wrap(channel, publisher.WithExpiration(time.Second)) err := wrapped.PublishWithContext(context.TODO(), "test", "test", false, false, amqp.Publishing{}) require.NoError(t, err) diff --git a/publisher/options.go b/publisher/options.go deleted file mode 100644 index 37360d6..0000000 --- a/publisher/options.go +++ /dev/null @@ -1,61 +0,0 @@ -package publisher - -import ( - "strconv" - "time" - - amqp "github.com/rabbitmq/amqp091-go" -) - -// Option allows to configure RabbitMQ Publisher. -type Option func(p *Publisher) - -// WithConstHeaders sets constant publishing headers, that will be added to all publishings. -func WithConstHeaders(headers amqp.Table) Option { - return func(p *Publisher) { - if p.headers == nil { - p.headers = make(amqp.Table) - } - - for k, v := range headers { - p.headers[k] = v - } - } -} - -// WithTransientDeliveryMode sets publishing to the Transient delivery mode. -// Transient means higher throughput but messages will not be -// restored on broker restart. -// See https://github.com/rabbitmq/amqp091-go/blob/main/types.go#L123. -func WithTransientDeliveryMode() Option { - return func(p *Publisher) { - p.deliveryMode = amqp.Transient - } -} - -// WithMandatory sets server to discard a message if no queue is -// bound that matches the routing key and server will return an -// undeliverable message with a Return method. -// See https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.mandatory. -func WithMandatory() Option { - return func(c *Publisher) { - c.mandatory = true - } -} - -// WithImmediate sets server to discard a message when -// no consumer on the matched queue is ready to accept the delivery -// and server will return an undeliverable message with a Return method. -// See https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.immediate. -func WithImmediate() Option { - return func(p *Publisher) { - p.immediate = true - } -} - -// WithExpiration sets publishing Expire property. -func WithExpiration(expire time.Duration) Option { - return func(c *Publisher) { - c.expiration = strconv.FormatInt(expire.Milliseconds(), 10) - } -} diff --git a/publisher/publisher.go b/publisher/publisher.go index 4d11a6c..885211e 100644 --- a/publisher/publisher.go +++ b/publisher/publisher.go @@ -3,52 +3,34 @@ package publisher import ( "context" - "github.com/heureka/gorabbit/publish" amqp "github.com/rabbitmq/amqp091-go" ) // Publisher is a Publisher to RabbiMQ. type Publisher struct { - channel publish.Channel - exchange string - headers amqp.Table - deliveryMode uint8 - mandatory bool - immediate bool - expiration string + channel Channel + middlewares []Middleware + exchange string } // New creates new RabbitMQ Publisher. // By default, it will publish with Persistent delivery mode, mandatory=false, immediate=false and empty args. // Pass Options to configure it as you wish. -func New(channel publish.Channel, exchange string, ops ...Option) Publisher { - pub := Publisher{ - channel: channel, - exchange: exchange, - headers: nil, - deliveryMode: amqp.Persistent, - mandatory: false, - immediate: false, - expiration: "", +func New(channel Channel, exchange string, mws ...Middleware) Publisher { + return Publisher{ + channel: channel, + middlewares: mws, + exchange: exchange, } - - for _, op := range ops { - op(&pub) - } - - return pub } -// Publish message with routing key. -func (p Publisher) Publish(ctx context.Context, key string, message []byte, mws ...publish.Middleware) error { +// Publish message with routing key. Allows to override middleware for one publishing. +func (p Publisher) Publish(ctx context.Context, key string, message []byte, mws ...Middleware) error { publishing := amqp.Publishing{ - Headers: p.headers, - DeliveryMode: p.deliveryMode, - Expiration: p.expiration, + DeliveryMode: amqp.Persistent, Body: message, } - channel := publish.Wrap(p.channel, mws...) - - return channel.PublishWithContext(ctx, p.exchange, key, p.mandatory, p.immediate, publishing) + channel := Wrap(p.channel, append(p.middlewares, mws...)...) + return channel.PublishWithContext(ctx, p.exchange, key, false, false, publishing) } diff --git a/publisher/publisher_test.go b/publisher/publisher_test.go index 692f913..5bd9fab 100644 --- a/publisher/publisher_test.go +++ b/publisher/publisher_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/heureka/gorabbit/publish" "github.com/heureka/gorabbit/publisher" amqp "github.com/rabbitmq/amqp091-go" "github.com/stretchr/testify/assert" @@ -26,13 +25,13 @@ func TestUnitPublish(t *testing.T) { message := []byte(`test message`) tests := map[string]struct { - options []publisher.Option - publishMiddleware []publish.Middleware + middlewares []publisher.Middleware + publishMiddleware []publisher.Middleware publishError error wantCallArgs callArgs wantError error }{ - "no options": { + "no middlewares": { wantCallArgs: callArgs{ exchange: exchange, key: routingKey, @@ -58,9 +57,9 @@ func TestUnitPublish(t *testing.T) { publishError: assert.AnError, wantError: assert.AnError, }, - "all options": { - options: []publisher.Option{ - publisher.WithConstHeaders(amqp.Table{"test": "header"}), + "all middlewares": { + middlewares: []publisher.Middleware{ + publisher.WithHeaders(amqp.Table{"test": "header"}), publisher.WithTransientDeliveryMode(), publisher.WithMandatory(), publisher.WithImmediate(), @@ -81,16 +80,16 @@ func TestUnitPublish(t *testing.T) { }, }, "publish middlewares": { - options: []publisher.Option{ - publisher.WithConstHeaders(amqp.Table{"test": "header"}), + middlewares: []publisher.Middleware{ + publisher.WithHeaders(amqp.Table{"test": "header"}), publisher.WithTransientDeliveryMode(), publisher.WithMandatory(), publisher.WithImmediate(), publisher.WithExpiration(time.Second), }, - publishMiddleware: []publish.Middleware{ - publish.WithHeaders(amqp.Table{"test": "other-header", "test2": "header"}), - publish.WithExpiration(2 * time.Second), + publishMiddleware: []publisher.Middleware{ + publisher.WithHeaders(amqp.Table{"test": "other-header", "test2": "header"}), + publisher.WithExpiration(2 * time.Second), }, wantCallArgs: callArgs{ exchange: exchange, @@ -120,7 +119,7 @@ func TestUnitPublish(t *testing.T) { tt.wantCallArgs.msg, ).Return(tt.publishError) - pub := publisher.New(channel, exchange, tt.options...) + pub := publisher.New(channel, exchange, tt.middlewares...) err := pub.Publish(context.TODO(), routingKey, message, tt.publishMiddleware...) if tt.wantError != nil { assert.ErrorIs(t, err, tt.wantError)