From bcaa9bb5629b14f719c3d750076af246939f2f1d Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Fri, 23 Jul 2021 17:34:38 +0300 Subject: [PATCH 1/9] bugfix for sns topic deletion upon termination --- pubsub/aws/snssqs/snssqs.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 79892dcbda..746b562abd 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -70,6 +70,7 @@ func NewSnsSqs(l logger.Logger) pubsub.PubSub { return &snsSqs{ logger: l, subscriptions: []*string{}, + pattern: regexp.MustCompile("[^a-zA-Z0-9_\\-]+"), } } @@ -220,7 +221,7 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error { } func (s *snsSqs) createTopic(topic string) (string, string, error) { - hashedName := nameToHash(topic) + hashedName := s.nameToValidName(topic) createTopicResponse, err := s.snsClient.CreateTopic(&sns.CreateTopicInput{ Name: aws.String(hashedName), Tags: []*sns.Tag{{Key: aws.String(awsSnsTopicNameKey), Value: aws.String(topic)}}, @@ -491,11 +492,7 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) } func (s *snsSqs) Close() error { - for _, sub := range s.subscriptions { - s.snsClient.Unsubscribe(&sns.UnsubscribeInput{ - SubscriptionArn: sub, - }) - } + s.logger.Debugf("Closing sns-sqs pubsub component. This is NOOP") return nil } From 62ac6b01d3a1fe12e5acd28cf556cf751b78fd7a Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Fri, 23 Jul 2021 17:40:02 +0300 Subject: [PATCH 2/9] Revert "bugfix for sns topic deletion upon termination" This reverts commit bcaa9bb5629b14f719c3d750076af246939f2f1d. --- pubsub/aws/snssqs/snssqs.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 746b562abd..79892dcbda 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -70,7 +70,6 @@ func NewSnsSqs(l logger.Logger) pubsub.PubSub { return &snsSqs{ logger: l, subscriptions: []*string{}, - pattern: regexp.MustCompile("[^a-zA-Z0-9_\\-]+"), } } @@ -221,7 +220,7 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error { } func (s *snsSqs) createTopic(topic string) (string, string, error) { - hashedName := s.nameToValidName(topic) + hashedName := nameToHash(topic) createTopicResponse, err := s.snsClient.CreateTopic(&sns.CreateTopicInput{ Name: aws.String(hashedName), Tags: []*sns.Tag{{Key: aws.String(awsSnsTopicNameKey), Value: aws.String(topic)}}, @@ -492,7 +491,11 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) } func (s *snsSqs) Close() error { - s.logger.Debugf("Closing sns-sqs pubsub component. This is NOOP") + for _, sub := range s.subscriptions { + s.snsClient.Unsubscribe(&sns.UnsubscribeInput{ + SubscriptionArn: sub, + }) + } return nil } From de891cf96d568702ef5196107d3c8944fd8a7404 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Fri, 23 Jul 2021 17:48:26 +0300 Subject: [PATCH 3/9] wip on normalizing queue/topic names --- pubsub/aws/snssqs/snssqs.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 79892dcbda..25a8a46be5 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -8,6 +8,7 @@ import ( "fmt" "strconv" "strings" + "regexp" "github.com/aws/aws-sdk-go/aws" sns "github.com/aws/aws-sdk-go/service/sns" @@ -70,6 +71,7 @@ func NewSnsSqs(l logger.Logger) pubsub.PubSub { return &snsSqs{ logger: l, subscriptions: []*string{}, + pattern: regexp.MustCompile("[^a-zA-Z0-9_\\-]+"), } } @@ -102,6 +104,14 @@ func nameToHash(name string) string { return fmt.Sprintf("%x", h.Sum(nil)) } +// normalize topic name to conform with: +// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_CreateQueue.html +func (s *snsSqs) nameToValidName(name string) string { + replacedName := s.pattern.ReplaceAllString(name, "") + replacedName[] +} + + func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, error) { md := snsSqsMetadata{} props := metadata.Properties From d4d2df7057adf8485e93145e5ea79a3eb41c253d Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Sat, 24 Jul 2021 14:55:19 +0300 Subject: [PATCH 4/9] sanitize queue and topic names --- pubsub/aws/snssqs/snssqs.go | 31 ++++++++++++++++++------------- pubsub/aws/snssqs/snssqs_test.go | 11 +++++++++++ 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 25a8a46be5..5780c82f48 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -67,11 +67,13 @@ const ( awsSnsTopicNameKey = "dapr-topic-name" ) +var awsSnsSqsTopicAllowCharsRe = regexp.MustCompile("[^a-zA-Z0-9_\\-]+") + + func NewSnsSqs(l logger.Logger) pubsub.PubSub { return &snsSqs{ logger: l, subscriptions: []*string{}, - pattern: regexp.MustCompile("[^a-zA-Z0-9_\\-]+"), } } @@ -104,14 +106,16 @@ func nameToHash(name string) string { return fmt.Sprintf("%x", h.Sum(nil)) } -// normalize topic name to conform with: +// normalize topic/queue name to conform with: // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_CreateQueue.html -func (s *snsSqs) nameToValidName(name string) string { - replacedName := s.pattern.ReplaceAllString(name, "") - replacedName[] +func nameToValidName(name string) string { + replacedName := awsSnsSqsTopicAllowCharsRe.ReplaceAllString(name, "") + if len(replacedName) > 80 { + replacedName = replacedName[:80] + } + return replacedName } - func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, error) { md := snsSqsMetadata{} props := metadata.Properties @@ -230,7 +234,7 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error { } func (s *snsSqs) createTopic(topic string) (string, string, error) { - hashedName := nameToHash(topic) + hashedName := nameToValidName(topic) createTopicResponse, err := s.snsClient.CreateTopic(&sns.CreateTopicInput{ Name: aws.String(hashedName), Tags: []*sns.Tag{{Key: aws.String(awsSnsTopicNameKey), Value: aws.String(topic)}}, @@ -271,7 +275,7 @@ func (s *snsSqs) getOrCreateTopic(topic string) (string, error) { func (s *snsSqs) createQueue(queueName string) (*sqsQueueInfo, error) { createQueueResponse, err := s.sqsClient.CreateQueue(&sqs.CreateQueueInput{ - QueueName: aws.String(nameToHash(queueName)), + QueueName: aws.String(nameToValidName(queueName)), Tags: map[string]*string{awsSqsQueueNameKey: aws.String(queueName)}, }) if err != nil { @@ -501,11 +505,12 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) } func (s *snsSqs) Close() error { - for _, sub := range s.subscriptions { - s.snsClient.Unsubscribe(&sns.UnsubscribeInput{ - SubscriptionArn: sub, - }) - } + s.logger.Debugf("Close was called and is now NOOP") + // for _, sub := range s.subscriptions { + // s.snsClient.Unsubscribe(&sns.UnsubscribeInput{ + // SubscriptionArn: sub, + // }) + // } return nil } diff --git a/pubsub/aws/snssqs/snssqs_test.go b/pubsub/aws/snssqs/snssqs_test.go index 3979df4001..aaabb05362 100644 --- a/pubsub/aws/snssqs/snssqs_test.go +++ b/pubsub/aws/snssqs/snssqs_test.go @@ -255,3 +255,14 @@ func Test_nameToHash(t *testing.T) { fmt.Sprintf("Invalid character %s in hashed name", string(c))) } } + +func Test_replaceInvalidQueueNameChars(t *testing.T) { + r := require.New(t) + + s := `Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid + name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an + AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^` + v := nameToValidName(s) + r.Equal(80, len(v)) + r.Equal("SomeinvalidnameforanAWSresourceSomeinvalidnameforanAWSresourceSomeinvalidnamefor", v) +} From d3e08e72fdb98179b9d80ed25192c5e567be1cae Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Sat, 24 Jul 2021 15:53:09 +0300 Subject: [PATCH 5/9] sanitized names. bugfix for close --- pubsub/aws/snssqs/snssqs.go | 46 +++++++++++++------------------- pubsub/aws/snssqs/snssqs_test.go | 29 +++----------------- 2 files changed, 23 insertions(+), 52 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 5780c82f48..f520b0ed00 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -2,7 +2,6 @@ package snssqs import ( "context" - "crypto/sha256" "encoding/json" "errors" "fmt" @@ -21,8 +20,8 @@ import ( type snsSqs struct { // key is the topic name, value is the ARN of the topic topics map[string]string - // key is the hashed topic name, value is the actual topic name - topicHash map[string]string + // key is the sanitized topic name, value is the actual topic name + topicSanitized map[string]string // key is the topic name, value holds the ARN of the queue and its url queues map[string]*sqsQueueInfo snsClient *sns.SNS @@ -67,7 +66,7 @@ const ( awsSnsTopicNameKey = "dapr-topic-name" ) -var awsSnsSqsTopicAllowCharsRe = regexp.MustCompile("[^a-zA-Z0-9_\\-]+") +var awsSnsSqsAllowedCharsRe = regexp.MustCompile("[^a-zA-Z0-9_\\-]+") func NewSnsSqs(l logger.Logger) pubsub.PubSub { @@ -97,23 +96,16 @@ func parseInt64(input string, propertyName string) (int64, error) { return int64(number), nil } -// take a name and hash it for compatibility with AWS resource names -// the output is fixed at 64 characters -func nameToHash(name string) string { - h := sha256.New() - h.Write([]byte(name)) - return fmt.Sprintf("%x", h.Sum(nil)) -} - -// normalize topic/queue name to conform with: -// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_CreateQueue.html -func nameToValidName(name string) string { - replacedName := awsSnsSqsTopicAllowCharsRe.ReplaceAllString(name, "") - if len(replacedName) > 80 { - replacedName = replacedName[:80] +// sanitize topic/queue name to conform with: +// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-queues.html +func nameToAWSSanitizedName(name string) string { + sanitizedName := awsSnsSqsAllowedCharsRe.ReplaceAllString(name, "") + if len(sanitizedName) > 80 { + sanitizedName = sanitizedName[:80] } - return replacedName + + return sanitizedName } func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, error) { @@ -221,7 +213,7 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error { // both Publish and Subscribe need reference the topic ARN // track these ARNs in this map s.topics = make(map[string]string) - s.topicHash = make(map[string]string) + s.topicSanitized = make(map[string]string) s.queues = make(map[string]*sqsQueueInfo) sess, err := aws_auth.GetClient(md.AccessKey, md.SecretKey, md.SessionToken, md.Region, md.Endpoint) if err != nil { @@ -234,16 +226,16 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error { } func (s *snsSqs) createTopic(topic string) (string, string, error) { - hashedName := nameToValidName(topic) + sanitizedName := nameToAWSSanitizedName(topic) createTopicResponse, err := s.snsClient.CreateTopic(&sns.CreateTopicInput{ - Name: aws.String(hashedName), + Name: aws.String(sanitizedName), Tags: []*sns.Tag{{Key: aws.String(awsSnsTopicNameKey), Value: aws.String(topic)}}, }) if err != nil { return "", "", err } - return *(createTopicResponse.TopicArn), hashedName, nil + return *(createTopicResponse.TopicArn), sanitizedName, nil } // get the topic ARN from the topics map. If it doesn't exist in the map, try to fetch it from AWS, if it doesn't exist @@ -259,7 +251,7 @@ func (s *snsSqs) getOrCreateTopic(topic string) (string, error) { s.logger.Debugf("No topic ARN found for %s\n Creating topic instead.", topic) - topicArn, hashedName, err := s.createTopic(topic) + topicArn, sanitizedName, err := s.createTopic(topic) if err != nil { s.logger.Errorf("error creating new topic %s: %v", topic, err) @@ -268,14 +260,14 @@ func (s *snsSqs) getOrCreateTopic(topic string) (string, error) { // record topic ARN s.topics[topic] = topicArn - s.topicHash[hashedName] = topic + s.topicSanitized[sanitizedName] = topic return topicArn, nil } func (s *snsSqs) createQueue(queueName string) (*sqsQueueInfo, error) { createQueueResponse, err := s.sqsClient.CreateQueue(&sqs.CreateQueueInput{ - QueueName: aws.String(nameToValidName(queueName)), + QueueName: aws.String(nameToAWSSanitizedName(queueName)), Tags: map[string]*string{awsSqsQueueNameKey: aws.String(queueName)}, }) if err != nil { @@ -411,7 +403,7 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo *sqsQueueInfo, ha } topic := parseTopicArn(messageBody.TopicArn) - topic = s.topicHash[topic] + topic = s.topicSanitized[topic] err = handler(context.Background(), &pubsub.NewMessage{ Data: []byte(messageBody.Message), Topic: topic, diff --git a/pubsub/aws/snssqs/snssqs_test.go b/pubsub/aws/snssqs/snssqs_test.go index aaabb05362..b5a61caf37 100644 --- a/pubsub/aws/snssqs/snssqs_test.go +++ b/pubsub/aws/snssqs/snssqs_test.go @@ -1,8 +1,6 @@ package snssqs import ( - "fmt" - "strings" "testing" "github.com/dapr/components-contrib/pubsub" @@ -236,33 +234,14 @@ func Test_parseInt64(t *testing.T) { r.Error(err) } -func Test_nameToHash(t *testing.T) { - r := require.New(t) - - // This string is too long and contains invalid character for either an SQS queue or an SNS topic - hashedName := nameToHash(` - Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid - name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an - AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^ - `) - - r.Equal(64, len(hashedName)) - // Output is only expected to contain lower case characters representing valid hexadecimal numerals - for _, c := range hashedName { - r.True( - strings.ContainsAny( - "abcdef0123456789", string(c)), - fmt.Sprintf("Invalid character %s in hashed name", string(c))) - } -} -func Test_replaceInvalidQueueNameChars(t *testing.T) { +func Test_replaceNameToAWSSanitizedName(t *testing.T) { r := require.New(t) - s := `Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid + s := `Some_invalid-name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^` - v := nameToValidName(s) + v := nameToAWSSanitizedName(s) r.Equal(80, len(v)) - r.Equal("SomeinvalidnameforanAWSresourceSomeinvalidnameforanAWSresourceSomeinvalidnamefor", v) + r.Equal("Some_invalid-nameforanAWSresourceSomeinvalidnameforanAWSresourceSomeinvalidnamef", v) } From 5c9949b40a4f79afa70d54b5af19b1d941594a00 Mon Sep 17 00:00:00 2001 From: Maarten Mulders Date: Mon, 19 Jul 2021 03:44:38 +0200 Subject: [PATCH 6/9] # This is a combination of 4 commits. # This is the 1st commit message: Improve error message in case of missing property (#1012) Co-authored-by: Artur Souza # This is the commit message #2: Remove vestigial pubsub/nats code (#1024) The pubsub/nats component was replaced by pubsub/natsstreaming as part of https://github.com/dapr/dapr/pull/2003, but the corresponding code in dapr/components-contrib was not removed, so this change removes it. # This is the commit message #3: bugfix for sns topic deletion upon termination # This is the commit message #4: Revert "bugfix for sns topic deletion upon termination" This reverts commit bcaa9bb5629b14f719c3d750076af246939f2f1d. --- bindings/smtp/smtp.go | 2 +- pubsub/nats/metadata.go | 11 ----- pubsub/nats/nats.go | 99 ---------------------------------------- pubsub/nats/nats_test.go | 69 ---------------------------- 4 files changed, 1 insertion(+), 180 deletions(-) delete mode 100644 pubsub/nats/metadata.go delete mode 100644 pubsub/nats/nats.go delete mode 100644 pubsub/nats/nats_test.go diff --git a/bindings/smtp/smtp.go b/bindings/smtp/smtp.go index 28385a5f14..d71aefb69c 100644 --- a/bindings/smtp/smtp.go +++ b/bindings/smtp/smtp.go @@ -63,7 +63,7 @@ func (s *Mailer) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, // Merge config metadata with request metadata metadata := s.metadata.mergeWithRequestMetadata(req) if metadata.EmailFrom == "" { - return nil, fmt.Errorf("smtp binding error: fromEmail property not supplied in configuration- or request-metadata") + return nil, fmt.Errorf("smtp binding error: emailFrom property not supplied in configuration- or request-metadata") } if metadata.EmailTo == "" { return nil, fmt.Errorf("smtp binding error: emailTo property not supplied in configuration- or request-metadata") diff --git a/pubsub/nats/metadata.go b/pubsub/nats/metadata.go deleted file mode 100644 index e48fa1bb9a..0000000000 --- a/pubsub/nats/metadata.go +++ /dev/null @@ -1,11 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation and Dapr Contributors. -// Licensed under the MIT License. -// ------------------------------------------------------------ - -package nats - -type metadata struct { - natsURL string - natsQueueGroupName string -} diff --git a/pubsub/nats/nats.go b/pubsub/nats/nats.go deleted file mode 100644 index 84693debc3..0000000000 --- a/pubsub/nats/nats.go +++ /dev/null @@ -1,99 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation and Dapr Contributors. -// Licensed under the MIT License. -// ------------------------------------------------------------ - -package nats - -import ( - "context" - "errors" - "fmt" - - "github.com/dapr/components-contrib/pubsub" - "github.com/dapr/kit/logger" - nats "github.com/nats-io/nats.go" -) - -const ( - natsURL = "natsURL" - consumerID = "consumerID" -) - -type natsPubSub struct { - metadata metadata - natsConn *nats.Conn - - logger logger.Logger -} - -// NewNATSPubSub returns a new NATS pub-sub implementation -func NewNATSPubSub(logger logger.Logger) pubsub.PubSub { - return &natsPubSub{logger: logger} -} - -func parseNATSMetadata(meta pubsub.Metadata) (metadata, error) { - m := metadata{} - if val, ok := meta.Properties[natsURL]; ok && val != "" { - m.natsURL = val - } else { - return m, errors.New("nats error: missing nats URL") - } - - if val, ok := meta.Properties[consumerID]; ok && val != "" { - m.natsQueueGroupName = val - } else { - return m, errors.New("nats error: missing queue name") - } - - return m, nil -} - -func (n *natsPubSub) Init(metadata pubsub.Metadata) error { - m, err := parseNATSMetadata(metadata) - if err != nil { - return err - } - - n.metadata = m - natsConn, err := nats.Connect(m.natsURL) - if err != nil { - return fmt.Errorf("nats: error connecting to nats at %s: %s", m.natsURL, err) - } - n.logger.Debugf("connected to nats at %s", m.natsURL) - - n.natsConn = natsConn - - return nil -} - -func (n *natsPubSub) Publish(req *pubsub.PublishRequest) error { - err := n.natsConn.Publish(req.Topic, req.Data) - if err != nil { - return fmt.Errorf("nats: error from publish: %s", err) - } - - return nil -} - -func (n *natsPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { - sub, err := n.natsConn.QueueSubscribe(req.Topic, n.metadata.natsQueueGroupName, func(natsMsg *nats.Msg) { - handler(context.Background(), &pubsub.NewMessage{Topic: req.Topic, Data: natsMsg.Data}) - }) - if err != nil { - n.logger.Warnf("nats: error subscribe: %s", err) - } - n.logger.Debugf("nats: subscribed to subject %s with queue group %s", sub.Subject, sub.Queue) - - return nil -} - -func (n *natsPubSub) Close() error { - n.natsConn.Close() - - return nil -} - -func (n *natsPubSub) Features() []pubsub.Feature { - return nil -} diff --git a/pubsub/nats/nats_test.go b/pubsub/nats/nats_test.go deleted file mode 100644 index dfe805ef5f..0000000000 --- a/pubsub/nats/nats_test.go +++ /dev/null @@ -1,69 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation and Dapr Contributors. -// Licensed under the MIT License. -// ------------------------------------------------------------ - -package nats - -import ( - "errors" - "testing" - - "github.com/dapr/components-contrib/pubsub" - "github.com/stretchr/testify/assert" -) - -func TestParseNATSMetadata(t *testing.T) { - t.Run("metadata is correct", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "foonats1", - consumerID: "fooq1", - } - fakeMetaData := pubsub.Metadata{ - Properties: fakeProperties, - } - - // act - m, err := parseNATSMetadata(fakeMetaData) - - // assert - assert.NoError(t, err) - assert.NotEmpty(t, m.natsURL) - assert.NotEmpty(t, m.natsQueueGroupName) - assert.Equal(t, fakeProperties[natsURL], m.natsURL) - assert.Equal(t, fakeProperties[consumerID], m.natsQueueGroupName) - }) - - t.Run("queue is not given", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "foonats2", - consumerID: "", - } - - fakeMetaData := pubsub.Metadata{ - Properties: fakeProperties, - } - - // act - m, err := parseNATSMetadata(fakeMetaData) - // assert - assert.Error(t, errors.New("nats error: missing queue name"), err) - assert.Equal(t, fakeProperties[natsURL], m.natsURL) - assert.Empty(t, m.natsQueueGroupName) - }) - - t.Run("nats url is not given", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "", - consumerID: "fooq2", - } - fakeMetaData := pubsub.Metadata{ - Properties: fakeProperties, - } - // act - m, err := parseNATSMetadata(fakeMetaData) - // assert - assert.Error(t, errors.New("nats error: missing nats URL"), err) - assert.Empty(t, m.natsURL) - }) -} From 408fdcdcadf2df71da084b86894d85ef67d033ae Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Mon, 26 Jul 2021 20:55:53 +0300 Subject: [PATCH 7/9] removed debug message --- pubsub/aws/snssqs/snssqs.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index f520b0ed00..2ef8f5c82a 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -497,13 +497,6 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) } func (s *snsSqs) Close() error { - s.logger.Debugf("Close was called and is now NOOP") - // for _, sub := range s.subscriptions { - // s.snsClient.Unsubscribe(&sns.UnsubscribeInput{ - // SubscriptionArn: sub, - // }) - // } - return nil } From f3bee8a17760f2f62366be6ad708eb7b307ab7de Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Mon, 26 Jul 2021 21:40:55 +0300 Subject: [PATCH 8/9] raw string abort --- pubsub/aws/snssqs/snssqs.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 2ef8f5c82a..9dc4cda8d6 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -73,6 +73,10 @@ func NewSnsSqs(l logger.Logger) pubsub.PubSub { return &snsSqs{ logger: l, subscriptions: []*string{}, +<<<<<<< Updated upstream +======= + pattern: regexp.MustCompile(`[^a-zA-Z0-9_\-]+`), +>>>>>>> Stashed changes } } From cb0f2e5466eb65246868b2b76def097b065dde6b Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Mon, 26 Jul 2021 23:59:21 +0300 Subject: [PATCH 9/9] gofmt+remove regex and use byte iter --- pubsub/aws/snssqs/snssqs.go | 30 +++++++++++++++++------------- pubsub/aws/snssqs/snssqs_test.go | 1 - 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 9dc4cda8d6..1c8a7a332e 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -7,7 +7,6 @@ import ( "fmt" "strconv" "strings" - "regexp" "github.com/aws/aws-sdk-go/aws" sns "github.com/aws/aws-sdk-go/service/sns" @@ -66,17 +65,10 @@ const ( awsSnsTopicNameKey = "dapr-topic-name" ) -var awsSnsSqsAllowedCharsRe = regexp.MustCompile("[^a-zA-Z0-9_\\-]+") - - func NewSnsSqs(l logger.Logger) pubsub.PubSub { return &snsSqs{ logger: l, subscriptions: []*string{}, -<<<<<<< Updated upstream -======= - pattern: regexp.MustCompile(`[^a-zA-Z0-9_\-]+`), ->>>>>>> Stashed changes } } @@ -100,16 +92,28 @@ func parseInt64(input string, propertyName string) (int64, error) { return int64(number), nil } - // sanitize topic/queue name to conform with: // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-queues.html func nameToAWSSanitizedName(name string) string { - sanitizedName := awsSnsSqsAllowedCharsRe.ReplaceAllString(name, "") - if len(sanitizedName) > 80 { - sanitizedName = sanitizedName[:80] + s := []byte(name) + + j := 0 + for _, b := range s { + if ('a' <= b && b <= 'z') || + ('A' <= b && b <= 'Z') || + ('0' <= b && b <= '9') || + (b == '-') || + (b == '_') { + s[j] = b + j++ + + if j == 80 { + break + } + } } - return sanitizedName + return string(s[:j]) } func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, error) { diff --git a/pubsub/aws/snssqs/snssqs_test.go b/pubsub/aws/snssqs/snssqs_test.go index b5a61caf37..835100bc83 100644 --- a/pubsub/aws/snssqs/snssqs_test.go +++ b/pubsub/aws/snssqs/snssqs_test.go @@ -234,7 +234,6 @@ func Test_parseInt64(t *testing.T) { r.Error(err) } - func Test_replaceNameToAWSSanitizedName(t *testing.T) { r := require.New(t)