Skip to content

Commit

Permalink
Merge pull request #206 from p4tin/issue_205
Browse files Browse the repository at this point in the history
Fixed SNS config for http added sns http raw delivery
  • Loading branch information
p4tin committed Oct 5, 2019
2 parents 333fc18 + f7a1691 commit 709c065
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 59 deletions.
3 changes: 3 additions & 0 deletions app/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package app

/*** config ***/
type EnvSubsciption struct {
Protocol string
EndPoint string
TopicArn string
QueueName string
Raw bool
FilterPolicy string
Expand Down
2 changes: 1 addition & 1 deletion app/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func HashAttributes(attributes map[string]app.MessageAttributeValue) string {

func sortedKeys(attributes map[string]app.MessageAttributeValue) []string {
var keys []string
for key, _ := range attributes {
for key := range attributes {
keys = append(keys, key)
}
sort.Strings(keys)
Expand Down
82 changes: 50 additions & 32 deletions app/conf/config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package conf

import (
"encoding/json"
"io/ioutil"
"path/filepath"
"strings"

log "github.com/sirupsen/logrus"

"encoding/json"

"github.com/ghodss/yaml"
"github.com/p4tin/goaws/app"
"github.com/p4tin/goaws/app/common"
Expand Down Expand Up @@ -105,40 +105,26 @@ func LoadYamlConfig(filename string, env string) []string {

for _, subs := range topic.Subscriptions {
if _, ok := app.SyncQueues.Queues[subs.QueueName]; !ok {
//Queue does not exist yet, create it.
queueUrl := "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port +
"/" + app.CurrentEnvironment.AccountID + "/" + subs.QueueName
if app.CurrentEnvironment.Region != "" {
queueUrl = "http://sqs." + app.CurrentEnvironment.Region + "." + app.CurrentEnvironment.Host + ":" +
app.CurrentEnvironment.Port + "/" + app.CurrentEnvironment.AccountID + "/" + subs.QueueName
var newSub *app.Subscription
if strings.Contains(subs.Protocol, "http") {
newSub = createHttpSubscription(subs)
} else {
//Queue does not exist yet, create it.
newSub = createSqsSubscription(subs, topicArn)
}
queueArn := "arn:aws:sqs:" + app.CurrentEnvironment.Region + ":" + app.CurrentEnvironment.AccountID + ":" + subs.QueueName
app.SyncQueues.Queues[subs.QueueName] = &app.Queue{
Name: subs.QueueName,
TimeoutSecs: app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout,
Arn: queueArn,
URL: queueUrl,
ReceiveWaitTimeSecs: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds,
IsFIFO: app.HasFIFOQueueName(subs.QueueName),
if subs.FilterPolicy != "" {
filterPolicy := &app.FilterPolicy{}
err = json.Unmarshal([]byte(subs.FilterPolicy), filterPolicy)
if err != nil {
log.Errorf("err: %s", err)
return ports
}
newSub.FilterPolicy = filterPolicy
}
}
qArn := app.SyncQueues.Queues[subs.QueueName].Arn
newSub := &app.Subscription{EndPoint: qArn, Protocol: "sqs", TopicArn: topicArn, Raw: subs.Raw}
subArn, _ := common.NewUUID()
subArn = topicArn + ":" + subArn
newSub.SubscriptionArn = subArn

if subs.FilterPolicy != "" {
filterPolicy := &app.FilterPolicy{}
err = json.Unmarshal([]byte(subs.FilterPolicy), filterPolicy)
if err != nil {
log.Errorf("err: %s", err)
return ports
}
newSub.FilterPolicy = filterPolicy

newTopic.Subscriptions = append(newTopic.Subscriptions, newSub)
}

newTopic.Subscriptions = append(newTopic.Subscriptions, newSub)
}
app.SyncTopics.Topics[topic.Name] = newTopic
}
Expand All @@ -148,3 +134,35 @@ func LoadYamlConfig(filename string, env string) []string {

return ports
}

func createHttpSubscription(configSubscription app.EnvSubsciption) *app.Subscription {
newSub := &app.Subscription{EndPoint: configSubscription.EndPoint, Protocol: configSubscription.Protocol, TopicArn: configSubscription.TopicArn, Raw: configSubscription.Raw}
subArn, _ := common.NewUUID()
subArn = configSubscription.TopicArn + ":" + subArn
newSub.SubscriptionArn = subArn
return newSub
}

func createSqsSubscription(configSubscription app.EnvSubsciption, topicArn string) *app.Subscription {
queueUrl := "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port +
"/" + app.CurrentEnvironment.AccountID + "/" + configSubscription.QueueName
if app.CurrentEnvironment.Region != "" {
queueUrl = "http://sqs." + app.CurrentEnvironment.Region + "." + app.CurrentEnvironment.Host + ":" +
app.CurrentEnvironment.Port + "/" + app.CurrentEnvironment.AccountID + "/" + configSubscription.QueueName
}
queueArn := "arn:aws:sqs:" + app.CurrentEnvironment.Region + ":" + app.CurrentEnvironment.AccountID + ":" + configSubscription.QueueName
app.SyncQueues.Queues[configSubscription.QueueName] = &app.Queue{
Name: configSubscription.QueueName,
TimeoutSecs: app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout,
Arn: queueArn,
URL: queueUrl,
ReceiveWaitTimeSecs: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds,
IsFIFO: app.HasFIFOQueueName(configSubscription.QueueName),
}
qArn := app.SyncQueues.Queues[configSubscription.QueueName].Arn
newSub := &app.Subscription{EndPoint: qArn, Protocol: "sqs", TopicArn: topicArn, Raw: configSubscription.Raw}
subArn, _ := common.NewUUID()
subArn = topicArn + ":" + subArn
newSub.SubscriptionArn = subArn
return newSub
}
5 changes: 0 additions & 5 deletions app/conf/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,4 @@ func TestConfig_NoQueueAttributeDefaults(t *testing.T) {
if receiveWaitTime != 20 {
t.Errorf("Expected local-queue2 Queue to be configured with ReceiveMessageWaitTimeSeconds: 20 but got %d\n", receiveWaitTime)
}

filterPolicy := app.SyncTopics.Topics["local-topic1"].Subscriptions[1].FilterPolicy
if (*filterPolicy)["foo"][0] != "bar" {
t.Errorf("Expected FilterPolicy subscription on local-topic1 to be: bar but got %s\n", (*filterPolicy)["foo"][0])
}
}
21 changes: 14 additions & 7 deletions app/conf/goaws.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
Local: # Environment name that can be passed on the command line
# (i.e.: ./goaws [Local | Dev] -- defaults to 'Local')
# (i.e.: ./goaws [Local | Dev] -- defaults to 'Local')
Host: goaws.com # hostname of the goaws system (for docker-compose this is the tag name of the container)
# you can now use either 1 port for both sns and sqs or alternatively you can comment out Port and use SqsPort + SnsPort for compatibilyt with
# yopa and (fage-sns + face-sqs). If both ways are in the config file on the one "Port" will be used by GoAws
# you can now use either 1 port for both sns and sqs or alternatively you can comment out Port and use SqsPort + SnsPort for compatibilyt with
# yopa and (fage-sns + face-sqs). If both ways are in the config file on the one "Port" will be used by GoAws
Port: 4100 # port to listen on.
# SqsPort: 9324 # alterante Sqs Port
# SnsPort: 9292 # alternate Sns Port
# SqsPort: 9324 # alterante Sqs Port
# SnsPort: 9292 # alternate Sns Port
Region: us-east-1
AccountId: "100010001000"
LogMessages: true # Log messages (true/false)
Expand All @@ -26,12 +26,19 @@ Local: # Environment name that can be passed on the
Raw: true # Raw message delivery (true/false)
#FilterPolicy: '{"foo": ["bar"]}' # Subscription's FilterPolicy, json object as a string
- Name: local-topic2 # Topic name - no Subscriptions
- Name: my_topic # Topic name - http subscription
Subscriptions:
- Protocol: https
EndPoint: https://enkrogwitfcgi.x.pipedream.net
TopicArn: arn:aws:sns:us-east-1:000000000000:my_topic
FilterPolicy: '{"event": ["my_event"]}'
Raw: true

Dev: # Another environment
Host: localhost
Port: 4100
# SqsPort: 9324
# SnsPort: 9292
# SqsPort: 9324
# SnsPort: 9292
AccountId: "794373491471"
LogMessages: true
LogFile: ./goaws_messages.log
Expand Down
6 changes: 3 additions & 3 deletions app/conf/mock-data/mock-config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Local: # Environment name that can be passed on the command line
# (i.e.: ./goaws [Local | Dev] -- defaults to 'Local')
# (i.e.: ./goaws [Local | Dev] -- defaults to 'Local')
Host: localhost # hostname of the goaws system (for docker-compose this is the tag name of the container)
Port: 4100 # port to listen on.
Region: us-east-1
Expand All @@ -24,7 +24,7 @@ Local: # Environment name that can be passed on the
FilterPolicy: '{"foo":["bar"]}' # Subscription's FilterPolicy, json like a string
- Name: local-topic2 # Topic name - no Subscriptions

NoQueuesOrTopics: # Another environment
NoQueuesOrTopics: # Another environment
Host: localhost
Port: 4100
LogMessages: true
Expand All @@ -40,4 +40,4 @@ NoQueueAttributeDefaults:
Queues:
- Name: local-queue1
- Name: local-queue2
ReceiveMessageWaitTimeSeconds: 20
ReceiveMessageWaitTimeSeconds: 20
15 changes: 11 additions & 4 deletions app/gosns/gosns.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func Subscribe(w http.ResponseWriter, req *http.Request) {
} else {
snsMSG.Signature = signature
}
err = callEndpoint(subscription.EndPoint, uuid, *snsMSG)
err = callEndpoint(subscription.EndPoint, uuid, *snsMSG, subscription.Raw)
if err != nil {
log.Error("Error posting to url ", err)
}
Expand Down Expand Up @@ -563,7 +563,7 @@ func publishHTTP(subs *app.Subscription, messageBody string, messageAttributes m
} else {
msg.Signature = signature
}
err = callEndpoint(subs.EndPoint, subs.SubscriptionArn, msg)
err = callEndpoint(subs.EndPoint, subs.SubscriptionArn, msg, subs.Raw)
if err != nil {
log.WithFields(log.Fields{
"EndPoint": subs.EndPoint,
Expand All @@ -584,13 +584,20 @@ func formatAttributes(values map[string]app.MessageAttributeValue) map[string]ap
return attr
}

func callEndpoint(endpoint string, subArn string, msg app.SNSMessage) error {
func callEndpoint(endpoint string, subArn string, msg app.SNSMessage, raw bool) error {
log.WithFields(log.Fields{
"sns": msg,
"subArn": subArn,
"endpoint": endpoint,
}).Debug("Calling endpoint")
byteData, err := json.Marshal(msg)
var err error
var byteData []byte

if raw {
byteData, err = json.Marshal(msg.Message)
} else {
byteData, err = json.Marshal(msg)
}
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions app/sns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,23 @@ func TestFilterPolicy_IsSatisfiedBy(t *testing.T) {
}{
{
&FilterPolicy{"foo": {"bar"}},
map[string]MessageAttributeValue{"foo": MessageAttributeValue{DataType: "String", Value: "bar"}},
map[string]MessageAttributeValue{"foo": {DataType: "String", Value: "bar"}},
true,
},
{
&FilterPolicy{"foo": {"bar", "xyz"}},
map[string]MessageAttributeValue{"foo": MessageAttributeValue{DataType: "String", Value: "xyz"}},
map[string]MessageAttributeValue{"foo": {DataType: "String", Value: "xyz"}},
true,
},
{
&FilterPolicy{"foo": {"bar", "xyz"}, "abc": {"def"}},
map[string]MessageAttributeValue{"foo": MessageAttributeValue{DataType: "String", Value: "xyz"},
"abc": MessageAttributeValue{DataType: "String", Value: "def"}},
map[string]MessageAttributeValue{"foo": {DataType: "String", Value: "xyz"},
"abc": {DataType: "String", Value: "def"}},
true,
},
{
&FilterPolicy{"foo": {"bar"}},
map[string]MessageAttributeValue{"foo": MessageAttributeValue{DataType: "String", Value: "baz"}},
map[string]MessageAttributeValue{"foo": {DataType: "String", Value: "baz"}},
false,
},
{
Expand All @@ -38,12 +38,12 @@ func TestFilterPolicy_IsSatisfiedBy(t *testing.T) {
},
{
&FilterPolicy{"foo": {"bar"}, "abc": {"def"}},
map[string]MessageAttributeValue{"foo": MessageAttributeValue{DataType: "String", Value: "bar"}},
map[string]MessageAttributeValue{"foo": {DataType: "String", Value: "bar"}},
false,
},
{
&FilterPolicy{"foo": {"bar"}},
map[string]MessageAttributeValue{"foo": MessageAttributeValue{DataType: "Binary", Value: "bar"}},
map[string]MessageAttributeValue{"foo": {DataType: "Binary", Value: "bar"}},
false,
},
}
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384 h1:TFlARGu6Czu1z7q93HTxcP1P+/ZFC/IKythI5RzrnRg=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
Expand Down

0 comments on commit 709c065

Please sign in to comment.