Skip to content

Commit

Permalink
Un-deprecate non-context Publish functions (#259)
Browse files Browse the repository at this point in the history
The context was not honoured in any of the *WithContext functions. This
is confusing, and arguably broken. However, we cannot immediately fix
the context-support situation due to #124 (comment)

This commit undeprecates the non-context variants of publish, and
documents that both variants are equivalent. The example now favours the
non-context variants.

Related to #195

Signed-off-by: Aitor Perez Cedres <aitor.perez@broadcom.com>
  • Loading branch information
Zerpet authored May 7, 2024
1 parent c519d62 commit 2bc185e
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 58 deletions.
8 changes: 3 additions & 5 deletions _examples/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package main

import (
"context"
"flag"
amqp "github.com/rabbitmq/amqp091-go"
"log"
Expand Down Expand Up @@ -43,7 +42,7 @@ func main() {

startConfirmHandler(publishOkCh, confirmsCh, confirmsDoneCh, exitCh)

publish(context.Background(), publishOkCh, confirmsCh, confirmsDoneCh, exitCh)
publish(publishOkCh, confirmsCh, confirmsDoneCh, exitCh)
}

func setupCloseHandler(exitCh chan struct{}) {
Expand All @@ -56,7 +55,7 @@ func setupCloseHandler(exitCh chan struct{}) {
}()
}

func publish(ctx context.Context, publishOkCh <-chan struct{}, confirmsCh chan<- *amqp.DeferredConfirmation, confirmsDoneCh <-chan struct{}, exitCh chan struct{}) {
func publish(publishOkCh <-chan struct{}, confirmsCh chan<- *amqp.DeferredConfirmation, confirmsDoneCh <-chan struct{}, exitCh chan struct{}) {
config := amqp.Config{
Vhost: "/",
Properties: amqp.NewConnectionProperties(),
Expand Down Expand Up @@ -140,8 +139,7 @@ func publish(ctx context.Context, publishOkCh <-chan struct{}, confirmsCh chan<-
}

Log.Printf("producer: publishing %dB body (%q)", len(*body), *body)
dConfirmation, err := channel.PublishWithDeferredConfirmWithContext(
ctx,
dConfirmation, err := channel.PublishWithDeferredConfirm(
*exchange,
*routingKey,
true,
Expand Down
9 changes: 2 additions & 7 deletions _examples/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@ import (
"crypto/sha1"
"flag"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"io"
"log"
"os"
"time"

amqp "github.com/rabbitmq/amqp091-go"
)

var url = flag.String("url", "amqp:///", "AMQP url for both the publisher and subscriber")
Expand Down Expand Up @@ -88,9 +86,6 @@ func redial(ctx context.Context, url string) chan chan session {
// publish publishes messages to a reconnecting session to a fanout exchange.
// It receives from the application specific source of messages.
func publish(sessions chan chan session, messages <-chan message) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

pending := make(chan message, 1)

for session := range sessions {
Expand Down Expand Up @@ -128,7 +123,7 @@ func publish(sessions chan chan session, messages <-chan message) {

case body = <-pending:
routingKey := "ignored for fanout exchanges, application dependent for other exchanges"
err := pub.PublishWithContext(ctx, exchange, routingKey, false, false, amqp.Publishing{
err := pub.Publish(exchange, routingKey, false, false, amqp.Publishing{
Body: body,
})
// Retry failed delivery on the next session
Expand Down
49 changes: 22 additions & 27 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package amqp091

import (
"context"
"errors"
"reflect"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1484,17 +1483,17 @@ confirmations start at 1. Exit when all publishings are confirmed.
When Publish does not return an error and the channel is in confirm mode, the
internal counter for DeliveryTags with the first confirmation starts at 1.
Deprecated: Use PublishWithContext instead.
*/
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error {
_, err := ch.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, mandatory, immediate, msg)
_, err := ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg)
return err
}

/*
PublishWithContext sends a Publishing from the client to an exchange on the server.
NOTE: this function is equivalent to [Channel.Publish]. Context is not honoured.
When you want a single message to be delivered to a single queue, you can
publish to the default exchange with the routingKey of the queue name. This is
because every declared queue gets an implicit route to the default exchange.
Expand Down Expand Up @@ -1524,34 +1523,17 @@ confirmations start at 1. Exit when all publishings are confirmed.
When Publish does not return an error and the channel is in confirm mode, the
internal counter for DeliveryTags with the first confirmation starts at 1.
*/
func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error {
_, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg)
return err
func (ch *Channel) PublishWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error {
return ch.Publish(exchange, key, mandatory, immediate, msg)
}

/*
PublishWithDeferredConfirm behaves identically to Publish but additionally returns a
DeferredConfirmation, allowing the caller to wait on the publisher confirmation
for this message. If the channel has not been put into confirm mode,
the DeferredConfirmation will be nil.
Deprecated: Use PublishWithDeferredConfirmWithContext instead.
PublishWithDeferredConfirm behaves identically to Publish, but additionally
returns a DeferredConfirmation, allowing the caller to wait on the publisher
confirmation for this message. If the channel has not been put into confirm
mode, the DeferredConfirmation will be nil.
*/
func (ch *Channel) PublishWithDeferredConfirm(exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) {
return ch.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, mandatory, immediate, msg)
}

/*
PublishWithDeferredConfirmWithContext behaves identically to Publish but additionally returns a
DeferredConfirmation, allowing the caller to wait on the publisher confirmation
for this message. If the channel has not been put into confirm mode,
the DeferredConfirmation will be nil.
*/
func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) {
if ctx == nil {
return nil, errors.New("amqp091-go: nil Context")
}

if err := msg.Headers.Validate(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1595,6 +1577,19 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex
return dc, nil
}

/*
PublishWithDeferredConfirmWithContext behaves identically to Publish but additionally returns a
DeferredConfirmation, allowing the caller to wait on the publisher confirmation
for this message. If the channel has not been put into confirm mode,
the DeferredConfirmation will be nil.
NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context variant. The context passed
to this function is not honoured.
*/
func (ch *Channel) PublishWithDeferredConfirmWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) {
return ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg)
}

/*
Get synchronously receives a single Delivery from the head of a queue from the
server to the client. In almost all cases, using Channel.Consume will be
Expand Down
33 changes: 14 additions & 19 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,11 @@ prior to calling [Channel.PublishWithContext] or [Channel.Consume].
When Dial encounters an amqps:// scheme, it will use the zero value of a
tls.Config. This will only perform server certificate and host verification.
Use DialTLS when you wish to provide a client certificate (recommended),
include a private certificate authority's certificate in the cert chain for
server validity, or run insecure by not verifying the server certificate dial
your own connection. DialTLS will use the provided tls.Config when it
encounters an amqps:// scheme and will dial a plain connection when it
encounters an amqp:// scheme.
Use DialTLS when you wish to provide a client certificate (recommended), include
a private certificate authority's certificate in the cert chain for server
validity, or run insecure by not verifying the server certificate. DialTLS will
use the provided tls.Config when it encounters an amqps:// scheme and will dial
a plain connection when it encounters an amqp:// scheme.
SSL/TLS in RabbitMQ is documented here: http://www.rabbitmq.com/ssl.html
Expand All @@ -110,17 +109,18 @@ In order to be notified when a connection or channel gets closed, both
structures offer the possibility to register channels using
[Channel.NotifyClose] and [Connection.NotifyClose] functions:
notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error))
notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error, 1))
No errors will be sent in case of a graceful connection close. In case of a
non-graceful closure due to e.g. network issue, or forced connection closure
from the Management UI, the error will be notified synchronously by the library.
The error is sent synchronously to the channel, so that the flow will wait until
the receiver consumes from the channel. To avoid deadlocks in the library, it is
necessary to consume from the channels. This could be done inside a
different goroutine with a select listening on the two channels inside a for
loop like:
The library sends to notification channels just once. After sending a
notification to all channels, the library closes all registered notification
channels. After receiving a notification, the application should create and
register a new channel. To avoid deadlocks in the library, it is necessary to
consume from the channels. This could be done inside a different goroutine with
a select listening on the two channels inside a for loop like:
go func() {
for notifyConnClose != nil || notifyChanClose != nil {
Expand All @@ -141,13 +141,8 @@ loop like:
}
}()
Another approach is to use buffered channels:
notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error, 1))
The library sends to notification channels just once. After sending a notification
to all channels, the library closes all registered notification channels. After
receiving a notification, the application should create and register a new channel.
It is strongly recommended to use buffered channels to avoid deadlocks inside
the library.
# Best practises for NotifyPublish notifications:
Expand Down

0 comments on commit 2bc185e

Please sign in to comment.