From 54840c2b837e2e61fa61f7af5ff8203c61b12bd9 Mon Sep 17 00:00:00 2001 From: Phil Kedy Date: Fri, 23 Jul 2021 17:01:06 -0400 Subject: [PATCH 1/3] Fixing the handling of detach errors (#1030) * Improve error message in case of missing property (#1012) Co-authored-by: Artur Souza * Remove vestigial pubsub/nats code (#1024) The pubsub/nats component was replaced by pubsub/natsstreaming as part of https://github.com/dapr/dapr/pull/2003, but the corresponding code in dapr/components-contrib was not removed, so this change removes it. * Fixing the handling of detach errors Co-authored-by: Maarten Mulders Co-authored-by: Artur Souza Co-authored-by: Simon Leet <31784195+CodeMonkeyLeet@users.noreply.github.com> --- bindings/smtp/smtp.go | 2 +- pubsub/azure/servicebus/servicebus.go | 9 ++- pubsub/azure/servicebus/subscription.go | 7 +- pubsub/nats/metadata.go | 11 --- pubsub/nats/nats.go | 99 ------------------------- pubsub/nats/nats_test.go | 69 ----------------- 6 files changed, 10 insertions(+), 187 deletions(-) delete mode 100644 pubsub/nats/metadata.go delete mode 100644 pubsub/nats/nats.go delete mode 100644 pubsub/nats/nats_test.go diff --git a/bindings/smtp/smtp.go b/bindings/smtp/smtp.go index 28385a5f14..d71aefb69c 100644 --- a/bindings/smtp/smtp.go +++ b/bindings/smtp/smtp.go @@ -63,7 +63,7 @@ func (s *Mailer) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, // Merge config metadata with request metadata metadata := s.metadata.mergeWithRequestMetadata(req) if metadata.EmailFrom == "" { - return nil, fmt.Errorf("smtp binding error: fromEmail property not supplied in configuration- or request-metadata") + return nil, fmt.Errorf("smtp binding error: emailFrom property not supplied in configuration- or request-metadata") } if metadata.EmailTo == "" { return nil, fmt.Errorf("smtp binding error: emailTo property not supplied in configuration- or request-metadata") diff --git a/pubsub/azure/servicebus/servicebus.go b/pubsub/azure/servicebus/servicebus.go index 0e2448d5d8..e4cb6e8298 100644 --- a/pubsub/azure/servicebus/servicebus.go +++ b/pubsub/azure/servicebus/servicebus.go @@ -413,7 +413,14 @@ func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub. a.metadata.MaxActiveMessages, a.metadata.MaxActiveMessagesRecoveryInSec) if innerErr != nil { - a.logger.Error(innerErr) + var detachError *amqp.DetachError + var ampqError *amqp.Error + if errors.Is(innerErr, detachError) || + (errors.As(innerErr, &qError) && ampqError.Condition == amqp.ErrorDetachForced) { + a.logger.Debug(innerErr) + } else { + a.logger.Error(innerErr) + } } cancel() // Cancel receive context diff --git a/pubsub/azure/servicebus/subscription.go b/pubsub/azure/servicebus/subscription.go index 1d60e811eb..ae7f6bf837 100644 --- a/pubsub/azure/servicebus/subscription.go +++ b/pubsub/azure/servicebus/subscription.go @@ -3,7 +3,6 @@ package servicebus import ( "context" "fmt" - "strings" "sync" "time" @@ -203,11 +202,7 @@ func (s *subscription) tryRenewLocks() { func (s *subscription) receiveMessage(ctx context.Context, handler azservicebus.HandlerFunc) error { s.logger.Debugf("Waiting to receive message on topic %s", s.topic) if err := s.entity.ReceiveOne(ctx, handler); err != nil { - if strings.Contains(err.Error(), "force detached") { - return nil - } - - return fmt.Errorf("%s error receiving message on topic %s, %s", errorMessagePrefix, s.topic, err) + return fmt.Errorf("%s error receiving message on topic %s, %w", errorMessagePrefix, s.topic, err) } return nil diff --git a/pubsub/nats/metadata.go b/pubsub/nats/metadata.go deleted file mode 100644 index e48fa1bb9a..0000000000 --- a/pubsub/nats/metadata.go +++ /dev/null @@ -1,11 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation and Dapr Contributors. -// Licensed under the MIT License. -// ------------------------------------------------------------ - -package nats - -type metadata struct { - natsURL string - natsQueueGroupName string -} diff --git a/pubsub/nats/nats.go b/pubsub/nats/nats.go deleted file mode 100644 index 84693debc3..0000000000 --- a/pubsub/nats/nats.go +++ /dev/null @@ -1,99 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation and Dapr Contributors. -// Licensed under the MIT License. -// ------------------------------------------------------------ - -package nats - -import ( - "context" - "errors" - "fmt" - - "github.com/dapr/components-contrib/pubsub" - "github.com/dapr/kit/logger" - nats "github.com/nats-io/nats.go" -) - -const ( - natsURL = "natsURL" - consumerID = "consumerID" -) - -type natsPubSub struct { - metadata metadata - natsConn *nats.Conn - - logger logger.Logger -} - -// NewNATSPubSub returns a new NATS pub-sub implementation -func NewNATSPubSub(logger logger.Logger) pubsub.PubSub { - return &natsPubSub{logger: logger} -} - -func parseNATSMetadata(meta pubsub.Metadata) (metadata, error) { - m := metadata{} - if val, ok := meta.Properties[natsURL]; ok && val != "" { - m.natsURL = val - } else { - return m, errors.New("nats error: missing nats URL") - } - - if val, ok := meta.Properties[consumerID]; ok && val != "" { - m.natsQueueGroupName = val - } else { - return m, errors.New("nats error: missing queue name") - } - - return m, nil -} - -func (n *natsPubSub) Init(metadata pubsub.Metadata) error { - m, err := parseNATSMetadata(metadata) - if err != nil { - return err - } - - n.metadata = m - natsConn, err := nats.Connect(m.natsURL) - if err != nil { - return fmt.Errorf("nats: error connecting to nats at %s: %s", m.natsURL, err) - } - n.logger.Debugf("connected to nats at %s", m.natsURL) - - n.natsConn = natsConn - - return nil -} - -func (n *natsPubSub) Publish(req *pubsub.PublishRequest) error { - err := n.natsConn.Publish(req.Topic, req.Data) - if err != nil { - return fmt.Errorf("nats: error from publish: %s", err) - } - - return nil -} - -func (n *natsPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { - sub, err := n.natsConn.QueueSubscribe(req.Topic, n.metadata.natsQueueGroupName, func(natsMsg *nats.Msg) { - handler(context.Background(), &pubsub.NewMessage{Topic: req.Topic, Data: natsMsg.Data}) - }) - if err != nil { - n.logger.Warnf("nats: error subscribe: %s", err) - } - n.logger.Debugf("nats: subscribed to subject %s with queue group %s", sub.Subject, sub.Queue) - - return nil -} - -func (n *natsPubSub) Close() error { - n.natsConn.Close() - - return nil -} - -func (n *natsPubSub) Features() []pubsub.Feature { - return nil -} diff --git a/pubsub/nats/nats_test.go b/pubsub/nats/nats_test.go deleted file mode 100644 index dfe805ef5f..0000000000 --- a/pubsub/nats/nats_test.go +++ /dev/null @@ -1,69 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation and Dapr Contributors. -// Licensed under the MIT License. -// ------------------------------------------------------------ - -package nats - -import ( - "errors" - "testing" - - "github.com/dapr/components-contrib/pubsub" - "github.com/stretchr/testify/assert" -) - -func TestParseNATSMetadata(t *testing.T) { - t.Run("metadata is correct", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "foonats1", - consumerID: "fooq1", - } - fakeMetaData := pubsub.Metadata{ - Properties: fakeProperties, - } - - // act - m, err := parseNATSMetadata(fakeMetaData) - - // assert - assert.NoError(t, err) - assert.NotEmpty(t, m.natsURL) - assert.NotEmpty(t, m.natsQueueGroupName) - assert.Equal(t, fakeProperties[natsURL], m.natsURL) - assert.Equal(t, fakeProperties[consumerID], m.natsQueueGroupName) - }) - - t.Run("queue is not given", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "foonats2", - consumerID: "", - } - - fakeMetaData := pubsub.Metadata{ - Properties: fakeProperties, - } - - // act - m, err := parseNATSMetadata(fakeMetaData) - // assert - assert.Error(t, errors.New("nats error: missing queue name"), err) - assert.Equal(t, fakeProperties[natsURL], m.natsURL) - assert.Empty(t, m.natsQueueGroupName) - }) - - t.Run("nats url is not given", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "", - consumerID: "fooq2", - } - fakeMetaData := pubsub.Metadata{ - Properties: fakeProperties, - } - // act - m, err := parseNATSMetadata(fakeMetaData) - // assert - assert.Error(t, errors.New("nats error: missing nats URL"), err) - assert.Empty(t, m.natsURL) - }) -} From bc5d71653563307df2032ef83bc3fa4a9fd1561e Mon Sep 17 00:00:00 2001 From: Phil Kedy Date: Fri, 23 Jul 2021 17:36:42 -0400 Subject: [PATCH 2/3] Revert "Fixing the handling of detach errors (#1030)" (#1031) This reverts commit 54840c2b837e2e61fa61f7af5ff8203c61b12bd9. --- bindings/smtp/smtp.go | 2 +- pubsub/azure/servicebus/servicebus.go | 9 +-- pubsub/azure/servicebus/subscription.go | 7 +- pubsub/nats/metadata.go | 11 +++ pubsub/nats/nats.go | 99 +++++++++++++++++++++++++ pubsub/nats/nats_test.go | 69 +++++++++++++++++ 6 files changed, 187 insertions(+), 10 deletions(-) create mode 100644 pubsub/nats/metadata.go create mode 100644 pubsub/nats/nats.go create mode 100644 pubsub/nats/nats_test.go diff --git a/bindings/smtp/smtp.go b/bindings/smtp/smtp.go index d71aefb69c..28385a5f14 100644 --- a/bindings/smtp/smtp.go +++ b/bindings/smtp/smtp.go @@ -63,7 +63,7 @@ func (s *Mailer) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, // Merge config metadata with request metadata metadata := s.metadata.mergeWithRequestMetadata(req) if metadata.EmailFrom == "" { - return nil, fmt.Errorf("smtp binding error: emailFrom property not supplied in configuration- or request-metadata") + return nil, fmt.Errorf("smtp binding error: fromEmail property not supplied in configuration- or request-metadata") } if metadata.EmailTo == "" { return nil, fmt.Errorf("smtp binding error: emailTo property not supplied in configuration- or request-metadata") diff --git a/pubsub/azure/servicebus/servicebus.go b/pubsub/azure/servicebus/servicebus.go index e4cb6e8298..0e2448d5d8 100644 --- a/pubsub/azure/servicebus/servicebus.go +++ b/pubsub/azure/servicebus/servicebus.go @@ -413,14 +413,7 @@ func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub. a.metadata.MaxActiveMessages, a.metadata.MaxActiveMessagesRecoveryInSec) if innerErr != nil { - var detachError *amqp.DetachError - var ampqError *amqp.Error - if errors.Is(innerErr, detachError) || - (errors.As(innerErr, &qError) && ampqError.Condition == amqp.ErrorDetachForced) { - a.logger.Debug(innerErr) - } else { - a.logger.Error(innerErr) - } + a.logger.Error(innerErr) } cancel() // Cancel receive context diff --git a/pubsub/azure/servicebus/subscription.go b/pubsub/azure/servicebus/subscription.go index ae7f6bf837..1d60e811eb 100644 --- a/pubsub/azure/servicebus/subscription.go +++ b/pubsub/azure/servicebus/subscription.go @@ -3,6 +3,7 @@ package servicebus import ( "context" "fmt" + "strings" "sync" "time" @@ -202,7 +203,11 @@ func (s *subscription) tryRenewLocks() { func (s *subscription) receiveMessage(ctx context.Context, handler azservicebus.HandlerFunc) error { s.logger.Debugf("Waiting to receive message on topic %s", s.topic) if err := s.entity.ReceiveOne(ctx, handler); err != nil { - return fmt.Errorf("%s error receiving message on topic %s, %w", errorMessagePrefix, s.topic, err) + if strings.Contains(err.Error(), "force detached") { + return nil + } + + return fmt.Errorf("%s error receiving message on topic %s, %s", errorMessagePrefix, s.topic, err) } return nil diff --git a/pubsub/nats/metadata.go b/pubsub/nats/metadata.go new file mode 100644 index 0000000000..e48fa1bb9a --- /dev/null +++ b/pubsub/nats/metadata.go @@ -0,0 +1,11 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation and Dapr Contributors. +// Licensed under the MIT License. +// ------------------------------------------------------------ + +package nats + +type metadata struct { + natsURL string + natsQueueGroupName string +} diff --git a/pubsub/nats/nats.go b/pubsub/nats/nats.go new file mode 100644 index 0000000000..84693debc3 --- /dev/null +++ b/pubsub/nats/nats.go @@ -0,0 +1,99 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation and Dapr Contributors. +// Licensed under the MIT License. +// ------------------------------------------------------------ + +package nats + +import ( + "context" + "errors" + "fmt" + + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/kit/logger" + nats "github.com/nats-io/nats.go" +) + +const ( + natsURL = "natsURL" + consumerID = "consumerID" +) + +type natsPubSub struct { + metadata metadata + natsConn *nats.Conn + + logger logger.Logger +} + +// NewNATSPubSub returns a new NATS pub-sub implementation +func NewNATSPubSub(logger logger.Logger) pubsub.PubSub { + return &natsPubSub{logger: logger} +} + +func parseNATSMetadata(meta pubsub.Metadata) (metadata, error) { + m := metadata{} + if val, ok := meta.Properties[natsURL]; ok && val != "" { + m.natsURL = val + } else { + return m, errors.New("nats error: missing nats URL") + } + + if val, ok := meta.Properties[consumerID]; ok && val != "" { + m.natsQueueGroupName = val + } else { + return m, errors.New("nats error: missing queue name") + } + + return m, nil +} + +func (n *natsPubSub) Init(metadata pubsub.Metadata) error { + m, err := parseNATSMetadata(metadata) + if err != nil { + return err + } + + n.metadata = m + natsConn, err := nats.Connect(m.natsURL) + if err != nil { + return fmt.Errorf("nats: error connecting to nats at %s: %s", m.natsURL, err) + } + n.logger.Debugf("connected to nats at %s", m.natsURL) + + n.natsConn = natsConn + + return nil +} + +func (n *natsPubSub) Publish(req *pubsub.PublishRequest) error { + err := n.natsConn.Publish(req.Topic, req.Data) + if err != nil { + return fmt.Errorf("nats: error from publish: %s", err) + } + + return nil +} + +func (n *natsPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { + sub, err := n.natsConn.QueueSubscribe(req.Topic, n.metadata.natsQueueGroupName, func(natsMsg *nats.Msg) { + handler(context.Background(), &pubsub.NewMessage{Topic: req.Topic, Data: natsMsg.Data}) + }) + if err != nil { + n.logger.Warnf("nats: error subscribe: %s", err) + } + n.logger.Debugf("nats: subscribed to subject %s with queue group %s", sub.Subject, sub.Queue) + + return nil +} + +func (n *natsPubSub) Close() error { + n.natsConn.Close() + + return nil +} + +func (n *natsPubSub) Features() []pubsub.Feature { + return nil +} diff --git a/pubsub/nats/nats_test.go b/pubsub/nats/nats_test.go new file mode 100644 index 0000000000..dfe805ef5f --- /dev/null +++ b/pubsub/nats/nats_test.go @@ -0,0 +1,69 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation and Dapr Contributors. +// Licensed under the MIT License. +// ------------------------------------------------------------ + +package nats + +import ( + "errors" + "testing" + + "github.com/dapr/components-contrib/pubsub" + "github.com/stretchr/testify/assert" +) + +func TestParseNATSMetadata(t *testing.T) { + t.Run("metadata is correct", func(t *testing.T) { + fakeProperties := map[string]string{ + natsURL: "foonats1", + consumerID: "fooq1", + } + fakeMetaData := pubsub.Metadata{ + Properties: fakeProperties, + } + + // act + m, err := parseNATSMetadata(fakeMetaData) + + // assert + assert.NoError(t, err) + assert.NotEmpty(t, m.natsURL) + assert.NotEmpty(t, m.natsQueueGroupName) + assert.Equal(t, fakeProperties[natsURL], m.natsURL) + assert.Equal(t, fakeProperties[consumerID], m.natsQueueGroupName) + }) + + t.Run("queue is not given", func(t *testing.T) { + fakeProperties := map[string]string{ + natsURL: "foonats2", + consumerID: "", + } + + fakeMetaData := pubsub.Metadata{ + Properties: fakeProperties, + } + + // act + m, err := parseNATSMetadata(fakeMetaData) + // assert + assert.Error(t, errors.New("nats error: missing queue name"), err) + assert.Equal(t, fakeProperties[natsURL], m.natsURL) + assert.Empty(t, m.natsQueueGroupName) + }) + + t.Run("nats url is not given", func(t *testing.T) { + fakeProperties := map[string]string{ + natsURL: "", + consumerID: "fooq2", + } + fakeMetaData := pubsub.Metadata{ + Properties: fakeProperties, + } + // act + m, err := parseNATSMetadata(fakeMetaData) + // assert + assert.Error(t, errors.New("nats error: missing nats URL"), err) + assert.Empty(t, m.natsURL) + }) +} From f31b4e40a99526f117aa704ed0a909d8dd8f2d3d Mon Sep 17 00:00:00 2001 From: Phil Kedy Date: Fri, 23 Jul 2021 17:46:16 -0400 Subject: [PATCH 3/3] Fixing the handling of detach errors (#1032) --- pubsub/azure/servicebus/servicebus.go | 9 ++++++++- pubsub/azure/servicebus/subscription.go | 7 +------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pubsub/azure/servicebus/servicebus.go b/pubsub/azure/servicebus/servicebus.go index 0e2448d5d8..e4cb6e8298 100644 --- a/pubsub/azure/servicebus/servicebus.go +++ b/pubsub/azure/servicebus/servicebus.go @@ -413,7 +413,14 @@ func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub. a.metadata.MaxActiveMessages, a.metadata.MaxActiveMessagesRecoveryInSec) if innerErr != nil { - a.logger.Error(innerErr) + var detachError *amqp.DetachError + var ampqError *amqp.Error + if errors.Is(innerErr, detachError) || + (errors.As(innerErr, &qError) && ampqError.Condition == amqp.ErrorDetachForced) { + a.logger.Debug(innerErr) + } else { + a.logger.Error(innerErr) + } } cancel() // Cancel receive context diff --git a/pubsub/azure/servicebus/subscription.go b/pubsub/azure/servicebus/subscription.go index 1d60e811eb..ae7f6bf837 100644 --- a/pubsub/azure/servicebus/subscription.go +++ b/pubsub/azure/servicebus/subscription.go @@ -3,7 +3,6 @@ package servicebus import ( "context" "fmt" - "strings" "sync" "time" @@ -203,11 +202,7 @@ func (s *subscription) tryRenewLocks() { func (s *subscription) receiveMessage(ctx context.Context, handler azservicebus.HandlerFunc) error { s.logger.Debugf("Waiting to receive message on topic %s", s.topic) if err := s.entity.ReceiveOne(ctx, handler); err != nil { - if strings.Contains(err.Error(), "force detached") { - return nil - } - - return fmt.Errorf("%s error receiving message on topic %s, %s", errorMessagePrefix, s.topic, err) + return fmt.Errorf("%s error receiving message on topic %s, %w", errorMessagePrefix, s.topic, err) } return nil