Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event Hub scalar expansion to work with Java and C# applications #517

Merged
merged 2 commits into from
Dec 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions pkg/scalers/azure_eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type baseCheckpoint struct {

// Checkpoint is the object eventhub processor stores in storage
// for checkpointing event processors. This matches the object
// stored by the eventhub C# sdk
// stored by the eventhub C# sdk and Java sdk
type Checkpoint struct {
baseCheckpoint
PartitionID string `json:"PartitionId"`
Expand Down Expand Up @@ -86,7 +86,16 @@ func GetCheckpointFromBlobStorage(ctx context.Context, partitionID string, event
}

// TODO: add more ways to read from different types of storage and read checkpoints/leases written in different JSON formats
u, _ := url.Parse(fmt.Sprintf("%s://%s.blob.%s/azure-webjobs-eventhub/%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubNamespace, eventHubName, eventHubMetadata.eventHubConsumerGroup, partitionID))
var u *url.URL
// Checking blob store for C# and Java applications
if eventHubMetadata.blobContainer != "" {
// URL format - <endpointProtocol>://<storageAccountName>.blob.<endpointSuffix>/<blobContainer>/<eventHubConsumerGroup>/<partitionID>
u, _ = url.Parse(fmt.Sprintf("%s://%s.blob.%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubMetadata.blobContainer, eventHubMetadata.eventHubConsumerGroup, partitionID))
} else {
// Checking blob store for Azure functions
// URL format - <endpointProtocol>://<storageAccountName>.blob.<endpointSuffix>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
u, _ = url.Parse(fmt.Sprintf("%s://%s.blob.%s/azure-webjobs-eventhub/%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubNamespace, eventHubName, eventHubMetadata.eventHubConsumerGroup, partitionID))
}

_, cred, err := GetStorageCredentials(eventHubMetadata.storageConnection)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/scalers/azure_eventhub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
defaultEventHubConsumerGroup = "$Default"
defaultEventHubConnectionSetting = "EventHub"
defaultStorageConnectionSetting = "AzureWebJobsStorage"
defaultBlobContainer = ""
ahmelsayed marked this conversation as resolved.
Show resolved Hide resolved
)

var eventhubLog = logf.Log.WithName("azure_eventhub_scaler")
Expand All @@ -38,6 +39,7 @@ type EventHubMetadata struct {
eventHubConsumerGroup string
threshold int64
storageConnection string
blobContainer string
}

// NewAzureEventHubScaler creates a new scaler for eventHub
Expand Down Expand Up @@ -105,6 +107,11 @@ func parseAzureEventHubMetadata(metadata, resolvedEnv map[string]string) (*Event
meta.eventHubConsumerGroup = val
}

meta.blobContainer = defaultBlobContainer
if val, ok := metadata["blobContainer"]; ok {
meta.blobContainer = val
}

return &meta, nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/scalers/azure_eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{
{map[string]string{"storageConnection": storageConnectionSetting, "connection": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, false},
// missing unprocessed event threshold - should replace with default
{map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting}, false},
// added blob container details
{map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting, "blobContainer": testContainerName}, false},
}

var testEventHubScaler = AzureEventHubScaler{
Expand Down