Skip to content

Commit cb48d67

Browse files
Amit Morjigargandhi
authored andcommitted
1335 bugfix dapr#2 (dapr#1368)
* bugfix for sns topic deletion upon termination * removed upstream github workflow files * Update snssqs.go * dapr bot schedule * read and append queue attributes * unnecessary escaping in json tag * unexporting structs * bugfix in policy * bugfix in policy. merged from master * fifo suffix as const Signed-off-by: jigargandhi <jigarr.gandhi@gmail.com>
1 parent 258102a commit cb48d67

File tree

2 files changed

+51
-46
lines changed

2 files changed

+51
-46
lines changed

pubsub/aws/snssqs/policy.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package snssqs
2+
3+
type arnEquals struct {
4+
AwsSourceArn string `json:"aws:SourceArn"`
5+
}
6+
7+
type condition struct {
8+
ArnEquals arnEquals
9+
}
10+
11+
type principal struct {
12+
Service string
13+
}
14+
15+
type statement struct {
16+
Effect string
17+
Principal principal
18+
Action string
19+
Resource string
20+
Condition condition
21+
}
22+
23+
type policy struct {
24+
Version string
25+
Statement []statement
26+
}
27+
28+
func (p *policy) statementExists(other *statement) bool {
29+
for _, s := range p.Statement {
30+
if s.Effect == other.Effect &&
31+
s.Principal.Service == other.Principal.Service &&
32+
s.Action == other.Action &&
33+
s.Resource == other.Resource &&
34+
s.Condition.ArnEquals.AwsSourceArn == other.Condition.ArnEquals.AwsSourceArn {
35+
return true
36+
}
37+
}
38+
39+
return false
40+
}
41+
42+
func (p *policy) addStatement(other *statement) {
43+
p.Statement = append(p.Statement, *other)
44+
}

pubsub/aws/snssqs/snssqs.go

Lines changed: 7 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -74,30 +74,10 @@ type snsSqsMetadata struct {
7474
messageMaxNumber int64
7575
}
7676

77-
type arnEquals struct {
78-
AwsSourceArn string `json:"aws:SourceArn"`
79-
}
80-
81-
type condition struct {
82-
ArnEquals arnEquals
83-
}
84-
85-
type statement struct {
86-
Effect string
87-
Principal string
88-
Action string
89-
Resource string
90-
Condition condition
91-
}
92-
93-
type policy struct {
94-
Version string
95-
Statement []statement
96-
}
97-
9877
const (
9978
awsSqsQueueNameKey = "dapr-queue-name"
10079
awsSnsTopicNameKey = "dapr-topic-name"
80+
awsSqsFifoSuffix = ".fifo"
10181
maxAWSNameLength = 80
10282
)
10383

@@ -146,13 +126,11 @@ func parseBool(input string, propertyName string) (bool, error) {
146126
// sanitize topic/queue name to conform with:
147127
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-queues.html
148128
func nameToAWSSanitizedName(name string, isFifo bool) string {
149-
suffix := ".fifo"
150-
151129
// first remove suffix if exists, and user requested a FIFO name, then sanitize the passed in name.
152130
hasFifoSuffix := false
153-
if strings.HasSuffix(name, suffix) && isFifo {
131+
if strings.HasSuffix(name, awsSqsFifoSuffix) && isFifo {
154132
hasFifoSuffix = true
155-
name = name[:len(name)-len(suffix)]
133+
name = name[:len(name)-len(awsSqsFifoSuffix)]
156134
}
157135

158136
s := []byte(name)
@@ -174,33 +152,16 @@ func nameToAWSSanitizedName(name string, isFifo bool) string {
174152

175153
// reattach/add the suffix to the sanitized name, trim more if adding the suffix would exceed the maxLength.
176154
if hasFifoSuffix || isFifo {
177-
delta := j + len(suffix) - maxAWSNameLength
155+
delta := j + len(awsSqsFifoSuffix) - maxAWSNameLength
178156
if delta > 0 {
179157
j -= delta
180158
}
181-
return string(s[:j]) + suffix
159+
return string(s[:j]) + awsSqsFifoSuffix
182160
}
183161

184162
return string(s[:j])
185163
}
186164

187-
func (p *policy) statementExists(other *statement) bool {
188-
for _, s := range p.Statement {
189-
if s.Effect == other.Effect &&
190-
s.Principal == other.Principal &&
191-
s.Action == other.Action &&
192-
s.Resource == other.Resource &&
193-
s.Condition.ArnEquals.AwsSourceArn == other.Condition.ArnEquals.AwsSourceArn {
194-
return true
195-
}
196-
}
197-
return false
198-
}
199-
200-
func (p *policy) addStatement(other *statement) {
201-
p.Statement = append(p.Statement, *other)
202-
}
203-
204165
func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, error) {
205166
md := snsSqsMetadata{}
206167
props := metadata.Properties
@@ -648,7 +609,7 @@ func (s *snsSqs) restrictQueuePublishPolicyToOnlySNS(sqsQueueInfo *sqsQueueInfo,
648609

649610
newStatement := &statement{
650611
Effect: "Allow",
651-
Principal: `{"Service": "sns.amazonaws.com"}`,
612+
Principal: principal{Service: "sns.amazonaws.com"},
652613
Action: "sqs:SendMessage",
653614
Resource: sqsQueueInfo.arn,
654615
Condition: condition{
@@ -658,7 +619,7 @@ func (s *snsSqs) restrictQueuePublishPolicyToOnlySNS(sqsQueueInfo *sqsQueueInfo,
658619
},
659620
}
660621

661-
policy := &policy{Version: "2012-11-05"}
622+
policy := &policy{Version: "2012-10-17"}
662623
if policyStr, ok := getQueueAttributesOutput.Attributes[sqs.QueueAttributeNamePolicy]; ok {
663624
// look for the current statement if exists, else add it and store.
664625
if err = json.Unmarshal([]byte(*policyStr), policy); err != nil {

0 commit comments

Comments
 (0)