Skip to content

Commit 253ef85

Browse files
Amit Mormthmuldersyaron2
authored
SNS and SQS topic/queue names + SNS subscription deletion bugfix (#1035)
* bugfix for sns topic deletion upon termination * Revert "bugfix for sns topic deletion upon termination" This reverts commit bcaa9bb. * wip on normalizing queue/topic names * sanitize queue and topic names * sanitized names. bugfix for close * # This is a combination of 4 commits. # This is the 1st commit message: Improve error message in case of missing property (#1012) Co-authored-by: Artur Souza <artursouza.ms@outlook.com> # This is the commit message #2: Remove vestigial pubsub/nats code (#1024) The pubsub/nats component was replaced by pubsub/natsstreaming as part of dapr/dapr#2003, but the corresponding code in dapr/components-contrib was not removed, so this change removes it. # This is the commit message #3: bugfix for sns topic deletion upon termination # This is the commit message #4: Revert "bugfix for sns topic deletion upon termination" This reverts commit bcaa9bb. * removed debug message * raw string abort * gofmt+remove regex and use byte iter Co-authored-by: Maarten Mulders <mthmulders@users.noreply.github.com> Co-authored-by: Yaron Schneider <yaronsc@microsoft.com>
1 parent 38de305 commit 253ef85

File tree

2 files changed

+37
-40
lines changed

2 files changed

+37
-40
lines changed

pubsub/aws/snssqs/snssqs.go

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package snssqs
22

33
import (
44
"context"
5-
"crypto/sha256"
65
"encoding/json"
76
"errors"
87
"fmt"
@@ -20,8 +19,8 @@ import (
2019
type snsSqs struct {
2120
// key is the topic name, value is the ARN of the topic
2221
topics map[string]string
23-
// key is the hashed topic name, value is the actual topic name
24-
topicHash map[string]string
22+
// key is the sanitized topic name, value is the actual topic name
23+
topicSanitized map[string]string
2524
// key is the topic name, value holds the ARN of the queue and its url
2625
queues map[string]*sqsQueueInfo
2726
snsClient *sns.SNS
@@ -93,13 +92,28 @@ func parseInt64(input string, propertyName string) (int64, error) {
9392
return int64(number), nil
9493
}
9594

96-
// take a name and hash it for compatibility with AWS resource names
97-
// the output is fixed at 64 characters
98-
func nameToHash(name string) string {
99-
h := sha256.New()
100-
h.Write([]byte(name))
95+
// sanitize topic/queue name to conform with:
96+
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-queues.html
97+
func nameToAWSSanitizedName(name string) string {
98+
s := []byte(name)
99+
100+
j := 0
101+
for _, b := range s {
102+
if ('a' <= b && b <= 'z') ||
103+
('A' <= b && b <= 'Z') ||
104+
('0' <= b && b <= '9') ||
105+
(b == '-') ||
106+
(b == '_') {
107+
s[j] = b
108+
j++
109+
110+
if j == 80 {
111+
break
112+
}
113+
}
114+
}
101115

102-
return fmt.Sprintf("%x", h.Sum(nil))
116+
return string(s[:j])
103117
}
104118

105119
func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, error) {
@@ -207,7 +221,7 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error {
207221
// both Publish and Subscribe need reference the topic ARN
208222
// track these ARNs in this map
209223
s.topics = make(map[string]string)
210-
s.topicHash = make(map[string]string)
224+
s.topicSanitized = make(map[string]string)
211225
s.queues = make(map[string]*sqsQueueInfo)
212226
sess, err := aws_auth.GetClient(md.AccessKey, md.SecretKey, md.SessionToken, md.Region, md.Endpoint)
213227
if err != nil {
@@ -220,16 +234,16 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error {
220234
}
221235

222236
func (s *snsSqs) createTopic(topic string) (string, string, error) {
223-
hashedName := nameToHash(topic)
237+
sanitizedName := nameToAWSSanitizedName(topic)
224238
createTopicResponse, err := s.snsClient.CreateTopic(&sns.CreateTopicInput{
225-
Name: aws.String(hashedName),
239+
Name: aws.String(sanitizedName),
226240
Tags: []*sns.Tag{{Key: aws.String(awsSnsTopicNameKey), Value: aws.String(topic)}},
227241
})
228242
if err != nil {
229243
return "", "", err
230244
}
231245

232-
return *(createTopicResponse.TopicArn), hashedName, nil
246+
return *(createTopicResponse.TopicArn), sanitizedName, nil
233247
}
234248

235249
// get the topic ARN from the topics map. If it doesn't exist in the map, try to fetch it from AWS, if it doesn't exist
@@ -245,7 +259,7 @@ func (s *snsSqs) getOrCreateTopic(topic string) (string, error) {
245259

246260
s.logger.Debugf("No topic ARN found for %s\n Creating topic instead.", topic)
247261

248-
topicArn, hashedName, err := s.createTopic(topic)
262+
topicArn, sanitizedName, err := s.createTopic(topic)
249263
if err != nil {
250264
s.logger.Errorf("error creating new topic %s: %v", topic, err)
251265

@@ -254,14 +268,14 @@ func (s *snsSqs) getOrCreateTopic(topic string) (string, error) {
254268

255269
// record topic ARN
256270
s.topics[topic] = topicArn
257-
s.topicHash[hashedName] = topic
271+
s.topicSanitized[sanitizedName] = topic
258272

259273
return topicArn, nil
260274
}
261275

262276
func (s *snsSqs) createQueue(queueName string) (*sqsQueueInfo, error) {
263277
createQueueResponse, err := s.sqsClient.CreateQueue(&sqs.CreateQueueInput{
264-
QueueName: aws.String(nameToHash(queueName)),
278+
QueueName: aws.String(nameToAWSSanitizedName(queueName)),
265279
Tags: map[string]*string{awsSqsQueueNameKey: aws.String(queueName)},
266280
})
267281
if err != nil {
@@ -397,7 +411,7 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo *sqsQueueInfo, ha
397411
}
398412

399413
topic := parseTopicArn(messageBody.TopicArn)
400-
topic = s.topicHash[topic]
414+
topic = s.topicSanitized[topic]
401415
err = handler(context.Background(), &pubsub.NewMessage{
402416
Data: []byte(messageBody.Message),
403417
Topic: topic,
@@ -491,12 +505,6 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler)
491505
}
492506

493507
func (s *snsSqs) Close() error {
494-
for _, sub := range s.subscriptions {
495-
s.snsClient.Unsubscribe(&sns.UnsubscribeInput{
496-
SubscriptionArn: sub,
497-
})
498-
}
499-
500508
return nil
501509
}
502510

pubsub/aws/snssqs/snssqs_test.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package snssqs
22

33
import (
4-
"fmt"
5-
"strings"
64
"testing"
75

86
"github.com/dapr/components-contrib/pubsub"
@@ -236,22 +234,13 @@ func Test_parseInt64(t *testing.T) {
236234
r.Error(err)
237235
}
238236

239-
func Test_nameToHash(t *testing.T) {
237+
func Test_replaceNameToAWSSanitizedName(t *testing.T) {
240238
r := require.New(t)
241239

242-
// This string is too long and contains invalid character for either an SQS queue or an SNS topic
243-
hashedName := nameToHash(`
244-
Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid
240+
s := `Some_invalid-name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid
245241
name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an
246-
AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^
247-
`)
248-
249-
r.Equal(64, len(hashedName))
250-
// Output is only expected to contain lower case characters representing valid hexadecimal numerals
251-
for _, c := range hashedName {
252-
r.True(
253-
strings.ContainsAny(
254-
"abcdef0123456789", string(c)),
255-
fmt.Sprintf("Invalid character %s in hashed name", string(c)))
256-
}
242+
AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^`
243+
v := nameToAWSSanitizedName(s)
244+
r.Equal(80, len(v))
245+
r.Equal("Some_invalid-nameforanAWSresourceSomeinvalidnameforanAWSresourceSomeinvalidnamef", v)
257246
}

0 commit comments

Comments
 (0)