Skip to content

Commit

Permalink
#208 Fixed creating subscriptions on startup to exsiting queues
Browse files Browse the repository at this point in the history
  • Loading branch information
p4tin committed Oct 12, 2019
1 parent 44f52f0 commit b6cccf8
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 32 deletions.
63 changes: 31 additions & 32 deletions app/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,27 +104,24 @@ func LoadYamlConfig(filename string, env string) []string {
newTopic.Subscriptions = make([]*app.Subscription, 0, 0)

for _, subs := range topic.Subscriptions {
if _, ok := app.SyncQueues.Queues[subs.QueueName]; !ok {
var newSub *app.Subscription
if strings.Contains(subs.Protocol, "http") {
newSub = createHttpSubscription(subs)
} else {
//Queue does not exist yet, create it.
newSub = createSqsSubscription(subs, topicArn)
}
if subs.FilterPolicy != "" {
filterPolicy := &app.FilterPolicy{}
err = json.Unmarshal([]byte(subs.FilterPolicy), filterPolicy)
if err != nil {
log.Errorf("err: %s", err)
return ports
}
newSub.FilterPolicy = filterPolicy
var newSub *app.Subscription
if strings.Contains(subs.Protocol, "http") {
newSub = createHttpSubscription(subs)
} else {
//Queue does not exist yet, create it.
newSub = createSqsSubscription(subs, topicArn)
}
if subs.FilterPolicy != "" {
filterPolicy := &app.FilterPolicy{}
err = json.Unmarshal([]byte(subs.FilterPolicy), filterPolicy)
if err != nil {
log.Errorf("err: %s", err)
return ports
}

newTopic.Subscriptions = append(newTopic.Subscriptions, newSub)
newSub.FilterPolicy = filterPolicy
}

newTopic.Subscriptions = append(newTopic.Subscriptions, newSub)
}
app.SyncTopics.Topics[topic.Name] = newTopic
}
Expand All @@ -144,20 +141,22 @@ func createHttpSubscription(configSubscription app.EnvSubsciption) *app.Subscrip
}

func createSqsSubscription(configSubscription app.EnvSubsciption, topicArn string) *app.Subscription {
queueUrl := "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port +
"/" + app.CurrentEnvironment.AccountID + "/" + configSubscription.QueueName
if app.CurrentEnvironment.Region != "" {
queueUrl = "http://" + app.CurrentEnvironment.Region + "." + app.CurrentEnvironment.Host + ":" +
app.CurrentEnvironment.Port + "/" + app.CurrentEnvironment.AccountID + "/" + configSubscription.QueueName
}
queueArn := "arn:aws:sqs:" + app.CurrentEnvironment.Region + ":" + app.CurrentEnvironment.AccountID + ":" + configSubscription.QueueName
app.SyncQueues.Queues[configSubscription.QueueName] = &app.Queue{
Name: configSubscription.QueueName,
TimeoutSecs: app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout,
Arn: queueArn,
URL: queueUrl,
ReceiveWaitTimeSecs: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds,
IsFIFO: app.HasFIFOQueueName(configSubscription.QueueName),
if _, ok := app.SyncQueues.Queues[configSubscription.QueueName]; !ok {
queueUrl := "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port +
"/" + app.CurrentEnvironment.AccountID + "/" + configSubscription.QueueName
if app.CurrentEnvironment.Region != "" {
queueUrl = "http://" + app.CurrentEnvironment.Region + "." + app.CurrentEnvironment.Host + ":" +
app.CurrentEnvironment.Port + "/" + app.CurrentEnvironment.AccountID + "/" + configSubscription.QueueName
}
queueArn := "arn:aws:sqs:" + app.CurrentEnvironment.Region + ":" + app.CurrentEnvironment.AccountID + ":" + configSubscription.QueueName
app.SyncQueues.Queues[configSubscription.QueueName] = &app.Queue{
Name: configSubscription.QueueName,
TimeoutSecs: app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout,
Arn: queueArn,
URL: queueUrl,
ReceiveWaitTimeSecs: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds,
IsFIFO: app.HasFIFOQueueName(configSubscription.QueueName),
}
}
qArn := app.SyncQueues.Queues[configSubscription.QueueName].Arn
newSub := &app.Subscription{EndPoint: qArn, Protocol: "sqs", TopicArn: topicArn, Raw: configSubscription.Raw}
Expand Down
4 changes: 4 additions & 0 deletions app/conf/goaws.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ Local: # Environment name that can be passed on the
- QueueName: local-queue4 # Queue name
Raw: true # Raw message delivery (true/false)
#FilterPolicy: '{"foo": ["bar"]}' # Subscription's FilterPolicy, json object as a string
- Name: sub-topic # Topic name - with some Subscriptions
Subscriptions: # List of Subscriptions to create for this topic (queues will be created as required)
- QueueName: local-queue1 # Queue name
Raw: true # Raw message delivery (true/false)
- Name: local-topic2 # Topic name - no Subscriptions
- Name: my_topic # Topic name - http subscription
Subscriptions:
Expand Down

0 comments on commit b6cccf8

Please sign in to comment.