Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bindings/smtp/smtp.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s *Mailer) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse,
// Merge config metadata with request metadata
metadata := s.metadata.mergeWithRequestMetadata(req)
if metadata.EmailFrom == "" {
return nil, fmt.Errorf("smtp binding error: emailFrom property not supplied in configuration- or request-metadata")
return nil, fmt.Errorf("smtp binding error: fromEmail property not supplied in configuration- or request-metadata")
}
if metadata.EmailTo == "" {
return nil, fmt.Errorf("smtp binding error: emailTo property not supplied in configuration- or request-metadata")
Expand Down
9 changes: 1 addition & 8 deletions pubsub/azure/servicebus/servicebus.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,14 +413,7 @@ func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub.
a.metadata.MaxActiveMessages,
a.metadata.MaxActiveMessagesRecoveryInSec)
if innerErr != nil {
var detachError *amqp.DetachError
var ampqError *amqp.Error
if errors.Is(innerErr, detachError) ||
(errors.As(innerErr, &ampqError) && ampqError.Condition == amqp.ErrorDetachForced) {
a.logger.Debug(innerErr)
} else {
a.logger.Error(innerErr)
}
a.logger.Error(innerErr)
}
cancel() // Cancel receive context

Expand Down
7 changes: 6 additions & 1 deletion pubsub/azure/servicebus/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package servicebus
import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -202,7 +203,11 @@ func (s *subscription) tryRenewLocks() {
func (s *subscription) receiveMessage(ctx context.Context, handler azservicebus.HandlerFunc) error {
s.logger.Debugf("Waiting to receive message on topic %s", s.topic)
if err := s.entity.ReceiveOne(ctx, handler); err != nil {
return fmt.Errorf("%s error receiving message on topic %s, %w", errorMessagePrefix, s.topic, err)
if strings.Contains(err.Error(), "force detached") {
return nil
}

return fmt.Errorf("%s error receiving message on topic %s, %s", errorMessagePrefix, s.topic, err)
}

return nil
Expand Down
11 changes: 11 additions & 0 deletions pubsub/nats/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation and Dapr Contributors.
// Licensed under the MIT License.
// ------------------------------------------------------------

package nats

type metadata struct {
natsURL string
natsQueueGroupName string
}
99 changes: 99 additions & 0 deletions pubsub/nats/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation and Dapr Contributors.
// Licensed under the MIT License.
// ------------------------------------------------------------

package nats

import (
"context"
"errors"
"fmt"

"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
nats "github.com/nats-io/nats.go"
)

const (
natsURL = "natsURL"
consumerID = "consumerID"
)

type natsPubSub struct {
metadata metadata
natsConn *nats.Conn

logger logger.Logger
}

// NewNATSPubSub returns a new NATS pub-sub implementation
func NewNATSPubSub(logger logger.Logger) pubsub.PubSub {
return &natsPubSub{logger: logger}
}

func parseNATSMetadata(meta pubsub.Metadata) (metadata, error) {
m := metadata{}
if val, ok := meta.Properties[natsURL]; ok && val != "" {
m.natsURL = val
} else {
return m, errors.New("nats error: missing nats URL")
}

if val, ok := meta.Properties[consumerID]; ok && val != "" {
m.natsQueueGroupName = val
} else {
return m, errors.New("nats error: missing queue name")
}

return m, nil
}

func (n *natsPubSub) Init(metadata pubsub.Metadata) error {
m, err := parseNATSMetadata(metadata)
if err != nil {
return err
}

n.metadata = m
natsConn, err := nats.Connect(m.natsURL)
if err != nil {
return fmt.Errorf("nats: error connecting to nats at %s: %s", m.natsURL, err)
}
n.logger.Debugf("connected to nats at %s", m.natsURL)

n.natsConn = natsConn

return nil
}

func (n *natsPubSub) Publish(req *pubsub.PublishRequest) error {
err := n.natsConn.Publish(req.Topic, req.Data)
if err != nil {
return fmt.Errorf("nats: error from publish: %s", err)
}

return nil
}

func (n *natsPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
sub, err := n.natsConn.QueueSubscribe(req.Topic, n.metadata.natsQueueGroupName, func(natsMsg *nats.Msg) {
handler(context.Background(), &pubsub.NewMessage{Topic: req.Topic, Data: natsMsg.Data})
})
if err != nil {
n.logger.Warnf("nats: error subscribe: %s", err)
}
n.logger.Debugf("nats: subscribed to subject %s with queue group %s", sub.Subject, sub.Queue)

return nil
}

func (n *natsPubSub) Close() error {
n.natsConn.Close()

return nil
}

func (n *natsPubSub) Features() []pubsub.Feature {
return nil
}
69 changes: 69 additions & 0 deletions pubsub/nats/nats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation and Dapr Contributors.
// Licensed under the MIT License.
// ------------------------------------------------------------

package nats

import (
"errors"
"testing"

"github.com/dapr/components-contrib/pubsub"
"github.com/stretchr/testify/assert"
)

func TestParseNATSMetadata(t *testing.T) {
t.Run("metadata is correct", func(t *testing.T) {
fakeProperties := map[string]string{
natsURL: "foonats1",
consumerID: "fooq1",
}
fakeMetaData := pubsub.Metadata{
Properties: fakeProperties,
}

// act
m, err := parseNATSMetadata(fakeMetaData)

// assert
assert.NoError(t, err)
assert.NotEmpty(t, m.natsURL)
assert.NotEmpty(t, m.natsQueueGroupName)
assert.Equal(t, fakeProperties[natsURL], m.natsURL)
assert.Equal(t, fakeProperties[consumerID], m.natsQueueGroupName)
})

t.Run("queue is not given", func(t *testing.T) {
fakeProperties := map[string]string{
natsURL: "foonats2",
consumerID: "",
}

fakeMetaData := pubsub.Metadata{
Properties: fakeProperties,
}

// act
m, err := parseNATSMetadata(fakeMetaData)
// assert
assert.Error(t, errors.New("nats error: missing queue name"), err)
assert.Equal(t, fakeProperties[natsURL], m.natsURL)
assert.Empty(t, m.natsQueueGroupName)
})

t.Run("nats url is not given", func(t *testing.T) {
fakeProperties := map[string]string{
natsURL: "",
consumerID: "fooq2",
}
fakeMetaData := pubsub.Metadata{
Properties: fakeProperties,
}
// act
m, err := parseNATSMetadata(fakeMetaData)
// assert
assert.Error(t, errors.New("nats error: missing nats URL"), err)
assert.Empty(t, m.natsURL)
})
}