diff --git a/pkg/scalers/azure_eventhub.go b/pkg/scalers/azure_eventhub.go index 35bf24d9073..4c38367e079 100644 --- a/pkg/scalers/azure_eventhub.go +++ b/pkg/scalers/azure_eventhub.go @@ -29,7 +29,7 @@ type baseCheckpoint struct { // Checkpoint is the object eventhub processor stores in storage // for checkpointing event processors. This matches the object -// stored by the eventhub C# sdk +// stored by the eventhub C# sdk and Java sdk type Checkpoint struct { baseCheckpoint PartitionID string `json:"PartitionId"` @@ -86,7 +86,16 @@ func GetCheckpointFromBlobStorage(ctx context.Context, partitionID string, event } // TODO: add more ways to read from different types of storage and read checkpoints/leases written in different JSON formats - u, _ := url.Parse(fmt.Sprintf("%s://%s.blob.%s/azure-webjobs-eventhub/%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubNamespace, eventHubName, eventHubMetadata.eventHubConsumerGroup, partitionID)) + var u *url.URL + // Checking blob store for C# and Java applications + if eventHubMetadata.blobContainer != "" { + // URL format - ://.blob./// + u, _ = url.Parse(fmt.Sprintf("%s://%s.blob.%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubMetadata.blobContainer, eventHubMetadata.eventHubConsumerGroup, partitionID)) + } else { + // Checking blob store for Azure functions + // URL format - ://.blob./azure-webjobs-eventhub//// + u, _ = url.Parse(fmt.Sprintf("%s://%s.blob.%s/azure-webjobs-eventhub/%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubNamespace, eventHubName, eventHubMetadata.eventHubConsumerGroup, partitionID)) + } _, cred, err := GetStorageCredentials(eventHubMetadata.storageConnection) if err != nil { diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index 8252ecb0615..9e1de7fd70a 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -23,6 +23,7 @@ const ( defaultEventHubConsumerGroup = "$Default" defaultEventHubConnectionSetting = "EventHub" defaultStorageConnectionSetting = "AzureWebJobsStorage" + defaultBlobContainer = "" ) var eventhubLog = logf.Log.WithName("azure_eventhub_scaler") @@ -38,6 +39,7 @@ type EventHubMetadata struct { eventHubConsumerGroup string threshold int64 storageConnection string + blobContainer string } // NewAzureEventHubScaler creates a new scaler for eventHub @@ -105,6 +107,11 @@ func parseAzureEventHubMetadata(metadata, resolvedEnv map[string]string) (*Event meta.eventHubConsumerGroup = val } + meta.blobContainer = defaultBlobContainer + if val, ok := metadata["blobContainer"]; ok { + meta.blobContainer = val + } + return &meta, nil } diff --git a/pkg/scalers/azure_eventhub_test.go b/pkg/scalers/azure_eventhub_test.go index fe7d9c77db2..29d6ec0ddef 100644 --- a/pkg/scalers/azure_eventhub_test.go +++ b/pkg/scalers/azure_eventhub_test.go @@ -48,6 +48,8 @@ var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{ {map[string]string{"storageConnection": storageConnectionSetting, "connection": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, false}, // missing unprocessed event threshold - should replace with default {map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting}, false}, + // added blob container details + {map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting, "blobContainer": testContainerName}, false}, } var testEventHubScaler = AzureEventHubScaler{