asyncsqs wraps around SQS client from aws-sdk-go-v2 to provide an async buffered client which batches send message and delete message requests to optimise AWS costs.
Messages can be scheduled to be sent and deleted. Requests will be dispatched when
- either batch becomes full
- or waiting period exhausts (if configured)
...whichever occurs earlier.
asyncsqs requires a Go version with modules support. If you're starting a new project, make sure to initialise a Go module:
$ mkdir ~/hellosqs
$ cd ~/hellosqs
$ go mod init github.com/my/hellosqs
And then add asyncsqs as a dependency to your existing or new project:
$ go get github.com/prashanthpai/asyncsqs
package main
import (
"context"
"log"
"strconv"
"github.com/prashanthpai/asyncsqs"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)
func main() {
// Create a SQS client with appropriate credentials/IAM role, region etc.
awsCfg, err := config.LoadDefaultConfig(context.Background())
if err != nil {
log.Fatalf("config.LoadDefaultConfig() failed: %v", err)
}
sqsClient := sqs.NewFromConfig(awsCfg)
// Create a asyncsqs buffered client; you'd have one per SQS queue
client, err := asyncsqs.NewBufferedClient(asyncsqs.Config{
SQSClient: sqsClient,
QueueURL: "https://sqs.us-east-1.amazonaws.com/xxxxxxxxxxxx/qqqqqqqqqqqq",
OnSendMessageBatch: sendResponseHandler, // register callback function (recommended)
})
if err != nil {
log.Fatalf("asyncsqs.NewBufferedClient() failed: %v", err)
}
// important! Stop() ensures that requests in memory are gracefully
// flushed/dispatched and resources like goroutines are cleaned-up
defer client.Stop()
for i := 0; i < 100; i++ {
_ = client.SendMessageAsync(types.SendMessageBatchRequestEntry{
Id: aws.String(strconv.Itoa(i)),
MessageBody: aws.String("hello world"),
})
}
}
func sendResponseHandler(output *sqs.SendMessageBatchOutput, err error) {
if err != nil {
log.Printf("send returned error: %v", err)
}
for _, s := range output.Successful {
log.Printf("message send successful: msg id = %s", *s.Id)
}
for _, f := range output.Failed {
log.Printf("message send failed: msg id = %s", *f.Id)
}
}
While asyncsqs ensures batch size doesn't exceed SQS's limit of 10 messages, it does not validate size of the payload yet. SQS places following limits on batch request payload:
The maximum allowed individual message size and the maximum total payload size
(the sum of the individual lengths of all of the batched messages) are both
256 KB (262,144 bytes).
This translates to an average payload limit of around 25KB per individual message.