Skip to content

Commit

Permalink
Merge branch 'master' into feature/maximum-message-size
Browse files Browse the repository at this point in the history
  • Loading branch information
p4tin authored May 15, 2021
2 parents 46a9697 + 3993d6e commit 4792680
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 4 deletions.
1 change: 1 addition & 0 deletions app/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Environment struct {
AccountID string
LogToFile bool
LogFile string
EnableDuplicates bool
Topics []EnvTopic
Queues []EnvQueue
QueueAttributeDefaults EnvQueueAttributes
Expand Down
5 changes: 5 additions & 0 deletions app/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io/ioutil"
"path/filepath"
"strings"
"time"

log "github.com/sirupsen/logrus"

Expand Down Expand Up @@ -102,6 +103,8 @@ func LoadYamlConfig(filename string, env string) []string {
ReceiveWaitTimeSecs: queue.ReceiveMessageWaitTimeSeconds,
MaximumMessageSize: queue.MaximumMessageSize,
IsFIFO: app.HasFIFOQueueName(queue.Name),
EnableDuplicates: app.CurrentEnvironment.EnableDuplicates,
Duplicates: make(map[string]time.Time),
}
}

Expand Down Expand Up @@ -165,6 +168,8 @@ func createSqsSubscription(configSubscription app.EnvSubsciption, topicArn strin
ReceiveWaitTimeSecs: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds,
MaximumMessageSize: app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize,
IsFIFO: app.HasFIFOQueueName(configSubscription.QueueName),
EnableDuplicates: app.CurrentEnvironment.EnableDuplicates,
Duplicates: make(map[string]time.Time),
}
}
qArn := app.SyncQueues.Queues[configSubscription.QueueName].Arn
Expand Down
1 change: 1 addition & 0 deletions app/conf/goaws.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Local: # Environment name that can be passed on the
AccountId: "100010001000"
LogToFile: false # Log messages (true/false)
LogFile: .st/goaws_messages.log # Log filename (for message logging
EnableDuplicates: false # Enable or not deduplication based on messageDeduplicationId
QueueAttributeDefaults: # default attributes for all queues
VisibilityTimeout: 30 # message visibility timeout
ReceiveMessageWaitTimeSeconds: 0 # receive message max wait time
Expand Down
44 changes: 40 additions & 4 deletions app/gosqs/gosqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ func PeriodicTasks(d time.Duration, quit <-chan struct{}) {
log.Debugf("Queue [%s] length [%d]", queue.Name, len(queue.Messages))
for i := 0; i < len(queue.Messages); i++ {
msg := &queue.Messages[i]

// Reset deduplication period
for dedupId, startTime := range queue.Duplicates {
if time.Now().After(startTime.Add(app.DeduplicationPeriod)) {
log.Debugf("deduplication period for message with deduplicationId [%s] expired", dedupId)
delete(queue.Duplicates, dedupId)
}
}

if msg.ReceiptHandle != "" {
if msg.VisibilityTimeout.Before(time.Now()) {
log.Debugf("Making message visible again %s", msg.ReceiptHandle)
Expand Down Expand Up @@ -127,6 +136,8 @@ func CreateQueue(w http.ResponseWriter, req *http.Request) {
ReceiveWaitTimeSecs: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds,
MaximumMessageSize: app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize,
IsFIFO: app.HasFIFOQueueName(queueName),
EnableDuplicates: app.CurrentEnvironment.EnableDuplicates,
Duplicates: make(map[string]time.Time),
}
if err := validateAndSetQueueAttributes(queue, req.Form); err != nil {
createErrorResponse(w, req, err.Error())
Expand All @@ -149,6 +160,7 @@ func SendMessage(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "application/xml")
messageBody := req.FormValue("MessageBody")
messageGroupID := req.FormValue("MessageGroupId")
messageDeduplicationID := req.FormValue("MessageDeduplicationId")
messageAttributes := extractMessageAttributes(req, "")

queueUrl := getQueueFromPath(req.FormValue("QueueUrl"), req.URL.String())
Expand Down Expand Up @@ -184,14 +196,22 @@ func SendMessage(w http.ResponseWriter, req *http.Request) {
msg.MD5OfMessageBody = common.GetMD5Hash(messageBody)
msg.Uuid, _ = common.NewUUID()
msg.GroupID = messageGroupID
msg.DeduplicationID = messageDeduplicationID
msg.SentTime = time.Now()

app.SyncQueues.Lock()
fifoSeqNumber := ""
if app.SyncQueues.Queues[queueName].IsFIFO {
fifoSeqNumber = app.SyncQueues.Queues[queueName].NextSequenceNumber(messageGroupID)
}
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg)

if !app.SyncQueues.Queues[queueName].IsDuplicate(messageDeduplicationID) {
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg)
} else {
log.Debugf("Message with deduplicationId [%s] in queue [%s] is duplicate ", messageDeduplicationID, queueName)
}

app.SyncQueues.Queues[queueName].InitDuplicatation(messageDeduplicationID)
app.SyncQueues.Unlock()
log.Infof("%s: Queue: %s, Message: %s\n", time.Now().Format("2006-01-02 15:04:05"), queueName, msg.MessageBody)

Expand Down Expand Up @@ -276,8 +296,12 @@ func SendMessageBatch(w http.ResponseWriter, req *http.Request) {
sendEntries[keyIndex-1].MessageGroupId = v[0]
}

if keySegments[2] == "MessageAttribute" {
sendEntries[keyIndex-1].MessageAttributes = extractMessageAttributes(req, strings.Join(keySegments[0:2], "."))
if keySegments[2] == "MessageGroupId" {
sendEntries[keyIndex-1].MessageGroupId = v[0]
}

if keySegments[2] == "MessageDeduplicationId" {
sendEntries[keyIndex-1].MessageDeduplicationId = v[0]
}
}
}
Expand Down Expand Up @@ -310,14 +334,23 @@ func SendMessageBatch(w http.ResponseWriter, req *http.Request) {
}
msg.MD5OfMessageBody = common.GetMD5Hash(sendEntry.MessageBody)
msg.GroupID = sendEntry.MessageGroupId
msg.DeduplicationID = sendEntry.MessageDeduplicationId
msg.Uuid, _ = common.NewUUID()
msg.SentTime = time.Now()
app.SyncQueues.Lock()
fifoSeqNumber := ""
if app.SyncQueues.Queues[queueName].IsFIFO {
fifoSeqNumber = app.SyncQueues.Queues[queueName].NextSequenceNumber(sendEntry.MessageGroupId)
}
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg)

if !app.SyncQueues.Queues[queueName].IsDuplicate(sendEntry.MessageDeduplicationId) {
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg)
} else {
log.Debugf("Message with deduplicationId [%s] in queue [%s] is duplicate ", sendEntry.MessageDeduplicationId, queueName)
}

app.SyncQueues.Queues[queueName].InitDuplicatation(sendEntry.MessageDeduplicationId)

app.SyncQueues.Unlock()
se := app.SendMessageBatchResultEntry{
Id: sendEntry.Id,
Expand Down Expand Up @@ -612,6 +645,7 @@ func DeleteMessageBatch(w http.ResponseWriter, req *http.Request) {
log.Printf("FIFO Queue %s unlocking group %s:", queueName, msg.GroupID)
app.SyncQueues.Queues[queueName].UnlockGroup(msg.GroupID)
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages[:i], app.SyncQueues.Queues[queueName].Messages[i+1:]...)
delete(app.SyncQueues.Queues[queueName].Duplicates, msg.DeduplicationID)

deleteEntry.Deleted = true
deletedEntry := app.DeleteMessageBatchResultEntry{Id: deleteEntry.Id}
Expand Down Expand Up @@ -676,6 +710,7 @@ func DeleteMessage(w http.ResponseWriter, req *http.Request) {
app.SyncQueues.Queues[queueName].UnlockGroup(msg.GroupID)
//Delete message from Q
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages[:i], app.SyncQueues.Queues[queueName].Messages[i+1:]...)
delete(app.SyncQueues.Queues[queueName].Duplicates, msg.DeduplicationID)

app.SyncQueues.Unlock()
// Create, encode/xml and send response
Expand Down Expand Up @@ -741,6 +776,7 @@ func PurgeQueue(w http.ResponseWriter, req *http.Request) {
app.SyncQueues.Lock()
if _, ok := app.SyncQueues.Queues[queueName]; ok {
app.SyncQueues.Queues[queueName].Messages = nil
app.SyncQueues.Queues[queueName].Duplicates = make(map[string]time.Time)
respStruct := app.PurgeQueueResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.ResponseMetadata{RequestId: "00000000-0000-0000-0000-000000000000"}}
enc := xml.NewEncoder(w)
enc.Indent(" ", " ")
Expand Down
Loading

0 comments on commit 4792680

Please sign in to comment.