Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add convenience function to set connection name #108

Merged
merged 2 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions _examples/simple-consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Consumer struct {
}

func SetupCloseHandler(consumer *Consumer) {
c := make(chan os.Signal)
c := make(chan os.Signal, 2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious why this change is necessary...???

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go vet was complaining about using unbuffered channels with os.Signal

go vet _examples/simple-consumer/consumer.go                                                                                                                                  
# command-line-arguments
# _examples/simple-consumer/consumer.go:70:2: misuse of unbuffered os.Signal channel as argument to signal.Notify

Happy to remove this and make another PR for govet and any other linter warnings, if you'd prefer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope! I appreciate the explanation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could have sworn we run something like go vet in CI. I'll check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect the golangci-lint tool we use is scanning only the root directory, and missing the _example directories. Likely same story for go vet ./...

signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
Expand All @@ -88,8 +88,10 @@ func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) (

var err error

config := amqp.Config{Properties: amqp.NewConnectionProperties()}
config.Properties.SetClientConnectionName("sample-consumer")
Log.Printf("dialing %q", amqpURI)
c.conn, err = amqp.Dial(amqpURI)
c.conn, err = amqp.DialConfig(amqpURI, config)
if err != nil {
return nil, fmt.Errorf("Dial: %s", err)
}
Expand Down
6 changes: 4 additions & 2 deletions _examples/simple-producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func main() {
}

func SetupCloseHandler(done chan bool) {
c := make(chan os.Signal)
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
Expand All @@ -55,8 +55,10 @@ func publish(done chan bool, amqpURI, exchange, exchangeType, routingKey, body s
// This function dials, connects, declares, publishes, and tears down,
// all in one go. In a real service, you probably want to maintain a
// long-lived connection as state, and publish against that.
config := amqp.Config{Properties: amqp.NewConnectionProperties()}
config.Properties.SetClientConnectionName("sample-producer")
Log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
connection, err := amqp.DialConfig(amqpURI, config)
if err != nil {
return fmt.Errorf("Dial: %s", err)
}
Expand Down
7 changes: 7 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ type Config struct {
Dial func(network, addr string) (net.Conn, error)
}

// NewConnectionProperties initialises an amqp.Table struct to empty value. This
// amqp.Table can be used as Properties in amqp.Config to set the connection
// name, using amqp.DialConfig()
func NewConnectionProperties() Table {
return make(Table)
}

// Connection manages the serialization and deserialization of frames from IO
// and dispatches the frames to the appropriate channel. All RPC methods and
// asynchronous Publishing, Delivery, Ack, Nack and Return messages are
Expand Down
17 changes: 17 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,20 @@ func TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose(t *testing.T) {
t.Log("waiting for go-routines to terminate")
wg.Wait()
}

func TestConnectionConfigPropertiesWithClientProvidedConnectionName(t *testing.T) {
const expectedConnectionName = "amqp091-go-test"

connectionProperties := NewConnectionProperties()
connectionProperties.SetClientConnectionName(expectedConnectionName)

currentConnectionName, ok := connectionProperties["connection_name"]
if !ok {
t.Fatal("Connection name was not set by Table.SetClientConnectionName")
}
if currentConnectionName != expectedConnectionName {
t.Fatalf("Connection name is set to: %s. Expected: %s",
currentConnectionName,
expectedConnectionName)
}
}
12 changes: 12 additions & 0 deletions examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,15 @@ func ExampleConnection_NotifyBlocked() {
// Your application domain channel setup publishings
publishAllTheThings(conn)
}

func ExampleTable_SetClientConnectionName() {
// Sets the well-known connection_name property in amqp.Config. The connection
// name will be visible in RabbitMQ Management UI.
config := amqp.Config{Properties: amqp.NewConnectionProperties()}
config.Properties.SetClientConnectionName("my-client-app")
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("connection.open: %s", err)
}
defer conn.Close()
}
8 changes: 8 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,14 @@ func (t Table) Validate() error {
return validateField(t)
}

// Sets the connection name property. This property can be used in
// amqp.Config to set a custom connection name during amqp.DialConfig(). This
// can be helpful to identify specific connections in RabbitMQ, for debugging or
// tracing purposes.
func (t Table) SetClientConnectionName(connName string) {
t["connection_name"] = connName
}

type message interface {
id() (uint16, uint16)
wait() bool
Expand Down