Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AWS' Simple Queue Service support for transport #1018

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 227 additions & 0 deletions transport/awssqs/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package awssqs
peterbourgon marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"encoding/json"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/transport"
)

// Consumer wraps an endpoint and provides a handler for SQS messages.
type Consumer struct {
0marq marked this conversation as resolved.
Show resolved Hide resolved
sqsClient sqsiface.SQSAPI
e endpoint.Endpoint
dec DecodeRequestFunc
enc EncodeResponseFunc
wantRep WantReplyFunc
queueURL string
before []ConsumerRequestFunc
after []ConsumerResponseFunc
errorEncoder ErrorEncoder
finalizer []ConsumerFinalizerFunc
errorHandler transport.ErrorHandler
}

// NewConsumer constructs a new Consumer, which provides a Consume method
// and message handlers that wrap the provided endpoint.
func NewConsumer(
sqsClient sqsiface.SQSAPI,
e endpoint.Endpoint,
dec DecodeRequestFunc,
enc EncodeResponseFunc,
queueURL string,
options ...ConsumerOption,
) *Consumer {
s := &Consumer{
sqsClient: sqsClient,
e: e,
dec: dec,
enc: enc,
wantRep: DoNotRespond,
queueURL: queueURL,
errorEncoder: DefaultErrorEncoder,
errorHandler: transport.NewLogErrorHandler(log.NewNopLogger()),
}
for _, option := range options {
option(s)
}
return s
}

// ConsumerOption sets an optional parameter for consumers.
type ConsumerOption func(*Consumer)

// ConsumerBefore functions are executed on the producer request object before the
// request is decoded.
func ConsumerBefore(before ...ConsumerRequestFunc) ConsumerOption {
return func(c *Consumer) { c.before = append(c.before, before...) }
}

// ConsumerAfter functions are executed on the consumer reply after the
// endpoint is invoked, but before anything is published to the reply.
func ConsumerAfter(after ...ConsumerResponseFunc) ConsumerOption {
return func(c *Consumer) { c.after = append(c.after, after...) }
}

// ConsumerErrorEncoder is used to encode errors to the consumer reply
// whenever they're encountered in the processing of a request. Clients can
// use this to provide custom error formatting. By default,
// errors will be published with the DefaultErrorEncoder.
func ConsumerErrorEncoder(ee ErrorEncoder) ConsumerOption {
return func(c *Consumer) { c.errorEncoder = ee }
}

// ConsumerWantReplyFunc overrides the default value for the consumer's
// wantRep field.
func ConsumerWantReplyFunc(replyFunc WantReplyFunc) ConsumerOption {
return func(c *Consumer) { c.wantRep = replyFunc }
}

// ConsumerErrorHandler is used to handle non-terminal errors. By default, non-terminal errors
// are ignored. This is intended as a diagnostic measure. Finer-grained control
// of error handling, including logging in more detail, should be performed in a
// custom ConsumerErrorEncoder which has access to the context.
func ConsumerErrorHandler(errorHandler transport.ErrorHandler) ConsumerOption {
return func(c *Consumer) { c.errorHandler = errorHandler }
}

// ConsumerFinalizer is executed once all the received SQS messages are done being processed.
// By default, no finalizer is registered.
func ConsumerFinalizer(f ...ConsumerFinalizerFunc) ConsumerOption {
return func(c *Consumer) { c.finalizer = f }
}

// ConsumerDeleteMessageBefore returns a ConsumerOption that appends a function
// that delete the message from queue to the list of consumer's before functions.
func ConsumerDeleteMessageBefore() ConsumerOption {
return func(c *Consumer) {
deleteBefore := func(ctx context.Context, cancel context.CancelFunc, msg *sqs.Message) context.Context {
if err := deleteMessage(ctx, c.sqsClient, c.queueURL, msg); err != nil {
c.errorHandler.Handle(ctx, err)
c.errorEncoder(ctx, err, msg, c.sqsClient)
cancel()
}
return ctx
}
c.before = append(c.before, deleteBefore)
}
}

// ConsumerDeleteMessageAfter returns a ConsumerOption that appends a function
// that delete a message from queue to the list of consumer's after functions.
func ConsumerDeleteMessageAfter() ConsumerOption {
return func(c *Consumer) {
deleteAfter := func(ctx context.Context, cancel context.CancelFunc, msg *sqs.Message, _ *sqs.SendMessageInput) context.Context {
if err := deleteMessage(ctx, c.sqsClient, c.queueURL, msg); err != nil {
c.errorHandler.Handle(ctx, err)
c.errorEncoder(ctx, err, msg, c.sqsClient)
cancel()
}
return ctx
}
c.after = append(c.after, deleteAfter)
}
}

// ServeMessage serves an SQS message.
func (c Consumer) ServeMessage(ctx context.Context, msg *sqs.Message) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this implement an interface expected of something in the AWS SQS package? How should callers use it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No not really.
It should be used as depicted by @xyluet in #1018 (comment)

Copy link
Member

@peterbourgon peterbourgon Oct 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is some conceptual disconnect here.

A Go kit service is comprised of 1 or more endpoints, and each endpoint is exposed via 1 or more transports. By convention every Go kit transport package provides a type that exposes a single endpoint with unique DecodeRequest and EncodeResponse functions. But that type should compose into a larger "unit" which represents an entire service.

For example, transport/http.Server type isn't an sttdlib http.Server itself but an http.Handler, and you're supposed to mount multiple transport/http.Server handlers in a single mux to represent your service. Or, transport/nats.Subscriber isn't it's own client and consumer of the NATS topic, instead it implements nats.MsgHandler so that it can be composed by the caller into a larger consumer that receives multiple message types and dispatches them to the appropriate MsgHandler.

Concretely: users of this package shouldn't have to run ServeMessage loops for every transport/awssqs.Consumer they create (i.e. every endpoint in their service). They should create one SQS client/consumer/whatever with 1 or more transport/awssqs.Consumer types, each of which is fed messages by the "outer" component appropriately. I don't know the architecture of the SQS client lib so I don't know if this is natively supported or would have to be provided in this package too.

Does this make sense?

Copy link

@sunjayaali sunjayaali Oct 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't implement anything. I just borrow from amqp transport.

func (s Subscriber) ServeDelivery(ch Channel) func(deliv *amqp.Delivery) {

SQS SDK provides almost the same way as RabbitMQ when receiving messages. RabbitMQ is using channel, and we need to for loop the channel and pass a Delivery. SQS has the same way, but we need to call ReceiveMessage in for loop and getting array of messages from receive message output and we iterate the messages and pass it to our transport.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any update on how you feel about this, knowing that this pattern is already used in amqp transport @peterbourgon ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just changed the ServeMessage method signature to return a function. Is this what both of you had in mind ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any update guys ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello !

What are your thoughts about the last changes I committed b673cbe ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xyluet @peterbourgon Any update on this ?

Copy link

@JasonTruter JasonTruter Mar 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is some conceptual disconnect here.

A Go kit service is comprised of 1 or more endpoints, and each endpoint is exposed via 1 or more transports. By convention every Go kit transport package provides a type that exposes a single endpoint with unique DecodeRequest and EncodeResponse functions. But that type should compose into a larger "unit" which represents an entire service.

For example, transport/http.Server type isn't an sttdlib http.Server itself but an http.Handler, and you're supposed to mount multiple transport/http.Server handlers in a single mux to represent your service. Or, transport/nats.Subscriber isn't it's own client and consumer of the NATS topic, instead it implements nats.MsgHandler so that it can be composed by the caller into a larger consumer that receives multiple message types and dispatches them to the appropriate MsgHandler.

Concretely: users of this package shouldn't have to run ServeMessage loops for every transport/awssqs.Consumer they create (i.e. every endpoint in their service). They should create one SQS client/consumer/whatever with 1 or more transport/awssqs.Consumer types, each of which is fed messages by the "outer" component appropriately. I don't know the architecture of the SQS client lib so I don't know if this is natively supported or would have to be provided in this package too.

Does this make sense?

@peterbourgon
Regarding your second point above, SQS is just a poll based queue.

  • Example SQS request that is direct from my service.
  •   queueURLOutput, err := sqsclient.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: &queue})
      if err != nil {
      	logger.Info().Msg("failed to get queue url")
      	os.Exit(1)
      }
      input := &sqs.ReceiveMessageInput{
      			QueueUrl:            queueURLOutput.QueueUrl,
      			WaitTimeSeconds:     aws.Int64(20), // long polling
      			MaxNumberOfMessages: aws.Int64(10),
      			VisibilityTimeout:   aws.Int64(2),
      		}
        // output contains a slice of messages
         output, err := sqsclient.ReceiveMessageWithContext(ctx, input)
    
    
    

This is the receive message api endpoint from SQS:

ReceiveMessage(input *ReceiveMessageInput) (*ReceiveMessageOutput, error)

SQS has this concept of long polling and short polling. Long polling would be a long lived connection that lives for a maximum of 20 seconds, allowing the poller to return a response only when there is a message available (cheaper due to less network calls). With short polling, a response is returned immediately but may have an empty response(may require multiple requests to check for a message on the queue).

I've seen that most of the supported transports return functions due to the nature of the way the client of those transports are built. Example:

  • http has ServerHTTP func which allows it to be used as a handler. func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
  • NATS has a MsgHandler which is a function that takes in a message as input. This gets returned as a function within the ServeMsg function func (s Subscriber) ServeMsg(nc *nats.Conn) func(msg *nats.Msg)
    sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(nc))

I think that the SQS implementation should have some type of SQS muxer that dispatches SQS messages to the relevant consumers based off some sort of matching much like the other transports in go kit.

The problem with SQS is that is has a large interface and we wouldn't want to satisfy all interface functions.

ctx, cancel := context.WithCancel(ctx)
defer cancel()

if len(c.finalizer) > 0 {
defer func() {
for _, f := range c.finalizer {
f(ctx, msg)
}
}()
}

for _, f := range c.before {
ctx = f(ctx, cancel, msg)
}

req, err := c.dec(ctx, msg)
if err != nil {
c.errorHandler.Handle(ctx, err)
c.errorEncoder(ctx, err, msg, c.sqsClient)
return err
}

response, err := c.e(ctx, req)
if err != nil {
c.errorHandler.Handle(ctx, err)
c.errorEncoder(ctx, err, msg, c.sqsClient)
return err
}

responseMsg := sqs.SendMessageInput{}
for _, f := range c.after {
ctx = f(ctx, cancel, msg, &responseMsg)
}

if !c.wantRep(ctx, msg) {
return nil
}

if err := c.enc(ctx, &responseMsg, response); err != nil {
c.errorHandler.Handle(ctx, err)
c.errorEncoder(ctx, err, msg, c.sqsClient)
return err
}

if _, err := c.sqsClient.SendMessageWithContext(ctx, &responseMsg); err != nil {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will introduce ResponsePublisher which it will responsible to publish the response or just run no operation func. How about that?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see what that would improve over my implementation. If you think it really does please help me understand.
I have a WantReplyFunc that can be either based on a message attribute or simply return false or true all the time.
Depending on the output of this func, I proceed to the rest of the function which is in charge of encoding and sending a response or not.

I prefere my approach because I find it closer to gokit's approach (in NATS transport for example) where you have a dedicated encoding response function.

c.errorHandler.Handle(ctx, err)
c.errorEncoder(ctx, err, msg, c.sqsClient)
return err
}
return nil
}

// ErrorEncoder is responsible for encoding an error to the consumer's reply.
// Users are encouraged to use custom ErrorEncoders to encode errors to
// their replies, and will likely want to pass and check for their own error
// types.
type ErrorEncoder func(ctx context.Context, err error, req *sqs.Message, sqsClient sqsiface.SQSAPI)

// ConsumerFinalizerFunc can be used to perform work at the end of a request
// from a producer, after the response has been written to the producer. The
// principal intended use is for request logging.
// Can also be used to delete messages once fully proccessed.
type ConsumerFinalizerFunc func(ctx context.Context, msg *sqs.Message)

// WantReplyFunc encapsulates logic to check whether message awaits response or not
// for example check for a given message attribute value.
type WantReplyFunc func(context.Context, *sqs.Message) bool

// DefaultErrorEncoder simply ignores the message. It does not reply.
func DefaultErrorEncoder(context.Context, error, *sqs.Message, sqsiface.SQSAPI) {
}

func deleteMessage(ctx context.Context, sqsClient sqsiface.SQSAPI, queueURL string, msg *sqs.Message) error {
_, err := sqsClient.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{
QueueUrl: &queueURL,
ReceiptHandle: msg.ReceiptHandle,
})
return err
}

// DoNotRespond is a WantReplyFunc and is the default value for consumer's wantRep field.
// It indicates that the message do not expect a response.
func DoNotRespond(context.Context, *sqs.Message) bool {
return false
}

// EncodeJSONResponse marshals response as json and loads it into an sqs.SendMessageInput MessageBody.
func EncodeJSONResponse(_ context.Context, input *sqs.SendMessageInput, response interface{}) error {
payload, err := json.Marshal(response)
if err != nil {
return err
}
input.MessageBody = aws.String(string(payload))
return nil
}
Loading