From d79782d11770f575dfab4a36538e80aa1ef13cd0 Mon Sep 17 00:00:00 2001
From: "Rajasa Savant (RJ)"
Date: Mon, 16 Dec 2019 18:31:20 -0800
Subject: [PATCH 1/2] Event Hub scaler expansion to work with Java and C#
 applications

- Made changes to the event hub scaler code
---
 pkg/scalers/azure_eventhub.go        | 12 ++++++++++--
 pkg/scalers/azure_eventhub_scaler.go |  7 +++++++
 pkg/scalers/azure_eventhub_test.go   |  2 ++
 3 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/pkg/scalers/azure_eventhub.go b/pkg/scalers/azure_eventhub.go
index 35bf24d9073..d91ae59d32d 100644
--- a/pkg/scalers/azure_eventhub.go
+++ b/pkg/scalers/azure_eventhub.go
@@ -29,7 +29,7 @@ type baseCheckpoint struct {
 
 // Checkpoint is the object eventhub processor stores in storage
 // for checkpointing event processors. This matches the object
-// stored by the eventhub C# sdk
+// stored by the eventhub C# sdk and Java sdk
 type Checkpoint struct {
 	baseCheckpoint
 	PartitionID string `json:"PartitionId"`
@@ -86,7 +86,15 @@ func GetCheckpointFromBlobStorage(ctx context.Context, partitionID string, event
 	}
 
 	// TODO: add more ways to read from different types of storage and read checkpoints/leases written in different JSON formats
-	u, _ := url.Parse(fmt.Sprintf("%s://%s.blob.%s/azure-webjobs-eventhub/%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubNamespace, eventHubName, eventHubMetadata.eventHubConsumerGroup, partitionID))
+	// Checking blob store for C# and Java applications
+	if eventHubMetadata.blobContainer != "" {
+		// URL format - <endpointProtocol>://<storageAccountName>.blob.<endpointSuffix>/<blobContainer>/<eventHubConsumerGroup>/<partitionID>
+		u, _ := url.Parse(fmt.Sprintf("%s://%s.blob.%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubMetadata.blobContainer, eventHubMetadata.eventHubConsumerGroup, partitionID))
+	} else {
+		// Checking blob store for Azure functions
+		// URL format - <endpointProtocol>://<storageAccountName>.blob.<endpointSuffix>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
+		u, _ := url.Parse(fmt.Sprintf("%s://%s.blob.%s/azure-webjobs-eventhub/%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubNamespace, eventHubName, eventHubMetadata.eventHubConsumerGroup, partitionID))
+	}
 
 	_, cred, err := GetStorageCredentials(eventHubMetadata.storageConnection)
 	if err != nil {
diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go
index 8252ecb0615..9e1de7fd70a 100644
--- a/pkg/scalers/azure_eventhub_scaler.go
+++ b/pkg/scalers/azure_eventhub_scaler.go
@@ -23,6 +23,7 @@ const (
 	defaultEventHubConsumerGroup     = "$Default"
 	defaultEventHubConnectionSetting = "EventHub"
 	defaultStorageConnectionSetting  = "AzureWebJobsStorage"
+	defaultBlobContainer             = ""
 )
 
 var eventhubLog = logf.Log.WithName("azure_eventhub_scaler")
@@ -38,6 +39,7 @@ type EventHubMetadata struct {
 	eventHubConsumerGroup string
 	threshold             int64
 	storageConnection     string
+	blobContainer         string
 }
 
 // NewAzureEventHubScaler creates a new scaler for eventHub
@@ -105,6 +107,11 @@ func parseAzureEventHubMetadata(metadata, resolvedEnv map[string]string) (*Event
 		meta.eventHubConsumerGroup = val
 	}
 
+	meta.blobContainer = defaultBlobContainer
+	if val, ok := metadata["blobContainer"]; ok {
+		meta.blobContainer = val
+	}
+
 	return &meta, nil
 }
 
diff --git a/pkg/scalers/azure_eventhub_test.go b/pkg/scalers/azure_eventhub_test.go
index fe7d9c77db2..29d6ec0ddef 100644
--- a/pkg/scalers/azure_eventhub_test.go
+++ b/pkg/scalers/azure_eventhub_test.go
@@ -48,6 +48,8 @@ var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{
"unprocessedEventThreshold": "15"}, false}, // missing unprocessed event threshold - should replace with default {map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting}, false}, + // added blob container details + {map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting, "blobContainer": testContainerName}, false}, } var testEventHubScaler = AzureEventHubScaler{ From bb5ef488b2a5d828f4d5b3eecfc99f517bf88cd9 Mon Sep 17 00:00:00 2001 From: rasavant Date: Tue, 17 Dec 2019 16:19:33 -0800 Subject: [PATCH 2/2] Fixed variable declaration and value assignment bug for Event Hub Scalar --- pkg/scalers/azure_eventhub.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/scalers/azure_eventhub.go b/pkg/scalers/azure_eventhub.go index d91ae59d32d..4c38367e079 100644 --- a/pkg/scalers/azure_eventhub.go +++ b/pkg/scalers/azure_eventhub.go @@ -86,14 +86,15 @@ func GetCheckpointFromBlobStorage(ctx context.Context, partitionID string, event } // TODO: add more ways to read from different types of storage and read checkpoints/leases written in different JSON formats + var u *url.URL // Checking blob store for C# and Java applications if eventHubMetadata.blobContainer != "" { // URL format - ://.blob./// - u, _ := url.Parse(fmt.Sprintf("%s://%s.blob.%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubMetadata.blobContainer, eventHubMetadata.eventHubConsumerGroup, partitionID)) + u, _ = url.Parse(fmt.Sprintf("%s://%s.blob.%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubMetadata.blobContainer, eventHubMetadata.eventHubConsumerGroup, partitionID)) } else { // Checking blob store for Azure functions // URL format - ://.blob./azure-webjobs-eventhub//// - u, _ := url.Parse(fmt.Sprintf("%s://%s.blob.%s/azure-webjobs-eventhub/%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubNamespace, eventHubName, eventHubMetadata.eventHubConsumerGroup, partitionID)) + u, _ = url.Parse(fmt.Sprintf("%s://%s.blob.%s/azure-webjobs-eventhub/%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubNamespace, eventHubName, eventHubMetadata.eventHubConsumerGroup, partitionID)) } _, cred, err := GetStorageCredentials(eventHubMetadata.storageConnection)