Skip to content

Commit

Permalink
fix(publisher): move publish to publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
dmksnnk committed Jan 3, 2024
1 parent 1354ddb commit 0a1191f
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 161 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
current_dir = $(shell pwd)
netrc_file = "${HOME}/.netrc"

# based on https://gist.github.com/prwhite/8168133
help: ## show this help
Expand Down
11 changes: 5 additions & 6 deletions examples/publisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/heureka/gorabbit"
"github.com/heureka/gorabbit/publish"
"github.com/heureka/gorabbit/publisher"
amqp "github.com/rabbitmq/amqp091-go"
)
Expand All @@ -15,7 +14,7 @@ func main() {
pub, err := gorabbit.NewPublisher(
"amqp://localhost:5672",
"example-exchange",
publisher.WithConstHeaders(amqp.Table{"x-example-header": "example-value"}),
publisher.WithHeaders(amqp.Table{"x-example-header": "example-value"}),
publisher.WithTransientDeliveryMode(),
publisher.WithImmediate(),
publisher.WithMandatory(),
Expand All @@ -36,12 +35,12 @@ func main() {
"example-key",
[]byte("hello again!"),
// add headers
publish.WithHeaders(amqp.Table{"x-other-header": "only for thing publishing"}),
publisher.WithHeaders(amqp.Table{"x-other-header": "only for thing publishing"}),
// set another expiration
publish.WithExpiration(time.Hour),
publisher.WithExpiration(time.Hour),
// custom overriding
func(channel publish.Channel) publish.Channel {
return publish.ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
func(channel publisher.Channel) publisher.Channel {
return publisher.ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
// set another exchange only for this publishing
return channel.PublishWithContext(ctx, "other-exchange", key, mandatory, immediate, msg)
})
Expand Down
4 changes: 2 additions & 2 deletions gorabbit.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ func NewConsumer(url string, queue string, ops ...consumer.Option) (*consumer.Co
// NewPublisher creates a new published to RabbitMQ, which will publish to exchange.
// Will automatically re-open channel on channel errors.
// Reconnection is done with exponential backoff.
func NewPublisher(url string, exchange string, ops ...publisher.Option) (publisher.Publisher, error) {
func NewPublisher(url string, exchange string, mws ...publisher.Middleware) (publisher.Publisher, error) {
ch, err := prepareChannel(url)
if err != nil {
return publisher.Publisher{}, err
}

return publisher.New(ch, exchange, ops...), nil
return publisher.New(ch, exchange, mws...), nil
}

func prepareChannel(url string) (*channel.Reopener, error) {
Expand Down
36 changes: 0 additions & 36 deletions publish/middleware.go

This file was deleted.

2 changes: 1 addition & 1 deletion publish/publish.go → publisher/channel.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package publish
package publisher

import (
"context"
Expand Down
14 changes: 7 additions & 7 deletions publish/publish_test.go → publisher/channel_test.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
package publish_test
package publisher_test

import (
"context"
"testing"

"github.com/heureka/gorabbit/publish"
"github.com/heureka/gorabbit/publisher"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestWrap(t *testing.T) {
mw1 := func(channel publish.Channel) publish.Channel {
return publish.ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
mw1 := func(channel publisher.Channel) publisher.Channel {
return publisher.ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
msg.Body = append(msg.Body, '1')

return channel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
})
}

mw2 := func(channel publish.Channel) publish.Channel {
return publish.ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
mw2 := func(channel publisher.Channel) publisher.Channel {
return publisher.ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
msg.Body = append(msg.Body, '2')

return channel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
})
}
ch := &fakeChannel{}
wrapped := publish.Wrap(ch, mw1, mw2)
wrapped := publisher.Wrap(ch, mw1, mw2)
err := wrapped.PublishWithContext(context.TODO(), "test", "test", false, false, amqp.Publishing{})
require.NoError(t, err)

Expand Down
76 changes: 76 additions & 0 deletions publisher/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package publisher

import (
"context"
"strconv"
"time"

amqp "github.com/rabbitmq/amqp091-go"
)

// WithHeaders adds headers to the published message.
func WithHeaders(table amqp.Table) Middleware {
return func(channel Channel) Channel {
return ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
if msg.Headers == nil {
msg.Headers = make(amqp.Table)
}
for k, v := range table {
msg.Headers[k] = v
}

return channel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
})
}
}

// WithExpiration sets publishing Expire property.
func WithExpiration(expire time.Duration) Middleware {
return func(channel Channel) Channel {
return ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
msg.Expiration = strconv.FormatInt(expire.Milliseconds(), 10)

return channel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
})
}
}

// WithTransientDeliveryMode sets publishing to the Transient delivery mode.
// Transient means higher throughput but messages will not be
// restored on broker restart.
// See https://github.com/rabbitmq/amqp091-go/blob/main/types.go#L123.
func WithTransientDeliveryMode() Middleware {
return func(channel Channel) Channel {
return ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
msg.DeliveryMode = amqp.Transient

return channel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
})
}
}

// WithMandatory sets server to discard a message if no queue is
// bound that matches the routing key and server will return an
// undeliverable message with a Return method.
// See https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.mandatory.
func WithMandatory() Middleware {
return func(channel Channel) Channel {
return ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {

return channel.PublishWithContext(ctx, exchange, key, true, immediate, msg)
})
}
}

// WithImmediate sets server to discard a message when
// no consumer on the matched queue is ready to accept the delivery
// and server will return an undeliverable message with a Return method.
// See https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.immediate.
func WithImmediate() Middleware {
return func(channel Channel) Channel {
return ChannelFunc(func(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {

return channel.PublishWithContext(ctx, exchange, key, mandatory, true, msg)
})
}
}
8 changes: 4 additions & 4 deletions publish/middleware_test.go → publisher/middleware_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package publish_test
package publisher_test

import (
"context"
"testing"
"time"

"github.com/heureka/gorabbit/publish"
"github.com/heureka/gorabbit/publisher"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -36,7 +36,7 @@ func TestPublishWithHeaders(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
channel := &fakeChannel{}
wrapped := publish.Wrap(channel, publish.WithHeaders(headers))
wrapped := publisher.Wrap(channel, publisher.WithHeaders(headers))

err := wrapped.PublishWithContext(context.TODO(), "test", "test", false, false, tt.msg)
require.NoError(t, err)
Expand All @@ -50,7 +50,7 @@ func TestPublishWithHeaders(t *testing.T) {

func TestPublishWithExpiration(t *testing.T) {
channel := &fakeChannel{}
wrapped := publish.Wrap(channel, publish.WithExpiration(time.Second))
wrapped := publisher.Wrap(channel, publisher.WithExpiration(time.Second))

err := wrapped.PublishWithContext(context.TODO(), "test", "test", false, false, amqp.Publishing{})
require.NoError(t, err)
Expand Down
61 changes: 0 additions & 61 deletions publisher/options.go

This file was deleted.

44 changes: 13 additions & 31 deletions publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,34 @@ package publisher
import (
"context"

"github.com/heureka/gorabbit/publish"
amqp "github.com/rabbitmq/amqp091-go"
)

// Publisher is a Publisher to RabbiMQ.
type Publisher struct {
channel publish.Channel
exchange string
headers amqp.Table
deliveryMode uint8
mandatory bool
immediate bool
expiration string
channel Channel
middlewares []Middleware
exchange string
}

// New creates new RabbitMQ Publisher.
// By default, it will publish with Persistent delivery mode, mandatory=false, immediate=false and empty args.
// Pass Options to configure it as you wish.
func New(channel publish.Channel, exchange string, ops ...Option) Publisher {
pub := Publisher{
channel: channel,
exchange: exchange,
headers: nil,
deliveryMode: amqp.Persistent,
mandatory: false,
immediate: false,
expiration: "",
func New(channel Channel, exchange string, mws ...Middleware) Publisher {
return Publisher{
channel: channel,
middlewares: mws,
exchange: exchange,
}

for _, op := range ops {
op(&pub)
}

return pub
}

// Publish message with routing key.
func (p Publisher) Publish(ctx context.Context, key string, message []byte, mws ...publish.Middleware) error {
// Publish message with routing key. Allows to override middleware for one publishing.
func (p Publisher) Publish(ctx context.Context, key string, message []byte, mws ...Middleware) error {
publishing := amqp.Publishing{
Headers: p.headers,
DeliveryMode: p.deliveryMode,
Expiration: p.expiration,
DeliveryMode: amqp.Persistent,
Body: message,
}
channel := publish.Wrap(p.channel, mws...)

return channel.PublishWithContext(ctx, p.exchange, key, p.mandatory, p.immediate, publishing)
channel := Wrap(p.channel, append(p.middlewares, mws...)...)

return channel.PublishWithContext(ctx, p.exchange, key, false, false, publishing)
}
Loading

0 comments on commit 0a1191f

Please sign in to comment.