Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions pubsub/aws/snssqs/policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package snssqs

type arnEquals struct {
AwsSourceArn string `json:"aws:SourceArn"`
}

type condition struct {
ArnEquals arnEquals
}

type principal struct {
Service string
}

type statement struct {
Effect string
Principal principal
Action string
Resource string
Condition condition
}

type policy struct {
Version string
Statement []statement
}

func (p *policy) statementExists(other *statement) bool {
for _, s := range p.Statement {
if s.Effect == other.Effect &&
s.Principal.Service == other.Principal.Service &&
s.Action == other.Action &&
s.Resource == other.Resource &&
s.Condition.ArnEquals.AwsSourceArn == other.Condition.ArnEquals.AwsSourceArn {
return true
}
}

return false
}

func (p *policy) addStatement(other *statement) {
p.Statement = append(p.Statement, *other)
}
53 changes: 7 additions & 46 deletions pubsub/aws/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,30 +74,10 @@ type snsSqsMetadata struct {
messageMaxNumber int64
}

type arnEquals struct {
AwsSourceArn string `json:"aws:SourceArn"`
}

type condition struct {
ArnEquals arnEquals
}

type statement struct {
Effect string
Principal string
Action string
Resource string
Condition condition
}

type policy struct {
Version string
Statement []statement
}

const (
awsSqsQueueNameKey = "dapr-queue-name"
awsSnsTopicNameKey = "dapr-topic-name"
awsSqsFifoSuffix = ".fifo"
maxAWSNameLength = 80
)

Expand Down Expand Up @@ -146,13 +126,11 @@ func parseBool(input string, propertyName string) (bool, error) {
// sanitize topic/queue name to conform with:
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-queues.html
func nameToAWSSanitizedName(name string, isFifo bool) string {
suffix := ".fifo"

// first remove suffix if exists, and user requested a FIFO name, then sanitize the passed in name.
hasFifoSuffix := false
if strings.HasSuffix(name, suffix) && isFifo {
if strings.HasSuffix(name, awsSqsFifoSuffix) && isFifo {
hasFifoSuffix = true
name = name[:len(name)-len(suffix)]
name = name[:len(name)-len(awsSqsFifoSuffix)]
}

s := []byte(name)
Expand All @@ -174,33 +152,16 @@ func nameToAWSSanitizedName(name string, isFifo bool) string {

// reattach/add the suffix to the sanitized name, trim more if adding the suffix would exceed the maxLength.
if hasFifoSuffix || isFifo {
delta := j + len(suffix) - maxAWSNameLength
delta := j + len(awsSqsFifoSuffix) - maxAWSNameLength
if delta > 0 {
j -= delta
}
return string(s[:j]) + suffix
return string(s[:j]) + awsSqsFifoSuffix
}

return string(s[:j])
}

func (p *policy) statementExists(other *statement) bool {
for _, s := range p.Statement {
if s.Effect == other.Effect &&
s.Principal == other.Principal &&
s.Action == other.Action &&
s.Resource == other.Resource &&
s.Condition.ArnEquals.AwsSourceArn == other.Condition.ArnEquals.AwsSourceArn {
return true
}
}
return false
}

func (p *policy) addStatement(other *statement) {
p.Statement = append(p.Statement, *other)
}

func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, error) {
md := snsSqsMetadata{}
props := metadata.Properties
Expand Down Expand Up @@ -648,7 +609,7 @@ func (s *snsSqs) restrictQueuePublishPolicyToOnlySNS(sqsQueueInfo *sqsQueueInfo,

newStatement := &statement{
Effect: "Allow",
Principal: `{"Service": "sns.amazonaws.com"}`,
Principal: principal{Service: "sns.amazonaws.com"},
Action: "sqs:SendMessage",
Resource: sqsQueueInfo.arn,
Condition: condition{
Expand All @@ -658,7 +619,7 @@ func (s *snsSqs) restrictQueuePublishPolicyToOnlySNS(sqsQueueInfo *sqsQueueInfo,
},
}

policy := &policy{Version: "2012-11-05"}
policy := &policy{Version: "2012-10-17"}
if policyStr, ok := getQueueAttributesOutput.Attributes[sqs.QueueAttributeNamePolicy]; ok {
// look for the current statement if exists, else add it and store.
if err = json.Unmarshal([]byte(*policyStr), policy); err != nil {
Expand Down