Skip to content

Commit

Permalink
Merge branch 'master' into feature/allow-redrive-policy-at-cfg
Browse files Browse the repository at this point in the history
  • Loading branch information
p4tin authored May 15, 2021
2 parents cccd057 + 0bee4ed commit 8ed2fac
Show file tree
Hide file tree
Showing 11 changed files with 429 additions and 20 deletions.
27 changes: 13 additions & 14 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
#FROM golang:alpine as builder
#
#WORKDIR /go/src/github.com/p4tin/goaws
#
#RUN apk add --update --repository https://dl-3.alpinelinux.org/alpine/edge/testing/ git
#RUN go get github.com/golang/dep/cmd/dep
#
#COPY Gopkg.lock Gopkg.toml app ./
#RUN dep ensure
#COPY . .
#
#RUN go build -o goaws_linux_amd64 app/cmd/goaws.goc
# build image
FROM golang:alpine as build

WORKDIR /go/src/github.com/p4tin/goaws

COPY . .
RUN CGO_ENABLED=0 go test ./app/...
RUN go build -o goaws app/cmd/goaws.go

# release image
FROM alpine

EXPOSE 4100
COPY --from=build /go/src/github.com/p4tin/goaws/goaws /goaws

COPY goaws /
COPY app/conf/goaws.yaml /conf/

EXPOSE 4100
ENTRYPOINT ["/goaws"]
3 changes: 3 additions & 0 deletions app/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ type EnvQueue struct {
Name string
ReceiveMessageWaitTimeSeconds int
RedrivePolicy string
MaximumMessageSize int
}

type EnvQueueAttributes struct {
VisibilityTimeout int
ReceiveMessageWaitTimeSeconds int
MaximumMessageSize int
}

type Environment struct {
Expand All @@ -35,6 +37,7 @@ type Environment struct {
AccountID string
LogToFile bool
LogFile string
EnableDuplicates bool
Topics []EnvTopic
Queues []EnvQueue
QueueAttributeDefaults EnvQueueAttributes
Expand Down
14 changes: 14 additions & 0 deletions app/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

log "github.com/sirupsen/logrus"

Expand Down Expand Up @@ -65,6 +66,10 @@ func LoadYamlConfig(filename string, env string) []string {
app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout = 30
}

if app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize == 0 {
app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize = 262144 // 256K
}

if app.CurrentEnvironment.AccountID == "" {
app.CurrentEnvironment.AccountID = "queue"
}
Expand All @@ -88,14 +93,20 @@ func LoadYamlConfig(filename string, env string) []string {
if queue.ReceiveMessageWaitTimeSeconds == 0 {
queue.ReceiveMessageWaitTimeSeconds = app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds
}
if queue.MaximumMessageSize == 0 {
queue.MaximumMessageSize = app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize
}

app.SyncQueues.Queues[queue.Name] = &app.Queue{
Name: queue.Name,
TimeoutSecs: app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout,
Arn: queueArn,
URL: queueUrl,
ReceiveWaitTimeSecs: queue.ReceiveMessageWaitTimeSeconds,
MaximumMessageSize: queue.MaximumMessageSize,
IsFIFO: app.HasFIFOQueueName(queue.Name),
EnableDuplicates: app.CurrentEnvironment.EnableDuplicates,
Duplicates: make(map[string]time.Time),
}
}

Expand Down Expand Up @@ -170,7 +181,10 @@ func createSqsSubscription(configSubscription app.EnvSubsciption, topicArn strin
Arn: queueArn,
URL: queueUrl,
ReceiveWaitTimeSecs: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds,
MaximumMessageSize: app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize,
IsFIFO: app.HasFIFOQueueName(configSubscription.QueueName),
EnableDuplicates: app.CurrentEnvironment.EnableDuplicates,
Duplicates: make(map[string]time.Time),
}
}
qArn := app.SyncQueues.Queues[configSubscription.QueueName].Arn
Expand Down
8 changes: 8 additions & 0 deletions app/conf/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func TestConfig_QueueAttributes(t *testing.T) {
if timeoutSecs != 10 {
t.Errorf("Expected local-queue1 Queue to be configured with VisibilityTimeout: 10 but got %d\n", timeoutSecs)
}
maximumMessageSize := app.SyncQueues.Queues["local-queue1"].MaximumMessageSize
if maximumMessageSize != 1024 {
t.Errorf("Expected local-queue1 Queue to be configured with MaximumMessageSize: 1024 but got %d\n", maximumMessageSize)
}

if app.SyncQueues.Queues["local-queue1"].DeadLetterQueue != nil {
t.Errorf("Expected local-queue1 Queue to be configured without redrive policy\n")
Expand All @@ -92,6 +96,10 @@ func TestConfig_QueueAttributes(t *testing.T) {
if dlq.Name != "local-queue3-dlq" {
t.Errorf("Expected local-queue3 to have dead letter queue local-queue3-dlq but got %s\n", dlq.Name)
}
maximumMessageSize = app.SyncQueues.Queues["local-queue2"].MaximumMessageSize
if maximumMessageSize != 128 {
t.Errorf("Expected local-queue1 Queue to be configured with MaximumMessageSize: 128 but got %d\n", maximumMessageSize)
}
}

func TestConfig_NoQueueAttributeDefaults(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions app/conf/goaws.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ Local: # Environment name that can be passed on the
AccountId: "100010001000"
LogToFile: false # Log messages (true/false)
LogFile: .st/goaws_messages.log # Log filename (for message logging
EnableDuplicates: false # Enable or not deduplication based on messageDeduplicationId
QueueAttributeDefaults: # default attributes for all queues
VisibilityTimeout: 30 # message visibility timeout
ReceiveMessageWaitTimeSeconds: 0 # receive message max wait time
MaximumMessageSize: 262144 # maximum message size (bytes)
Queues: # List of queues to create at startup
- Name: local-queue1 # Queue name
- Name: local-queue2 # Queue name
Expand Down
2 changes: 2 additions & 0 deletions app/conf/mock-data/mock-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ Local: # Environment name that can be passed on the
QueueAttributeDefaults: # default attributes for all queues
VisibilityTimeout: 10 # message visibility timeout
ReceiveMessageWaitTimeSeconds: 10 # receive message max wait time
MaximumMessageSize: 1024 # maximum message size (bytes)
Queues: # List of queues to create at startup
- Name: local-queue1 # Queue name
- Name: local-queue2 # Queue name
ReceiveMessageWaitTimeSeconds: 20 # Queue receive message max wait time
MaximumMessageSize: 128 # Queue maximum message size (bytes)
- Name: local-queue3 # Queue name
RedrivePolicy: '{"maxReceiveCount": 100, "deadLetterTargetArn":"arn:aws:sqs:us-east-1:000000000000:local-queue3-dlq"}'
- Name: local-queue3-dlq # Queue name
Expand Down
7 changes: 6 additions & 1 deletion app/gosns/gosns.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,12 @@ func publishSQS(w http.ResponseWriter, req *http.Request,
} else {
msg.MessageAttributes = messageAttributes
msg.MD5OfMessageAttributes = common.HashAttributes(messageAttributes)
msg.MessageBody = []byte(messageBody)
m, err := extractMessageFromJSON(messageBody, subs.Protocol)
if (err == nil) {
msg.MessageBody = []byte(m)
} else {
msg.MessageBody = []byte(messageBody)
}
}

msg.MD5OfMessageBody = common.GetMD5Hash(messageBody)
Expand Down
56 changes: 51 additions & 5 deletions app/gosqs/gosqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"time"

log "github.com/sirupsen/logrus"

"github.com/gorilla/mux"
"github.com/p4tin/goaws/app"
"github.com/p4tin/goaws/app/common"
Expand Down Expand Up @@ -38,6 +38,8 @@ func init() {
app.SqsErrors["InvalidVisibilityTimeout"] = err8
err9 := app.SqsErrorType{HttpError: http.StatusBadRequest, Type: "MessageNotInFlight", Code: "AWS.SimpleQueueService.MessageNotInFlight", Message: "The message referred to isn't in flight."}
app.SqsErrors["MessageNotInFlight"] = err9
err10 := app.SqsErrorType{HttpError: http.StatusBadRequest, Type: "MessageTooBig", Code: "InvalidMessageContents", Message: "The message size exceeds the limit."}
app.SqsErrors["MessageTooBig"] = err10
app.SqsErrors[ErrInvalidParameterValue.Type] = *ErrInvalidParameterValue
app.SqsErrors[ErrInvalidAttributeValue.Type] = *ErrInvalidAttributeValue
}
Expand All @@ -54,6 +56,15 @@ func PeriodicTasks(d time.Duration, quit <-chan struct{}) {
log.Debugf("Queue [%s] length [%d]", queue.Name, len(queue.Messages))
for i := 0; i < len(queue.Messages); i++ {
msg := &queue.Messages[i]

// Reset deduplication period
for dedupId, startTime := range queue.Duplicates {
if time.Now().After(startTime.Add(app.DeduplicationPeriod)) {
log.Debugf("deduplication period for message with deduplicationId [%s] expired", dedupId)
delete(queue.Duplicates, dedupId)
}
}

if msg.ReceiptHandle != "" {
if msg.VisibilityTimeout.Before(time.Now()) {
log.Debugf("Making message visible again %s", msg.ReceiptHandle)
Expand Down Expand Up @@ -123,7 +134,10 @@ func CreateQueue(w http.ResponseWriter, req *http.Request) {
Arn: queueArn,
TimeoutSecs: app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout,
ReceiveWaitTimeSecs: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds,
MaximumMessageSize: app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize,
IsFIFO: app.HasFIFOQueueName(queueName),
EnableDuplicates: app.CurrentEnvironment.EnableDuplicates,
Duplicates: make(map[string]time.Time),
}
if err := validateAndSetQueueAttributes(queue, req.Form); err != nil {
createErrorResponse(w, req, err.Error())
Expand All @@ -146,6 +160,7 @@ func SendMessage(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "application/xml")
messageBody := req.FormValue("MessageBody")
messageGroupID := req.FormValue("MessageGroupId")
messageDeduplicationID := req.FormValue("MessageDeduplicationId")
messageAttributes := extractMessageAttributes(req, "")

queueUrl := getQueueFromPath(req.FormValue("QueueUrl"), req.URL.String())
Expand All @@ -165,6 +180,13 @@ func SendMessage(w http.ResponseWriter, req *http.Request) {
return
}

if (app.SyncQueues.Queues[queueName].MaximumMessageSize > 0 &&
len(messageBody) > app.SyncQueues.Queues[queueName].MaximumMessageSize) {
// Message size is too big
createErrorResponse(w, req, "MessageTooBig")
return
}

log.Println("Putting Message in Queue:", queueName)
msg := app.Message{MessageBody: []byte(messageBody)}
if len(messageAttributes) > 0 {
Expand All @@ -174,14 +196,22 @@ func SendMessage(w http.ResponseWriter, req *http.Request) {
msg.MD5OfMessageBody = common.GetMD5Hash(messageBody)
msg.Uuid, _ = common.NewUUID()
msg.GroupID = messageGroupID
msg.DeduplicationID = messageDeduplicationID
msg.SentTime = time.Now()

app.SyncQueues.Lock()
fifoSeqNumber := ""
if app.SyncQueues.Queues[queueName].IsFIFO {
fifoSeqNumber = app.SyncQueues.Queues[queueName].NextSequenceNumber(messageGroupID)
}
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg)

if !app.SyncQueues.Queues[queueName].IsDuplicate(messageDeduplicationID) {
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg)
} else {
log.Debugf("Message with deduplicationId [%s] in queue [%s] is duplicate ", messageDeduplicationID, queueName)
}

app.SyncQueues.Queues[queueName].InitDuplicatation(messageDeduplicationID)
app.SyncQueues.Unlock()
log.Infof("%s: Queue: %s, Message: %s\n", time.Now().Format("2006-01-02 15:04:05"), queueName, msg.MessageBody)

Expand Down Expand Up @@ -266,8 +296,12 @@ func SendMessageBatch(w http.ResponseWriter, req *http.Request) {
sendEntries[keyIndex-1].MessageGroupId = v[0]
}

if keySegments[2] == "MessageAttribute" {
sendEntries[keyIndex-1].MessageAttributes = extractMessageAttributes(req, strings.Join(keySegments[0:2], "."))
if keySegments[2] == "MessageGroupId" {
sendEntries[keyIndex-1].MessageGroupId = v[0]
}

if keySegments[2] == "MessageDeduplicationId" {
sendEntries[keyIndex-1].MessageDeduplicationId = v[0]
}
}
}
Expand Down Expand Up @@ -300,14 +334,23 @@ func SendMessageBatch(w http.ResponseWriter, req *http.Request) {
}
msg.MD5OfMessageBody = common.GetMD5Hash(sendEntry.MessageBody)
msg.GroupID = sendEntry.MessageGroupId
msg.DeduplicationID = sendEntry.MessageDeduplicationId
msg.Uuid, _ = common.NewUUID()
msg.SentTime = time.Now()
app.SyncQueues.Lock()
fifoSeqNumber := ""
if app.SyncQueues.Queues[queueName].IsFIFO {
fifoSeqNumber = app.SyncQueues.Queues[queueName].NextSequenceNumber(sendEntry.MessageGroupId)
}
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg)

if !app.SyncQueues.Queues[queueName].IsDuplicate(sendEntry.MessageDeduplicationId) {
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg)
} else {
log.Debugf("Message with deduplicationId [%s] in queue [%s] is duplicate ", sendEntry.MessageDeduplicationId, queueName)
}

app.SyncQueues.Queues[queueName].InitDuplicatation(sendEntry.MessageDeduplicationId)

app.SyncQueues.Unlock()
se := app.SendMessageBatchResultEntry{
Id: sendEntry.Id,
Expand Down Expand Up @@ -602,6 +645,7 @@ func DeleteMessageBatch(w http.ResponseWriter, req *http.Request) {
log.Printf("FIFO Queue %s unlocking group %s:", queueName, msg.GroupID)
app.SyncQueues.Queues[queueName].UnlockGroup(msg.GroupID)
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages[:i], app.SyncQueues.Queues[queueName].Messages[i+1:]...)
delete(app.SyncQueues.Queues[queueName].Duplicates, msg.DeduplicationID)

deleteEntry.Deleted = true
deletedEntry := app.DeleteMessageBatchResultEntry{Id: deleteEntry.Id}
Expand Down Expand Up @@ -666,6 +710,7 @@ func DeleteMessage(w http.ResponseWriter, req *http.Request) {
app.SyncQueues.Queues[queueName].UnlockGroup(msg.GroupID)
//Delete message from Q
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages[:i], app.SyncQueues.Queues[queueName].Messages[i+1:]...)
delete(app.SyncQueues.Queues[queueName].Duplicates, msg.DeduplicationID)

app.SyncQueues.Unlock()
// Create, encode/xml and send response
Expand Down Expand Up @@ -731,6 +776,7 @@ func PurgeQueue(w http.ResponseWriter, req *http.Request) {
app.SyncQueues.Lock()
if _, ok := app.SyncQueues.Queues[queueName]; ok {
app.SyncQueues.Queues[queueName].Messages = nil
app.SyncQueues.Queues[queueName].Duplicates = make(map[string]time.Time)
respStruct := app.PurgeQueueResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.ResponseMetadata{RequestId: "00000000-0000-0000-0000-000000000000"}}
enc := xml.NewEncoder(w)
enc.Indent(" ", " ")
Expand Down
Loading

0 comments on commit 8ed2fac

Please sign in to comment.