Skip to content

Commit

Permalink
add sqs attributes to message
Browse files Browse the repository at this point in the history
  • Loading branch information
rguo-wish authored and zengmin-wish committed Oct 29, 2021
1 parent ac81e2a commit a4f921d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
18 changes: 16 additions & 2 deletions backends/sqs/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,13 @@ func (s *Backend) GetMessages(ctx context.Context, in *rpc.GetMessagesRequest) (
return nil, err
}

var sqsAttributeNames []*string
if in.RequireQueueSystemAttributes {
sqsAttributeNames = []*string{aws.String("All")}
}

output, err := s.sqs.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{
AttributeNames: sqsAttributeNames,
MessageAttributeNames: []*string{aws.String("All")},
QueueUrl: &url,
WaitTimeSeconds: &in.LongPollSeconds,
Expand All @@ -389,9 +395,17 @@ func (s *Backend) GetMessages(ctx context.Context, in *rpc.GetMessagesRequest) (
for key, valueStruct := range message.MessageAttributes {
attributes[key] = *valueStruct.StringValue
}

sqsAttributes := make(map[string]string)
if in.RequireQueueSystemAttributes {
for key, value := range message.Attributes {
sqsAttributes[key] = *value
}
}
messages = append(messages, &rpc.Message{
Data: *message.Body,
Attributes: attributes,
Data: *message.Body,
Attributes: attributes,
QueueSystemAttributes: sqsAttributes,
Receipt: &rpc.MessageReceipt{
Id: *message.ReceiptHandle,
},
Expand Down
4 changes: 3 additions & 1 deletion rpc/qproxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ message FailedPublish {

message Message {
string Data = 1;
map<string, string> Attributes = 2;
map<string, string> Attributes = 2;
map<string, string> QueueSystemAttributes = 3;

MessageReceipt Receipt = 100;
}
Expand All @@ -170,6 +171,7 @@ message GetMessagesRequest {
int64 LongPollSeconds = 2;
int64 MaxMessages = 3;
int64 AckDeadlineSeconds = 4;
bool RequireQueueSystemAttributes = 5;
int64 RPCTimeout = 100;
}

Expand Down

0 comments on commit a4f921d

Please sign in to comment.