Skip to content

Commit

Permalink
pubsub/awssnssqs: Add support for setting FIFO message metadata (#3435)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartventer authored May 31, 2024
1 parent 0866b65 commit e891930
Show file tree
Hide file tree
Showing 104 changed files with 10,420 additions and 5,553 deletions.
68 changes: 68 additions & 0 deletions pubsub/awssnssqs/awssnssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,50 @@ func maybeEncodeBody(body []byte, opt BodyBase64Encoding) (string, bool) {
return string(body), false
}

// Defines values for Metadata keys used by the driver for setting message
// attributes on SNS ([sns.PublishBatchRequestEntry]/[snstypesv2.PublishBatchRequestEntry])
// and SQS ([sqs.SendMessageBatchRequestEntry]/[sqstypesv2.SendMessageBatchRequestEntry])
// messages.
//
// For example, to set a deduplication ID and message group ID on a message:
//
// import (
// "gocloud.dev/pubsub"
// "gocloud.dev/pubsub/awssnssqs"
// )
//
// message := pubsub.Message{
// Body: []byte("Hello, World!"),
// Metadata: map[string]string{
// awssnssqs.MetadataKeyDeduplicationID: "my-dedup-id",
// awssnssqs.MetadataKeyMessageGroupID: "my-group-id",
// },
// }
const (
MetadataKeyDeduplicationID = "DeduplicationId"
MetadataKeyMessageGroupID = "MessageGroupId"
)

// reviseSnsEntryAttributes sets attributes on a [sns.PublishBatchRequestEntry] based on [driver.Message.Metadata].
func reviseSnsEntryAttributes(dm *driver.Message, entry *sns.PublishBatchRequestEntry) {
if dedupID, ok := dm.Metadata[MetadataKeyDeduplicationID]; ok {
entry.MessageDeduplicationId = aws.String(dedupID)
}
if groupID, ok := dm.Metadata[MetadataKeyMessageGroupID]; ok {
entry.MessageGroupId = aws.String(groupID)
}
}

// reviseSnsV2EntryAttributes sets attributes on a [snstypesv2.PublishBatchRequestEntry] based on [driver.Message.Metadata].
func reviseSnsV2EntryAttributes(dm *driver.Message, entry *snstypesv2.PublishBatchRequestEntry) {
if dedupID, ok := dm.Metadata[MetadataKeyDeduplicationID]; ok {
entry.MessageDeduplicationId = aws.String(dedupID)
}
if groupID, ok := dm.Metadata[MetadataKeyMessageGroupID]; ok {
entry.MessageGroupId = aws.String(groupID)
}
}

// SendBatch implements driver.Topic.SendBatch.
func (t *snsTopic) SendBatch(ctx context.Context, dms []*driver.Message) error {
if t.useV2 {
Expand Down Expand Up @@ -483,6 +527,7 @@ func (t *snsTopic) SendBatch(ctx context.Context, dms []*driver.Message) error {
MessageAttributes: attrs,
Message: aws.String(body),
}
reviseSnsV2EntryAttributes(dm, entry)
if dm.BeforeSend != nil {
// A previous revision used the non-batch API PublishInput, which takes
// a *snsv2.PublishInput. For backwards compatibility for As, continue
Expand Down Expand Up @@ -581,6 +626,7 @@ func (t *snsTopic) SendBatch(ctx context.Context, dms []*driver.Message) error {
MessageAttributes: attrs,
Message: aws.String(body),
}
reviseSnsEntryAttributes(dm, entry)
if dm.BeforeSend != nil {
// A previous revision used the non-batch API PublishInput, which takes
// a *snsv2.PublishInput. For backwards compatibility for As, continue
Expand Down Expand Up @@ -741,6 +787,26 @@ func openSQSTopicV2(ctx context.Context, client *sqsv2.Client, qURL string, opts
}
}

// reviseSqsEntryAttributes sets attributes on a [sqs.SendMessageBatchRequestEntry] based on [driver.Message.Metadata].
func reviseSqsEntryAttributes(dm *driver.Message, entry *sqs.SendMessageBatchRequestEntry) {
if dedupID, ok := dm.Metadata[MetadataKeyDeduplicationID]; ok {
entry.MessageDeduplicationId = aws.String(dedupID)
}
if groupID, ok := dm.Metadata[MetadataKeyMessageGroupID]; ok {
entry.MessageGroupId = aws.String(groupID)
}
}

// reviseSqsV2EntryAttributes sets attributes on a [sqstypesv2.SendMessageBatchRequestEntry] based on [driver.Message.Metadata].
func reviseSqsV2EntryAttributes(dm *driver.Message, entry *sqstypesv2.SendMessageBatchRequestEntry) {
if dedupID, ok := dm.Metadata[MetadataKeyDeduplicationID]; ok {
entry.MessageDeduplicationId = aws.String(dedupID)
}
if groupID, ok := dm.Metadata[MetadataKeyMessageGroupID]; ok {
entry.MessageGroupId = aws.String(groupID)
}
}

// SendBatch implements driver.Topic.SendBatch.
func (t *sqsTopic) SendBatch(ctx context.Context, dms []*driver.Message) error {
if t.useV2 {
Expand Down Expand Up @@ -770,6 +836,7 @@ func (t *sqsTopic) SendBatch(ctx context.Context, dms []*driver.Message) error {
MessageAttributes: attrs,
MessageBody: aws.String(body),
}
reviseSqsV2EntryAttributes(dm, entry)
if dm.BeforeSend != nil {
asFunc := func(i interface{}) bool {
if p, ok := i.(**sqstypesv2.SendMessageBatchRequestEntry); ok {
Expand Down Expand Up @@ -836,6 +903,7 @@ func (t *sqsTopic) SendBatch(ctx context.Context, dms []*driver.Message) error {
MessageAttributes: attrs,
MessageBody: aws.String(body),
}
reviseSqsEntryAttributes(dm, entry)
req.Entries = append(req.Entries, entry)
if dm.BeforeSend != nil {
// A previous revision used the non-batch API SendMessage, which takes
Expand Down
Loading

0 comments on commit e891930

Please sign in to comment.