From 1a6a75a1ce73de95382511bc0695d0fc038ca9a2 Mon Sep 17 00:00:00 2001 From: Yaron Schneider Date: Mon, 18 Nov 2024 13:18:20 -0800 Subject: [PATCH] Enable in order processing of eventhubs messages (#3605) Signed-off-by: yaron2 --- bindings/azure/eventhubs/metadata.yaml | 8 ++++++ common/component/azure/eventhubs/eventhubs.go | 6 ++++- .../azure/eventhubs/eventhubs_test.go | 12 +++++++++ common/component/azure/eventhubs/metadata.go | 25 ++++++++++--------- pubsub/azure/eventhubs/metadata.yaml | 7 ++++++ 5 files changed, 45 insertions(+), 13 deletions(-) diff --git a/bindings/azure/eventhubs/metadata.yaml b/bindings/azure/eventhubs/metadata.yaml index 439fc5a9e6..841758b940 100644 --- a/bindings/azure/eventhubs/metadata.yaml +++ b/bindings/azure/eventhubs/metadata.yaml @@ -55,7 +55,15 @@ builtinAuthenticationProfiles: default: "false" example: "false" description: | + Allow management of the Event Hub namespace and storage account. + - name: enableInOrderMessageDelivery + type: bool + required: false + default: "false" + example: "false" + description: | + Enable in order processing of messages within a partition. - name: resourceGroupName type: string required: false diff --git a/common/component/azure/eventhubs/eventhubs.go b/common/component/azure/eventhubs/eventhubs.go index f5724e891f..e5b4cfd6c0 100644 --- a/common/component/azure/eventhubs/eventhubs.go +++ b/common/component/azure/eventhubs/eventhubs.go @@ -393,7 +393,11 @@ func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, partition if len(events) != 0 { // Handle received message - go aeh.handleAsync(subscribeCtx, config.Topic, events, config.Handler) + if aeh.metadata.EnableInOrderMessageDelivery { + aeh.handleAsync(subscribeCtx, config.Topic, events, config.Handler) + } else { + go aeh.handleAsync(subscribeCtx, config.Topic, events, config.Handler) + } // Checkpointing disabled for CheckPointFrequencyPerPartition == 0 if config.CheckPointFrequencyPerPartition > 0 { diff --git a/common/component/azure/eventhubs/eventhubs_test.go b/common/component/azure/eventhubs/eventhubs_test.go index bf63c19b6b..8381c07743 100644 --- a/common/component/azure/eventhubs/eventhubs_test.go +++ b/common/component/azure/eventhubs/eventhubs_test.go @@ -72,6 +72,18 @@ func TestParseEventHubsMetadata(t *testing.T) { require.Error(t, err) require.ErrorContains(t, err, "one of connectionString or eventHubNamespace is required") }) + + t.Run("test in order delivery", func(t *testing.T) { + metadata := map[string]string{ + "enableInOrderMessageDelivery": "true", + "connectionString": "fake", + } + + m, err := parseEventHubsMetadata(metadata, false, testLogger) + + require.NoError(t, err) + require.True(t, m.EnableInOrderMessageDelivery) + }) } func TestConstructConnectionStringFromTopic(t *testing.T) { diff --git a/common/component/azure/eventhubs/metadata.go b/common/component/azure/eventhubs/metadata.go index b5e94e114e..00ed07fa7d 100644 --- a/common/component/azure/eventhubs/metadata.go +++ b/common/component/azure/eventhubs/metadata.go @@ -26,18 +26,19 @@ import ( ) type AzureEventHubsMetadata struct { - ConnectionString string `json:"connectionString" mapstructure:"connectionString"` - EventHubNamespace string `json:"eventHubNamespace" mapstructure:"eventHubNamespace"` - ConsumerID string `json:"consumerID" mapstructure:"consumerID"` - StorageConnectionString string `json:"storageConnectionString" mapstructure:"storageConnectionString"` - StorageAccountName string `json:"storageAccountName" mapstructure:"storageAccountName"` - StorageAccountKey string `json:"storageAccountKey" mapstructure:"storageAccountKey"` - StorageContainerName string `json:"storageContainerName" mapstructure:"storageContainerName"` - EnableEntityManagement bool `json:"enableEntityManagement,string" mapstructure:"enableEntityManagement"` - MessageRetentionInDays int32 `json:"messageRetentionInDays,string" mapstructure:"messageRetentionInDays"` - PartitionCount int32 `json:"partitionCount,string" mapstructure:"partitionCount"` - SubscriptionID string `json:"subscriptionID" mapstructure:"subscriptionID"` - ResourceGroupName string `json:"resourceGroupName" mapstructure:"resourceGroupName"` + ConnectionString string `json:"connectionString" mapstructure:"connectionString"` + EventHubNamespace string `json:"eventHubNamespace" mapstructure:"eventHubNamespace"` + ConsumerID string `json:"consumerID" mapstructure:"consumerID"` + StorageConnectionString string `json:"storageConnectionString" mapstructure:"storageConnectionString"` + StorageAccountName string `json:"storageAccountName" mapstructure:"storageAccountName"` + StorageAccountKey string `json:"storageAccountKey" mapstructure:"storageAccountKey"` + StorageContainerName string `json:"storageContainerName" mapstructure:"storageContainerName"` + EnableEntityManagement bool `json:"enableEntityManagement,string" mapstructure:"enableEntityManagement"` + MessageRetentionInDays int32 `json:"messageRetentionInDays,string" mapstructure:"messageRetentionInDays"` + PartitionCount int32 `json:"partitionCount,string" mapstructure:"partitionCount"` + SubscriptionID string `json:"subscriptionID" mapstructure:"subscriptionID"` + ResourceGroupName string `json:"resourceGroupName" mapstructure:"resourceGroupName"` + EnableInOrderMessageDelivery bool `json:"enableInOrderMessageDelivery,string" mapstructure:"enableInOrderMessageDelivery"` // Binding only EventHub string `json:"eventHub" mapstructure:"eventHub" mdonly:"bindings"` diff --git a/pubsub/azure/eventhubs/metadata.yaml b/pubsub/azure/eventhubs/metadata.yaml index 768d472252..57c73c721c 100644 --- a/pubsub/azure/eventhubs/metadata.yaml +++ b/pubsub/azure/eventhubs/metadata.yaml @@ -35,6 +35,13 @@ builtinAuthenticationProfiles: example: "false" description: | Allow management of the Event Hub namespace and storage account. + - name: enableInOrderMessageDelivery + type: bool + required: false + default: "false" + example: "false" + description: | + Enable in order processing of messages within a partition. # The following four properties are needed only if enableEntityManagement is set to true - name: resourceGroupName