Skip to content

Commit

Permalink
honour DelaySeconds queue attribute and SendMessage parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
jrhy authored and dhumphreys01 committed Mar 4, 2023
1 parent 4728078 commit 16b1323
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 20 deletions.
20 changes: 13 additions & 7 deletions app/gosqs/gosqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func CreateQueue(w http.ResponseWriter, req *http.Request) {

func SendMessage(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "application/xml")
req.ParseForm()
messageBody := req.FormValue("MessageBody")
messageGroupID := req.FormValue("MessageGroupId")
messageDeduplicationID := req.FormValue("MessageDeduplicationId")
Expand All @@ -180,13 +181,18 @@ func SendMessage(w http.ResponseWriter, req *http.Request) {
return
}

if (app.SyncQueues.Queues[queueName].MaximumMessageSize > 0 &&
len(messageBody) > app.SyncQueues.Queues[queueName].MaximumMessageSize) {
if app.SyncQueues.Queues[queueName].MaximumMessageSize > 0 &&
len(messageBody) > app.SyncQueues.Queues[queueName].MaximumMessageSize {
// Message size is too big
createErrorResponse(w, req, "MessageTooBig")
return
}

delaySecs := app.SyncQueues.Queues[queueName].DelaySecs
if mv := req.FormValue("DelaySeconds"); mv != "" {
delaySecs, _ = strconv.Atoi(mv)
}

log.Println("Putting Message in Queue:", queueName)
msg := app.Message{MessageBody: []byte(messageBody)}
if len(messageAttributes) > 0 {
Expand All @@ -198,6 +204,7 @@ func SendMessage(w http.ResponseWriter, req *http.Request) {
msg.GroupID = messageGroupID
msg.DeduplicationID = messageDeduplicationID
msg.SentTime = time.Now()
msg.DelaySecs = delaySecs

app.SyncQueues.Lock()
fifoSeqNumber := ""
Expand Down Expand Up @@ -272,7 +279,6 @@ func SendMessageBatch(w http.ResponseWriter, req *http.Request) {
return
}
keyIndex, err := strconv.Atoi(keySegments[1])

if err != nil {
createErrorResponse(w, req, "Error")
return
Expand Down Expand Up @@ -377,6 +383,7 @@ func SendMessageBatch(w http.ResponseWriter, req *http.Request) {

func ReceiveMessage(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "application/xml")
req.ParseForm()

waitTimeSeconds := 0
wts := req.FormValue("WaitTimeSeconds")
Expand Down Expand Up @@ -504,8 +511,8 @@ func ReceiveMessage(w http.ResponseWriter, req *http.Request) {

func numberOfHiddenMessagesInQueue(queue app.Queue) int {
num := 0
for i := range queue.Messages {
if queue.Messages[i].ReceiptHandle != "" {
for _, m := range queue.Messages {
if m.ReceiptHandle != "" || m.DelaySecs > 0 && time.Now().Before(m.SentTime.Add(time.Duration(m.DelaySecs)*time.Second)) {
num++
}
}
Expand Down Expand Up @@ -612,7 +619,6 @@ func DeleteMessageBatch(w http.ResponseWriter, req *http.Request) {
keySegments := strings.Split(k, ".")
if keySegments[0] == "DeleteMessageBatchRequestEntry" {
keyIndex, err := strconv.Atoi(keySegments[1])

if err != nil {
createErrorResponse(w, req, "Error")
return
Expand Down Expand Up @@ -836,7 +842,7 @@ func GetQueueAttributes(w http.ResponseWriter, req *http.Request) {
attribs := make([]app.Attribute, 0, 0)
attr := app.Attribute{Name: "VisibilityTimeout", Value: strconv.Itoa(queue.TimeoutSecs)}
attribs = append(attribs, attr)
attr = app.Attribute{Name: "DelaySeconds", Value: "0"}
attr = app.Attribute{Name: "DelaySeconds", Value: strconv.Itoa(queue.DelaySecs)}
attribs = append(attribs, attr)
attr = app.Attribute{Name: "ReceiveMessageWaitTimeSeconds", Value: strconv.Itoa(queue.ReceiveWaitTimeSecs)}
attribs = append(attribs, attr)
Expand Down
193 changes: 185 additions & 8 deletions app/gosqs/gosqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func TestListQueues_POST_Success(t *testing.T) {
t.Errorf("handler returned unexpected body: got %v",
rr.Body.String())
}

}

func TestCreateQueuehandler_POST_CreateQueue(t *testing.T) {
Expand Down Expand Up @@ -133,12 +132,12 @@ func TestCreateQueuehandler_POST_CreateQueue(t *testing.T) {
rr.Body.String(), expected)
}
expectedQueue := &app.Queue{
Name: queueName,
URL: "http://://" + queueName,
Arn: "arn:aws:sqs:::" + queueName,
TimeoutSecs: 60,
Name: queueName,
URL: "http://://" + queueName,
Arn: "arn:aws:sqs:::" + queueName,
TimeoutSecs: 60,
MaximumMessageSize: 2048,
Duplicates: make(map[string]time.Time),
Duplicates: make(map[string]time.Time),
}
actualQueue := app.SyncQueues.Queues[queueName]
if !reflect.DeepEqual(expectedQueue, actualQueue) {
Expand Down Expand Up @@ -186,7 +185,7 @@ func TestCreateFIFOQueuehandler_POST_CreateQueue(t *testing.T) {
Arn: "arn:aws:sqs:::" + queueName,
TimeoutSecs: 60,
IsFIFO: true,
Duplicates: make(map[string]time.Time),
Duplicates: make(map[string]time.Time),
}
actualQueue := app.SyncQueues.Queues[queueName]
if !reflect.DeepEqual(expectedQueue, actualQueue) {
Expand Down Expand Up @@ -1018,7 +1017,6 @@ func TestDeadLetterQueue(t *testing.T) {
if len(deadLetterQueue.Messages) == 0 {
t.Fatal("expected a message")
}

}

func TestReceiveMessageWaitTimeEnforced(t *testing.T) {
Expand Down Expand Up @@ -1122,6 +1120,7 @@ func TestReceiveMessageWaitTimeEnforced(t *testing.T) {
t.Fatal("handler waited when message was available, expected not to wait")
}
}

func TestReceiveMessage_CanceledByClient(t *testing.T) {
// create a queue
req, err := http.NewRequest("POST", "/", nil)
Expand Down Expand Up @@ -1313,7 +1312,96 @@ func TestReceiveMessage_WithConcurrentDeleteQueue(t *testing.T) {
if timedout := waitTimeout(&wg, 2*time.Second); timedout {
t.Errorf("concurrent handlers timeout, expecting both to return within timeout")
}
}

func TestReceiveMessageDelaySeconds(t *testing.T) {
// create a queue
req, err := http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}
form := url.Values{}
form.Add("Action", "CreateQueue")
form.Add("QueueName", "delay-seconds-queue")
form.Add("Attribute.1.Name", "DelaySeconds")
form.Add("Attribute.1.Value", "2")
form.Add("Version", "2012-11-05")
req.PostForm = form

rr := httptest.NewRecorder()
http.HandlerFunc(CreateQueue).ServeHTTP(rr, req)

if status := rr.Code; status != http.StatusOK {
t.Errorf("handler returned wrong status code: got \n%v want %v",
status, http.StatusOK)
}

// send a message
req, err = http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}
form = url.Values{}
form.Add("Action", "SendMessage")
form.Add("QueueUrl", "http://localhost:4100/queue/delay-seconds-queue")
form.Add("MessageBody", "1")
form.Add("Version", "2012-11-05")
req.PostForm = form
rr = httptest.NewRecorder()
http.HandlerFunc(SendMessage).ServeHTTP(rr, req)
if status := rr.Code; status != http.StatusOK {
t.Errorf("handler returned wrong status code: got \n%v want %v",
status, http.StatusOK)
}

// receive message before delay is up
req, err = http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}
form = url.Values{}
form.Add("Action", "ReceiveMessage")
form.Add("QueueUrl", "http://localhost:4100/queue/delay-seconds-queue")
form.Add("Version", "2012-11-05")
req.PostForm = form
rr = httptest.NewRecorder()
http.HandlerFunc(ReceiveMessage).ServeHTTP(rr, req)
if status := rr.Code; status != http.StatusOK {
t.Errorf("handler returned wrong status code: got \n%v want %v",
status, http.StatusOK)
}
if ok := strings.Contains(rr.Body.String(), "<Message>"); ok {
t.Fatal("handler should not return a message")
}

// receive message with wait should return after delay
req, err = http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}
form = url.Values{}
form.Add("Action", "ReceiveMessage")
form.Add("QueueUrl", "http://localhost:4100/queue/delay-seconds-queue")
form.Add("WaitTimeSeconds", "10")
form.Add("Version", "2012-11-05")
req.PostForm = form
rr = httptest.NewRecorder()
start := time.Now()
http.HandlerFunc(ReceiveMessage).ServeHTTP(rr, req)
elapsed := time.Since(start)
if status := rr.Code; status != http.StatusOK {
t.Errorf("handler returned wrong status code: got \n%v want %v",
status, http.StatusOK)
}
if elapsed < 1*time.Second {
t.Errorf("handler didn't wait at all")
}
if ok := strings.Contains(rr.Body.String(), "<Message>"); !ok {
t.Errorf("handler should return a message")
}
if elapsed > 4*time.Second {
t.Errorf("handler didn't need to wait all WaitTimeSeconds=10, only DelaySeconds=2")
}
}

func TestSetQueueAttributes_POST_QueueNotFound(t *testing.T) {
Expand Down Expand Up @@ -1762,6 +1850,95 @@ func TestSendMessage_POST_DuplicatationEnabledOnFifoQueue(t *testing.T) {
}
}

func TestSendMessage_POST_DelaySeconds(t *testing.T) {
// create a queue
req, err := http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}
form := url.Values{}
form.Add("Action", "CreateQueue")
form.Add("QueueName", "sendmessage-delay")
form.Add("Version", "2012-11-05")
req.PostForm = form

rr := httptest.NewRecorder()
http.HandlerFunc(CreateQueue).ServeHTTP(rr, req)

if status := rr.Code; status != http.StatusOK {
t.Errorf("handler returned wrong status code: got \n%v want %v",
status, http.StatusOK)
}

// send a message
req, err = http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}
form = url.Values{}
form.Add("Action", "SendMessage")
form.Add("QueueUrl", "http://localhost:4100/queue/sendmessage-delay")
form.Add("MessageBody", "1")
form.Add("DelaySeconds", "2")
form.Add("Version", "2012-11-05")
req.PostForm = form
rr = httptest.NewRecorder()
http.HandlerFunc(SendMessage).ServeHTTP(rr, req)
if status := rr.Code; status != http.StatusOK {
t.Errorf("handler returned wrong status code: got \n%v want %v",
status, http.StatusOK)
}

// receive message before delay is up
req, err = http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}
form = url.Values{}
form.Add("Action", "ReceiveMessage")
form.Add("QueueUrl", "http://localhost:4100/queue/sendmessage-delay")
form.Add("Version", "2012-11-05")
req.PostForm = form
rr = httptest.NewRecorder()
http.HandlerFunc(ReceiveMessage).ServeHTTP(rr, req)
if status := rr.Code; status != http.StatusOK {
t.Errorf("handler returned wrong status code: got \n%v want %v",
status, http.StatusOK)
}
if ok := strings.Contains(rr.Body.String(), "<Message>"); ok {
t.Fatal("handler should not return a message")
}

// receive message with wait should return after delay
req, err = http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}
form = url.Values{}
form.Add("Action", "ReceiveMessage")
form.Add("QueueUrl", "http://localhost:4100/queue/sendmessage-delay")
form.Add("WaitTimeSeconds", "10")
form.Add("Version", "2012-11-05")
req.PostForm = form
rr = httptest.NewRecorder()
start := time.Now()
http.HandlerFunc(ReceiveMessage).ServeHTTP(rr, req)
elapsed := time.Since(start)
if status := rr.Code; status != http.StatusOK {
t.Errorf("handler returned wrong status code: got \n%v want %v",
status, http.StatusOK)
}
if elapsed < 1*time.Second {
t.Errorf("handler didn't wait at all")
}
if ok := strings.Contains(rr.Body.String(), "<Message>"); !ok {
t.Errorf("handler should return a message")
}
if elapsed > 4*time.Second {
t.Errorf("handler didn't need to wait all WaitTimeSeconds=10, only DelaySeconds=2")
}
}

// waitTimeout waits for the waitgroup for the specified max timeout.
// Returns true if waiting timed out.
// credits: https://stackoverflow.com/questions/32840687/timeout-for-waitgroup-wait
Expand Down
4 changes: 4 additions & 0 deletions app/gosqs/queue_attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func validateAndSetQueueAttributes(q *app.Queue, u url.Values) error {
q.DeadLetterQueue = deadLetterQueue
q.MaxReceiveCount = maxReceiveCount
}
delaySecs, _ := strconv.Atoi(attr["DelaySeconds"])
if delaySecs != 0 {
q.DelaySecs = delaySecs
}

return nil
}
Expand Down
3 changes: 2 additions & 1 deletion app/gosqs/queue_attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestApplyQueueAttributes(t *testing.T) {
q := &app.Queue{TimeoutSecs: 30}
u := url.Values{}
u.Add("Attribute.1.Name", "DelaySeconds")
u.Add("Attribute.1.Value", "20")
u.Add("Attribute.1.Value", "25")
u.Add("Attribute.2.Name", "VisibilityTimeout")
u.Add("Attribute.2.Value", "60")
u.Add("Attribute.3.Name", "Policy")
Expand All @@ -31,6 +31,7 @@ func TestApplyQueueAttributes(t *testing.T) {
expected := &app.Queue{
TimeoutSecs: 60,
ReceiveWaitTimeSecs: 20,
DelaySecs: 25,
MaxReceiveCount: 4,
DeadLetterQueue: deadLetterQueue,
}
Expand Down
12 changes: 8 additions & 4 deletions app/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package app
import (
"errors"
"fmt"
log "github.com/sirupsen/logrus"
"math/rand"
"strconv"
"strings"
"sync"
"time"

log "github.com/sirupsen/logrus"
)

type SqsErrorType struct {
Expand Down Expand Up @@ -37,7 +38,8 @@ type Message struct {
MessageAttributes map[string]MessageAttributeValue
GroupID string
DeduplicationID string
SentTime time.Time
SentTime time.Time
DelaySecs int
}

func (m *Message) IsReadyForReceipt() bool {
Expand All @@ -46,10 +48,11 @@ func (m *Message) IsReadyForReceipt() bool {
log.Error(err)
return true
}
return m.SentTime.Add(randomLatency).Before(time.Now())
showAt := m.SentTime.Add(randomLatency).Add(time.Duration(m.DelaySecs) * time.Second)
return showAt.Before(time.Now())
}

func getRandomLatency() (time.Duration, error){
func getRandomLatency() (time.Duration, error) {
min := CurrentEnvironment.RandomLatency.Min
max := CurrentEnvironment.RandomLatency.Max
if min == 0 && max == 0 {
Expand Down Expand Up @@ -81,6 +84,7 @@ type Queue struct {
Arn string
TimeoutSecs int
ReceiveWaitTimeSecs int
DelaySecs int
MaximumMessageSize int
Messages []Message
DeadLetterQueue *Queue
Expand Down

0 comments on commit 16b1323

Please sign in to comment.