Skip to content

Commit

Permalink
Feat: defined a type for exchange kinds
Browse files Browse the repository at this point in the history
  • Loading branch information
techerfan committed Oct 29, 2024
1 parent dc67c21 commit bf3f676
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 35 deletions.
2 changes: 1 addition & 1 deletion _examples/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func redial(ctx context.Context, url string) chan chan session {
log.Fatalf("cannot create channel: %v", err)
}

if err := ch.ExchangeDeclare(exchange, "fanout", false, true, false, false, nil); err != nil {
if err := ch.ExchangeDeclare(exchange, amqp.Fanout, false, true, false, false, nil); err != nil {
log.Fatalf("cannot declare fanout exchange: %v", err)
}

Expand Down
8 changes: 4 additions & 4 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1301,15 +1301,15 @@ to respond to any exceptions.
Optional amqp.Table of arguments that are specific to the server's implementation of
the exchange can be sent for exchange types that require extra parameters.
*/
func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
func (ch *Channel) ExchangeDeclare(name string, kind ExchangeType, durable, autoDelete, internal, noWait bool, args Table) error {
if err := args.Validate(); err != nil {
return err
}

return ch.call(
&exchangeDeclare{
Exchange: name,
Type: kind,
Type: string(kind),
Passive: false,
Durable: durable,
AutoDelete: autoDelete,
Expand All @@ -1328,15 +1328,15 @@ exchange is assumed by RabbitMQ to already exist, and attempting to connect to a
non-existent exchange will cause RabbitMQ to throw an exception. This function
can be used to detect the existence of an exchange.
*/
func (ch *Channel) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
func (ch *Channel) ExchangeDeclarePassive(name string, kind ExchangeType, durable, autoDelete, internal, noWait bool, args Table) error {
if err := args.Validate(); err != nil {
return err
}

return ch.call(
&exchangeDeclare{
Exchange: name,
Type: kind,
Type: string(kind),
Passive: true,
Durable: durable,
AutoDelete: autoDelete,
Expand Down
8 changes: 4 additions & 4 deletions examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func ExampleChannel_Confirm_bridge() {
log.Fatalf("channel.open source: %s", err)
}

if err := chs.ExchangeDeclare("log", "topic", true, false, false, false, nil); err != nil {
if err := chs.ExchangeDeclare("log", amqp.Topic, true, false, false, false, nil); err != nil {
log.Fatalf("exchange.declare destination: %s", err)
}

Expand Down Expand Up @@ -159,7 +159,7 @@ func ExampleChannel_Confirm_bridge() {
log.Fatalf("channel.open destination: %s", err)
}

if err := chd.ExchangeDeclare("log", "topic", true, false, false, false, nil); err != nil {
if err := chd.ExchangeDeclare("log", amqp.Topic, true, false, false, false, nil); err != nil {
log.Fatalf("exchange.declare destination: %s", err)
}

Expand Down Expand Up @@ -239,7 +239,7 @@ func ExampleChannel_Consume() {
// are the same. This is part of AMQP being a programmable messaging model.
//
// See the Channel.Publish example for the complimentary declare.
err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
err = c.ExchangeDeclare("logs", amqp.Topic, true, false, false, false, nil)
if err != nil {
log.Fatalf("exchange.declare: %s", err)
}
Expand Down Expand Up @@ -365,7 +365,7 @@ func ExampleChannel_PublishWithContext() {
// are the same. This is part of AMQP being a programmable messaging model.
//
// See the Channel.Consume example for the complimentary declare.
err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
err = c.ExchangeDeclare("logs", amqp.Topic, true, false, false, false, nil)
if err != nil {
log.Fatalf("exchange.declare: %v", err)
}
Expand Down
44 changes: 22 additions & 22 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ func TestExchangePassiveOnMissingExchangeShouldError(t *testing.T) {

if err := ch.ExchangeDeclarePassive(
"test-integration-missing-passive-exchange",
"direct", // type
false, // duration (note: is durable)
true, // auto-delete
false, // internal
false, // nowait
nil, // args
Direct, // type
false, // duration (note: is durable)
true, // auto-delete
false, // internal
false, // nowait
nil, // args
); err == nil {
t.Fatal("ExchangeDeclarePassive of a missing exchange should return error")
}
Expand All @@ -199,7 +199,7 @@ func TestIntegrationExchangeDeclarePassiveOnDeclaredShouldNotError(t *testing.T)

if err := ch.ExchangeDeclare(
exchange, // name
"direct", // type
Direct, // type
false, // durable
true, // auto-delete
false, // internal
Expand All @@ -211,7 +211,7 @@ func TestIntegrationExchangeDeclarePassiveOnDeclaredShouldNotError(t *testing.T)

if err := ch.ExchangeDeclarePassive(
exchange, // name
"direct", // type
Direct, // type
false, // durable
true, // auto-delete
false, // internal
Expand All @@ -238,7 +238,7 @@ func TestIntegrationExchange(t *testing.T) {

if err := channel.ExchangeDeclare(
exchange, // name
"direct", // type
Direct, // type
false, // duration
true, // auto-delete
false, // internal
Expand Down Expand Up @@ -396,7 +396,7 @@ func TestIntegrationBasicQueueOperations(t *testing.T) {
for _, deleteQueueFirst := range deleteQueueFirstOptions {
if err := channel.ExchangeDeclare(
exchangeName, // name
"direct", // type
Direct, // type
true, // duration (note: is durable)
false, // auto-delete
false, // internal
Expand Down Expand Up @@ -1502,11 +1502,11 @@ func TestDeclareArgsRejectToDeadLetterQueue(t *testing.T) {

ch, _ := conn.Channel()

if err := ch.ExchangeDeclare(ex, "fanout", false, true, false, false, nil); err != nil {
if err := ch.ExchangeDeclare(ex, Fanout, false, true, false, false, nil); err != nil {
t.Fatalf("cannot declare %v: got: %v", ex, err)
}

if err := ch.ExchangeDeclare(dlex, "fanout", false, true, false, false, nil); err != nil {
if err := ch.ExchangeDeclare(dlex, Fanout, false, true, false, false, nil); err != nil {
t.Fatalf("cannot declare %v: got: %v", dlex, err)
}

Expand Down Expand Up @@ -1722,7 +1722,7 @@ func TestChannelExceptionWithCloseIssue43(t *testing.T) {

// This ensures that the 2nd channel is unaffected by the channel exception
// on channel 1.
err = c2.ExchangeDeclare("test-channel-still-exists", "direct", false, true, false, false, nil)
err = c2.ExchangeDeclare("test-channel-still-exists", Direct, false, true, false, false, nil)
if err != nil {
t.Fatalf("failed to declare exchange, got: %v", err)
}
Expand Down Expand Up @@ -1834,20 +1834,20 @@ func TestExchangeDeclarePrecondition(t *testing.T) {

err = ch.ExchangeDeclare(
exchange,
"direct", // exchangeType
false, // durable
true, // auto-delete
false, // internal
false, // noWait
nil, // arguments
Direct, // exchangeType
false, // durable
true, // auto-delete
false, // internal
false, // noWait
nil, // arguments
)
if err != nil {
t.Fatalf("Could not initially declare exchange")
}

err = ch.ExchangeDeclare(
exchange,
"direct",
Direct,
true, // different durability
true,
false,
Expand Down Expand Up @@ -2041,7 +2041,7 @@ func TestIntegrationGetNextPublishSeqNo(t *testing.T) {
}

ex := "test-get-next-pub"
if err = ch.ExchangeDeclare(ex, "direct", false, false, false, false, nil); err != nil {
if err = ch.ExchangeDeclare(ex, Direct, false, false, false, false, nil); err != nil {
t.Fatalf("cannot declare %v: got: %v", ex, err)
}

Expand Down Expand Up @@ -2075,7 +2075,7 @@ func TestIntegrationGetNextPublishSeqNoRace(t *testing.T) {
}

ex := "test-get-next-pub"
if err = ch.ExchangeDeclare(ex, "direct", false, false, false, false, nil); err != nil {
if err = ch.ExchangeDeclare(ex, Direct, false, false, false, false, nil); err != nil {
t.Fatalf("cannot declare %v: got: %v", ex, err)
}

Expand Down
11 changes: 7 additions & 4 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ import (
// name. Applications can route to a queue using the queue name as routing key.
const DefaultExchange = ""

// Type of Exchanges
type ExchangeType string

// Constants for standard AMQP 0-9-1 exchange types.
const (
ExchangeDirect = "direct"
ExchangeFanout = "fanout"
ExchangeTopic = "topic"
ExchangeHeaders = "headers"
Direct ExchangeType = "direct"
Fanout ExchangeType = "fanout"
Topic ExchangeType = "topic"
Headers ExchangeType = "headers"
)

var (
Expand Down

0 comments on commit bf3f676

Please sign in to comment.