From f7a16919e043c9a38e66b56997c61f041181c405 Mon Sep 17 00:00:00 2001 From: Paul Fortin Date: Sat, 5 Oct 2019 10:57:18 -0400 Subject: [PATCH] Fixed SNS config for http added sns http raw delivery --- app/common.go | 3 ++ app/common/common.go | 2 +- app/conf/config.go | 82 ++++++++++++++++++----------- app/conf/config_test.go | 5 -- app/conf/goaws.yaml | 21 +++++--- app/conf/mock-data/mock-config.yaml | 6 +-- app/gosns/gosns.go | 15 ++++-- app/sns_test.go | 14 ++--- go.mod | 1 + go.sum | 1 + 10 files changed, 91 insertions(+), 59 deletions(-) diff --git a/app/common.go b/app/common.go index 3f4b12332..1f8abea53 100644 --- a/app/common.go +++ b/app/common.go @@ -2,6 +2,9 @@ package app /*** config ***/ type EnvSubsciption struct { + Protocol string + EndPoint string + TopicArn string QueueName string Raw bool FilterPolicy string diff --git a/app/common/common.go b/app/common/common.go index 3e11b50dd..ab246d411 100644 --- a/app/common/common.go +++ b/app/common/common.go @@ -60,7 +60,7 @@ func HashAttributes(attributes map[string]app.MessageAttributeValue) string { func sortedKeys(attributes map[string]app.MessageAttributeValue) []string { var keys []string - for key, _ := range attributes { + for key := range attributes { keys = append(keys, key) } sort.Strings(keys) diff --git a/app/conf/config.go b/app/conf/config.go index 1a7401885..743458729 100644 --- a/app/conf/config.go +++ b/app/conf/config.go @@ -1,13 +1,13 @@ package conf import ( + "encoding/json" "io/ioutil" "path/filepath" + "strings" log "github.com/sirupsen/logrus" - "encoding/json" - "github.com/ghodss/yaml" "github.com/p4tin/goaws/app" "github.com/p4tin/goaws/app/common" @@ -105,40 +105,26 @@ func LoadYamlConfig(filename string, env string) []string { for _, subs := range topic.Subscriptions { if _, ok := app.SyncQueues.Queues[subs.QueueName]; !ok { - //Queue does not exist yet, create it. - queueUrl := "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + - "/" + app.CurrentEnvironment.AccountID + "/" + subs.QueueName - if app.CurrentEnvironment.Region != "" { - queueUrl = "http://sqs." + app.CurrentEnvironment.Region + "." + app.CurrentEnvironment.Host + ":" + - app.CurrentEnvironment.Port + "/" + app.CurrentEnvironment.AccountID + "/" + subs.QueueName + var newSub *app.Subscription + if strings.Contains(subs.Protocol, "http") { + newSub = createHttpSubscription(subs) + } else { + //Queue does not exist yet, create it. + newSub = createSqsSubscription(subs, topicArn) } - queueArn := "arn:aws:sqs:" + app.CurrentEnvironment.Region + ":" + app.CurrentEnvironment.AccountID + ":" + subs.QueueName - app.SyncQueues.Queues[subs.QueueName] = &app.Queue{ - Name: subs.QueueName, - TimeoutSecs: app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout, - Arn: queueArn, - URL: queueUrl, - ReceiveWaitTimeSecs: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds, - IsFIFO: app.HasFIFOQueueName(subs.QueueName), + if subs.FilterPolicy != "" { + filterPolicy := &app.FilterPolicy{} + err = json.Unmarshal([]byte(subs.FilterPolicy), filterPolicy) + if err != nil { + log.Errorf("err: %s", err) + return ports + } + newSub.FilterPolicy = filterPolicy } - } - qArn := app.SyncQueues.Queues[subs.QueueName].Arn - newSub := &app.Subscription{EndPoint: qArn, Protocol: "sqs", TopicArn: topicArn, Raw: subs.Raw} - subArn, _ := common.NewUUID() - subArn = topicArn + ":" + subArn - newSub.SubscriptionArn = subArn - - if subs.FilterPolicy != "" { - filterPolicy := &app.FilterPolicy{} - err = json.Unmarshal([]byte(subs.FilterPolicy), filterPolicy) - if err != nil { - log.Errorf("err: %s", err) - return ports - } - newSub.FilterPolicy = filterPolicy + + newTopic.Subscriptions = append(newTopic.Subscriptions, newSub) } - newTopic.Subscriptions = append(newTopic.Subscriptions, newSub) } app.SyncTopics.Topics[topic.Name] = newTopic } @@ -148,3 +134,35 @@ func LoadYamlConfig(filename string, env string) []string { return ports } + +func createHttpSubscription(configSubscription app.EnvSubsciption) *app.Subscription { + newSub := &app.Subscription{EndPoint: configSubscription.EndPoint, Protocol: configSubscription.Protocol, TopicArn: configSubscription.TopicArn, Raw: configSubscription.Raw} + subArn, _ := common.NewUUID() + subArn = configSubscription.TopicArn + ":" + subArn + newSub.SubscriptionArn = subArn + return newSub +} + +func createSqsSubscription(configSubscription app.EnvSubsciption, topicArn string) *app.Subscription { + queueUrl := "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + + "/" + app.CurrentEnvironment.AccountID + "/" + configSubscription.QueueName + if app.CurrentEnvironment.Region != "" { + queueUrl = "http://sqs." + app.CurrentEnvironment.Region + "." + app.CurrentEnvironment.Host + ":" + + app.CurrentEnvironment.Port + "/" + app.CurrentEnvironment.AccountID + "/" + configSubscription.QueueName + } + queueArn := "arn:aws:sqs:" + app.CurrentEnvironment.Region + ":" + app.CurrentEnvironment.AccountID + ":" + configSubscription.QueueName + app.SyncQueues.Queues[configSubscription.QueueName] = &app.Queue{ + Name: configSubscription.QueueName, + TimeoutSecs: app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout, + Arn: queueArn, + URL: queueUrl, + ReceiveWaitTimeSecs: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds, + IsFIFO: app.HasFIFOQueueName(configSubscription.QueueName), + } + qArn := app.SyncQueues.Queues[configSubscription.QueueName].Arn + newSub := &app.Subscription{EndPoint: qArn, Protocol: "sqs", TopicArn: topicArn, Raw: configSubscription.Raw} + subArn, _ := common.NewUUID() + subArn = topicArn + ":" + subArn + newSub.SubscriptionArn = subArn + return newSub +} diff --git a/app/conf/config_test.go b/app/conf/config_test.go index 4f11686f8..eec8f7d0e 100644 --- a/app/conf/config_test.go +++ b/app/conf/config_test.go @@ -97,9 +97,4 @@ func TestConfig_NoQueueAttributeDefaults(t *testing.T) { if receiveWaitTime != 20 { t.Errorf("Expected local-queue2 Queue to be configured with ReceiveMessageWaitTimeSeconds: 20 but got %d\n", receiveWaitTime) } - - filterPolicy := app.SyncTopics.Topics["local-topic1"].Subscriptions[1].FilterPolicy - if (*filterPolicy)["foo"][0] != "bar" { - t.Errorf("Expected FilterPolicy subscription on local-topic1 to be: bar but got %s\n", (*filterPolicy)["foo"][0]) - } } diff --git a/app/conf/goaws.yaml b/app/conf/goaws.yaml index 05fdcc860..29b1f1164 100644 --- a/app/conf/goaws.yaml +++ b/app/conf/goaws.yaml @@ -1,11 +1,11 @@ Local: # Environment name that can be passed on the command line - # (i.e.: ./goaws [Local | Dev] -- defaults to 'Local') + # (i.e.: ./goaws [Local | Dev] -- defaults to 'Local') Host: goaws.com # hostname of the goaws system (for docker-compose this is the tag name of the container) -# you can now use either 1 port for both sns and sqs or alternatively you can comment out Port and use SqsPort + SnsPort for compatibilyt with -# yopa and (fage-sns + face-sqs). If both ways are in the config file on the one "Port" will be used by GoAws + # you can now use either 1 port for both sns and sqs or alternatively you can comment out Port and use SqsPort + SnsPort for compatibilyt with + # yopa and (fage-sns + face-sqs). If both ways are in the config file on the one "Port" will be used by GoAws Port: 4100 # port to listen on. -# SqsPort: 9324 # alterante Sqs Port -# SnsPort: 9292 # alternate Sns Port + # SqsPort: 9324 # alterante Sqs Port + # SnsPort: 9292 # alternate Sns Port Region: us-east-1 AccountId: "100010001000" LogMessages: true # Log messages (true/false) @@ -26,12 +26,19 @@ Local: # Environment name that can be passed on the Raw: true # Raw message delivery (true/false) #FilterPolicy: '{"foo": ["bar"]}' # Subscription's FilterPolicy, json object as a string - Name: local-topic2 # Topic name - no Subscriptions + - Name: my_topic # Topic name - http subscription + Subscriptions: + - Protocol: https + EndPoint: https://enkrogwitfcgi.x.pipedream.net + TopicArn: arn:aws:sns:us-east-1:000000000000:my_topic + FilterPolicy: '{"event": ["my_event"]}' + Raw: true Dev: # Another environment Host: localhost Port: 4100 -# SqsPort: 9324 -# SnsPort: 9292 + # SqsPort: 9324 + # SnsPort: 9292 AccountId: "794373491471" LogMessages: true LogFile: ./goaws_messages.log diff --git a/app/conf/mock-data/mock-config.yaml b/app/conf/mock-data/mock-config.yaml index 6d66b6a60..fd54c7147 100644 --- a/app/conf/mock-data/mock-config.yaml +++ b/app/conf/mock-data/mock-config.yaml @@ -1,5 +1,5 @@ Local: # Environment name that can be passed on the command line - # (i.e.: ./goaws [Local | Dev] -- defaults to 'Local') + # (i.e.: ./goaws [Local | Dev] -- defaults to 'Local') Host: localhost # hostname of the goaws system (for docker-compose this is the tag name of the container) Port: 4100 # port to listen on. Region: us-east-1 @@ -24,7 +24,7 @@ Local: # Environment name that can be passed on the FilterPolicy: '{"foo":["bar"]}' # Subscription's FilterPolicy, json like a string - Name: local-topic2 # Topic name - no Subscriptions -NoQueuesOrTopics: # Another environment +NoQueuesOrTopics: # Another environment Host: localhost Port: 4100 LogMessages: true @@ -40,4 +40,4 @@ NoQueueAttributeDefaults: Queues: - Name: local-queue1 - Name: local-queue2 - ReceiveMessageWaitTimeSeconds: 20 \ No newline at end of file + ReceiveMessageWaitTimeSeconds: 20 diff --git a/app/gosns/gosns.go b/app/gosns/gosns.go index b47a85ef9..2ff4f31a2 100644 --- a/app/gosns/gosns.go +++ b/app/gosns/gosns.go @@ -202,7 +202,7 @@ func Subscribe(w http.ResponseWriter, req *http.Request) { } else { snsMSG.Signature = signature } - err = callEndpoint(subscription.EndPoint, uuid, *snsMSG) + err = callEndpoint(subscription.EndPoint, uuid, *snsMSG, subscription.Raw) if err != nil { log.Error("Error posting to url ", err) } @@ -563,7 +563,7 @@ func publishHTTP(subs *app.Subscription, messageBody string, messageAttributes m } else { msg.Signature = signature } - err = callEndpoint(subs.EndPoint, subs.SubscriptionArn, msg) + err = callEndpoint(subs.EndPoint, subs.SubscriptionArn, msg, subs.Raw) if err != nil { log.WithFields(log.Fields{ "EndPoint": subs.EndPoint, @@ -584,13 +584,20 @@ func formatAttributes(values map[string]app.MessageAttributeValue) map[string]ap return attr } -func callEndpoint(endpoint string, subArn string, msg app.SNSMessage) error { +func callEndpoint(endpoint string, subArn string, msg app.SNSMessage, raw bool) error { log.WithFields(log.Fields{ "sns": msg, "subArn": subArn, "endpoint": endpoint, }).Debug("Calling endpoint") - byteData, err := json.Marshal(msg) + var err error + var byteData []byte + + if raw { + byteData, err = json.Marshal(msg.Message) + } else { + byteData, err = json.Marshal(msg) + } if err != nil { return err } diff --git a/app/sns_test.go b/app/sns_test.go index 00b031448..095cfb258 100644 --- a/app/sns_test.go +++ b/app/sns_test.go @@ -12,23 +12,23 @@ func TestFilterPolicy_IsSatisfiedBy(t *testing.T) { }{ { &FilterPolicy{"foo": {"bar"}}, - map[string]MessageAttributeValue{"foo": MessageAttributeValue{DataType: "String", Value: "bar"}}, + map[string]MessageAttributeValue{"foo": {DataType: "String", Value: "bar"}}, true, }, { &FilterPolicy{"foo": {"bar", "xyz"}}, - map[string]MessageAttributeValue{"foo": MessageAttributeValue{DataType: "String", Value: "xyz"}}, + map[string]MessageAttributeValue{"foo": {DataType: "String", Value: "xyz"}}, true, }, { &FilterPolicy{"foo": {"bar", "xyz"}, "abc": {"def"}}, - map[string]MessageAttributeValue{"foo": MessageAttributeValue{DataType: "String", Value: "xyz"}, - "abc": MessageAttributeValue{DataType: "String", Value: "def"}}, + map[string]MessageAttributeValue{"foo": {DataType: "String", Value: "xyz"}, + "abc": {DataType: "String", Value: "def"}}, true, }, { &FilterPolicy{"foo": {"bar"}}, - map[string]MessageAttributeValue{"foo": MessageAttributeValue{DataType: "String", Value: "baz"}}, + map[string]MessageAttributeValue{"foo": {DataType: "String", Value: "baz"}}, false, }, { @@ -38,12 +38,12 @@ func TestFilterPolicy_IsSatisfiedBy(t *testing.T) { }, { &FilterPolicy{"foo": {"bar"}, "abc": {"def"}}, - map[string]MessageAttributeValue{"foo": MessageAttributeValue{DataType: "String", Value: "bar"}}, + map[string]MessageAttributeValue{"foo": {DataType: "String", Value: "bar"}}, false, }, { &FilterPolicy{"foo": {"bar"}}, - map[string]MessageAttributeValue{"foo": MessageAttributeValue{DataType: "Binary", Value: "bar"}}, + map[string]MessageAttributeValue{"foo": {DataType: "Binary", Value: "bar"}}, false, }, } diff --git a/go.mod b/go.mod index 2c7f6ea36..f43dd25b4 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 // indirect github.com/stretchr/testify v1.1.4 golang.org/x/net v0.0.0-20190930134127-c5a3c61f89f3 // indirect + golang.org/x/tools v0.0.0-20190328211700-ab21143f2384 gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect diff --git a/go.sum b/go.sum index 135bc71d7..a351e0535 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384 h1:TFlARGu6Czu1z7q93HTxcP1P+/ZFC/IKythI5RzrnRg= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=