@@ -2,7 +2,6 @@ package snssqs
22
33import (
44 "context"
5- "crypto/sha256"
65 "encoding/json"
76 "errors"
87 "fmt"
@@ -20,8 +19,8 @@ import (
2019type snsSqs struct {
2120 // key is the topic name, value is the ARN of the topic
2221 topics map [string ]string
23- // key is the hashed topic name, value is the actual topic name
24- topicHash map [string ]string
22+ // key is the sanitized topic name, value is the actual topic name
23+ topicSanitized map [string ]string
2524 // key is the topic name, value holds the ARN of the queue and its url
2625 queues map [string ]* sqsQueueInfo
2726 snsClient * sns.SNS
@@ -93,13 +92,28 @@ func parseInt64(input string, propertyName string) (int64, error) {
9392 return int64 (number ), nil
9493}
9594
96- // take a name and hash it for compatibility with AWS resource names
97- // the output is fixed at 64 characters
98- func nameToHash (name string ) string {
99- h := sha256 .New ()
100- h .Write ([]byte (name ))
95+ // sanitize topic/queue name to conform with:
96+ // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-queues.html
97+ func nameToAWSSanitizedName (name string ) string {
98+ s := []byte (name )
99+
100+ j := 0
101+ for _ , b := range s {
102+ if ('a' <= b && b <= 'z' ) ||
103+ ('A' <= b && b <= 'Z' ) ||
104+ ('0' <= b && b <= '9' ) ||
105+ (b == '-' ) ||
106+ (b == '_' ) {
107+ s [j ] = b
108+ j ++
109+
110+ if j == 80 {
111+ break
112+ }
113+ }
114+ }
101115
102- return fmt . Sprintf ( "%x" , h . Sum ( nil ) )
116+ return string ( s [: j ] )
103117}
104118
105119func (s * snsSqs ) getSnsSqsMetatdata (metadata pubsub.Metadata ) (* snsSqsMetadata , error ) {
@@ -207,7 +221,7 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error {
207221 // both Publish and Subscribe need reference the topic ARN
208222 // track these ARNs in this map
209223 s .topics = make (map [string ]string )
210- s .topicHash = make (map [string ]string )
224+ s .topicSanitized = make (map [string ]string )
211225 s .queues = make (map [string ]* sqsQueueInfo )
212226 sess , err := aws_auth .GetClient (md .AccessKey , md .SecretKey , md .SessionToken , md .Region , md .Endpoint )
213227 if err != nil {
@@ -220,16 +234,16 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error {
220234}
221235
222236func (s * snsSqs ) createTopic (topic string ) (string , string , error ) {
223- hashedName := nameToHash (topic )
237+ sanitizedName := nameToAWSSanitizedName (topic )
224238 createTopicResponse , err := s .snsClient .CreateTopic (& sns.CreateTopicInput {
225- Name : aws .String (hashedName ),
239+ Name : aws .String (sanitizedName ),
226240 Tags : []* sns.Tag {{Key : aws .String (awsSnsTopicNameKey ), Value : aws .String (topic )}},
227241 })
228242 if err != nil {
229243 return "" , "" , err
230244 }
231245
232- return * (createTopicResponse .TopicArn ), hashedName , nil
246+ return * (createTopicResponse .TopicArn ), sanitizedName , nil
233247}
234248
235249// get the topic ARN from the topics map. If it doesn't exist in the map, try to fetch it from AWS, if it doesn't exist
@@ -245,7 +259,7 @@ func (s *snsSqs) getOrCreateTopic(topic string) (string, error) {
245259
246260 s .logger .Debugf ("No topic ARN found for %s\n Creating topic instead." , topic )
247261
248- topicArn , hashedName , err := s .createTopic (topic )
262+ topicArn , sanitizedName , err := s .createTopic (topic )
249263 if err != nil {
250264 s .logger .Errorf ("error creating new topic %s: %v" , topic , err )
251265
@@ -254,14 +268,14 @@ func (s *snsSqs) getOrCreateTopic(topic string) (string, error) {
254268
255269 // record topic ARN
256270 s .topics [topic ] = topicArn
257- s .topicHash [ hashedName ] = topic
271+ s .topicSanitized [ sanitizedName ] = topic
258272
259273 return topicArn , nil
260274}
261275
262276func (s * snsSqs ) createQueue (queueName string ) (* sqsQueueInfo , error ) {
263277 createQueueResponse , err := s .sqsClient .CreateQueue (& sqs.CreateQueueInput {
264- QueueName : aws .String (nameToHash (queueName )),
278+ QueueName : aws .String (nameToAWSSanitizedName (queueName )),
265279 Tags : map [string ]* string {awsSqsQueueNameKey : aws .String (queueName )},
266280 })
267281 if err != nil {
@@ -397,7 +411,7 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo *sqsQueueInfo, ha
397411 }
398412
399413 topic := parseTopicArn (messageBody .TopicArn )
400- topic = s .topicHash [topic ]
414+ topic = s .topicSanitized [topic ]
401415 err = handler (context .Background (), & pubsub.NewMessage {
402416 Data : []byte (messageBody .Message ),
403417 Topic : topic ,
@@ -491,12 +505,6 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler)
491505}
492506
493507func (s * snsSqs ) Close () error {
494- for _ , sub := range s .subscriptions {
495- s .snsClient .Unsubscribe (& sns.UnsubscribeInput {
496- SubscriptionArn : sub ,
497- })
498- }
499-
500508 return nil
501509}
502510
0 commit comments