Event Hub Scaler: Remove or replace usages of Event Hub offsets (#5600)
* Event Hub Scaler: Remove or replace usages of Event Hub offsets

Signed-off-by: Walker Crouse <walker.crouse@acuitybrands.com>

* fix sequence number field name

Signed-off-by: Walker Crouse <walker.crouse@acuitybrands.com>

* add CHANGELOG entry

Signed-off-by: Walker Crouse <walker.crouse@acuitybrands.com>

* add issue link to changelog entry

Signed-off-by: Walker Crouse <walker.crouse@acuitybrands.com>

* fix test that is based on an erroneous assumption

Signed-off-by: Walker Crouse <walker.crouse@acuitybrands.com>

* whitespace

Signed-off-by: Walker Crouse <walker.crouse@acuitybrands.com>

* remove baseCheckpoint

Signed-off-by: Walker Crouse <walker.crouse@acuitybrands.com>

---------

Signed-off-by: Walker Crouse <walker.crouse@acuitybrands.com>
Co-authored-by: Walker Crouse <walker.crouse@acuitybrands.com>
windy1 and Walker Crouse authored Apr 7, 2024
1 parent 1bfb223 commit e7ff90a
Showing 5 changed files with 36 additions and 102 deletions.
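
For orientation, here is a condensed sketch of the public surface this commit changes, paraphrased from the diffs below rather than copied verbatim from the KEDA source: Checkpoint drops its offset (and the baseCheckpoint embedding), and NewCheckpoint takes only a sequence number. The package clause and main() are illustrative.

```go
// Sketch of the checkpoint API after this commit (paraphrased from the diffs
// below; package name and main() are illustrative, not part of KEDA).
package main

import "fmt"

// Checkpoint after the change: partition and sequence number only, no offset.
type Checkpoint struct {
	PartitionID    string `json:"PartitionId"`
	SequenceNumber int64  `json:"SequenceNumber"`
}

// NewCheckpoint now takes only a sequence number.
// Before this commit the signature was NewCheckpoint(offset string, sequenceNumber int64).
func NewCheckpoint(sequenceNumber int64) Checkpoint {
	return Checkpoint{SequenceNumber: sequenceNumber}
}

func main() {
	fmt.Printf("%+v\n", NewCheckpoint(42)) // {PartitionID: SequenceNumber:42}
}
```
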
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -68,6 +68,7 @@ Here is an overview of all new **experimental** features:
- **General**: Add GRPC Healthchecks ([#5590](https://github.com/kedacore/keda/issues/5590))
- **General**: Add OPENTELEMETRY flag in e2e test YAML ([#5375](https://github.com/kedacore/keda/issues/5375))
- **General**: Add support for cross tenant/cloud authentication when using Azure Workload Identity for TriggerAuthentication ([#5441](https://github.com/kedacore/keda/issues/5441))
- **Azure Event Hub Scaler**: Remove usage of checkpoint offsets to account for SDK checkpointing implementation changes ([#5574](https://github.com/kedacore/keda/issues/5574))
- **GCP Stackdriver Scaler**: Add missing parameters 'rate' and 'count' for GCP Stackdriver Scaler alignment ([#5633](https://github.com/kedacore/keda/issues/5633))
- **Metrics API Scaler**: Add support for various formats: json, xml, yaml, prometheus ([#2633](https://github.com/kedacore/keda/issues/2633))
- **MongoDB Scaler**: Add scheme field support srv record ([#5544](https://github.com/kedacore/keda/issues/5544))
30 changes: 4 additions & 26 deletions pkg/scalers/azure/azure_eventhub_checkpoint.go
@@ -35,29 +35,19 @@ import (
// goCheckpoint struct to adapt goSdk Checkpoint
type goCheckpoint struct {
Checkpoint struct {
SequenceNumber int64 `json:"sequenceNumber"`
Offset string `json:"offset"`
SequenceNumber int64 `json:"sequenceNumber"`
} `json:"checkpoint"`
PartitionID string `json:"partitionId"`
}

type baseCheckpoint struct {
Epoch int64 `json:"Epoch"`
Offset string `json:"Offset"`
Owner string `json:"Owner"`
Token string `json:"Token"`
}

// Checkpoint in a common format
type Checkpoint struct {
baseCheckpoint
PartitionID string `json:"PartitionId"`
SequenceNumber int64 `json:"SequenceNumber"`
}

// Older python sdk stores the checkpoint differently
type pythonCheckpoint struct {
baseCheckpoint
PartitionID string `json:"partition_id"`
SequenceNumber int64 `json:"sequence_number"`
}
Expand Down Expand Up @@ -92,8 +82,8 @@ type defaultCheckpointer struct {
containerName string
}

func NewCheckpoint(offset string, sequenceNumber int64) Checkpoint {
return Checkpoint{baseCheckpoint: baseCheckpoint{Offset: offset}, SequenceNumber: sequenceNumber}
func NewCheckpoint(sequenceNumber int64) Checkpoint {
return Checkpoint{SequenceNumber: sequenceNumber}
}

// GetCheckpointFromBlobStorage reads depending of the CheckpointStrategy the checkpoint from a azure storage
Expand Down Expand Up @@ -222,10 +212,7 @@ func newGoSdkCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {

return Checkpoint{
SequenceNumber: checkpoint.Checkpoint.SequenceNumber,
baseCheckpoint: baseCheckpoint{
Offset: checkpoint.Checkpoint.Offset,
},
PartitionID: checkpoint.PartitionID,
PartitionID: checkpoint.PartitionID,
}, nil
}

Expand Down Expand Up @@ -318,15 +305,6 @@ func getCheckpointFromStorageMetadata(get *azblob.DownloadResponse, partitionID
}
}

if offset, ok := metadata["offset"]; ok {
if !ok {
if offset, ok = metadata["Offset"]; !ok {
return Checkpoint{}, fmt.Errorf("offset on blob not found")
}
}
checkpoint.Offset = offset
}

return checkpoint, nil
}
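
The hunks above reduce every checkpoint source (Go SDK blobs, Python SDK blobs, blob metadata) to a sequence number. As a minimal, self-contained illustration, the sketch below decodes a Go SDK style checkpoint blob into the simplified Checkpoint using only encoding/json; the struct shapes come from this diff and the sample blob matches the format used in the tests further down.

```go
// Minimal sketch: decode a Go-SDK style checkpoint blob into the simplified,
// offset-free Checkpoint. Struct shapes mirror the diff above; the sample blob
// matches the format used in the tests below.
package main

import (
	"encoding/json"
	"fmt"
)

type Checkpoint struct {
	PartitionID    string
	SequenceNumber int64
}

// goCheckpoint adapts the JSON written by the Go SDK checkpointer.
type goCheckpoint struct {
	Checkpoint struct {
		SequenceNumber int64 `json:"sequenceNumber"`
	} `json:"checkpoint"`
	PartitionID string `json:"partitionId"`
}

func main() {
	blob := `{"partitionID":"0","epoch":0,"owner":"","checkpoint":{"sequenceNumber":10,"enqueueTime":""},"state":"","token":""}`

	var gc goCheckpoint
	if err := json.Unmarshal([]byte(blob), &gc); err != nil {
		panic(err)
	}

	cp := Checkpoint{PartitionID: gc.PartitionID, SequenceNumber: gc.Checkpoint.SequenceNumber}
	fmt.Printf("%+v\n", cp) // {PartitionID:0 SequenceNumber:10}
}
```
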

57 changes: 10 additions & 47 deletions pkg/scalers/azure/azure_eventhub_test.go
@@ -25,23 +25,19 @@ func TestCheckpointFromBlobStorageAzureFunction(t *testing.T) {
}

partitionID := "0"
offset := "1001"
consumerGroup := "$Default1"

sequencenumber := int64(1)

containerName := "azure-webjobs-eventhub"
checkpointFormat := "{\"Offset\":\"%s\",\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID)
checkpointFormat := "{\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, sequencenumber, partitionID)
urlPath := fmt.Sprintf("eventhubnamespace.servicebus.windows.net/hub/%s/", consumerGroup)

ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil)
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}
@@ -54,8 +50,6 @@ func TestCheckpointFromBlobStorageAzureFunction(t *testing.T) {
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0")
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

@@ -65,23 +59,19 @@ func TestCheckpointFromBlobStorageDefault(t *testing.T) {
}

partitionID := "1"
offset := "1005"
consumerGroup := "$Default2"

sequencenumber := int64(1)

containerName := "defaultcontainer"
checkpointFormat := "{\"Offset\":\"%s\",\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID)
checkpointFormat := "{\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, sequencenumber, partitionID)
urlPath := fmt.Sprintf("%s/", consumerGroup)

ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil)
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}
@@ -95,8 +85,6 @@ func TestCheckpointFromBlobStorageDefault(t *testing.T) {
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

@@ -106,23 +94,19 @@ func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T
}

partitionID := "2"
offset := "1006"
consumerGroup := "$Default3"

sequencenumber := int64(1)

containerName := "defaultcontainerpython"
checkpointFormat := "{\"Offset\":\"%s\",\"sequence_number\":%d,\"partition_id\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID)
checkpointFormat := "{\"sequence_number\":%d,\"partition_id\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, sequencenumber, partitionID)
urlPath := fmt.Sprintf("%s/", consumerGroup)

ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil)
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}
@@ -136,8 +120,6 @@ func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

@@ -147,13 +129,11 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) {
}

partitionID := "4"
offset := "1002"
consumerGroup := "$default"

sequencenumber := int64(1)

metadata := map[string]string{
"offset": offset,
"sequencenumber": strconv.FormatInt(sequencenumber, 10),
}

@@ -164,9 +144,6 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) {
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}
@@ -181,8 +158,6 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) {
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

@@ -192,23 +167,19 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) {
}

partitionID := "0"
offset := "1003"

sequencenumber := int64(1)

containerName := "gosdkcontainer"
checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"offset\":\"%s\",\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}"
checkpoint := fmt.Sprintf(checkpointFormat, partitionID, offset, sequencenumber)
checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}"
checkpoint := fmt.Sprintf(checkpointFormat, partitionID, sequencenumber)

urlPath := ""

ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil)
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}
@@ -222,8 +193,6 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) {
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

@@ -233,25 +202,21 @@ func TestCheckpointFromBlobStorageDapr(t *testing.T) {
}

partitionID := "0"
offset := "1004"
consumerGroup := "$default"
eventhubName := "hub"

sequencenumber := int64(1)

containerName := fmt.Sprintf("dapr-%s-%s-%s", eventhubName, consumerGroup, partitionID)
checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"offset\":\"%s\",\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}"
checkpoint := fmt.Sprintf(checkpointFormat, partitionID, offset, sequencenumber)
checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}"
checkpoint := fmt.Sprintf(checkpointFormat, partitionID, sequencenumber)

urlPath := ""

ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil)
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}
@@ -265,8 +230,6 @@ func TestCheckpointFromBlobStorageDapr(t *testing.T) {
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

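
For reference, these are the offset-free checkpoint payloads the updated tests now write to blob storage. The format strings are copied from the test diff above; the short program below merely renders them with sample values and is not part of the change.

```go
// Render the offset-free checkpoint formats exercised by the updated tests.
// The format strings are copied from the test diff above; the sample values
// (sequence number 1, partition "0") are illustrative.
package main

import "fmt"

func main() {
	azureFunctionFormat := "{\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
	pythonFormat := "{\"sequence_number\":%d,\"partition_id\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
	goSdkFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}"

	fmt.Printf(azureFunctionFormat+"\n", 1, "0")
	fmt.Printf(pythonFormat+"\n", 1, "0")
	fmt.Printf(goSdkFormat+"\n", "0", 1)
}
```
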
12 changes: 2 additions & 10 deletions pkg/scalers/azure_eventhub_scaler.go
@@ -280,8 +280,8 @@ func parseAzureEventHubAuthenticationMetadata(logger logr.Logger, config *scaler

// GetUnprocessedEventCountInPartition gets number of unprocessed events in a given partition
func (s *azureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Context, partitionInfo *eventhub.HubPartitionRuntimeInformation) (newEventCount int64, checkpoint azure.Checkpoint, err error) {
// if partitionInfo.LastEnqueuedOffset = -1, that means event hub partition is empty
if partitionInfo == nil || partitionInfo.LastEnqueuedOffset == "-1" {
// if partitionInfo.LastSequenceNumber = -1, that means event hub partition is empty
if partitionInfo == nil || partitionInfo.LastSequenceNumber == -1 {
return 0, azure.Checkpoint{}, nil
}

@@ -306,14 +306,6 @@ func (s *azureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Co
func calculateUnprocessedEvents(partitionInfo *eventhub.HubPartitionRuntimeInformation, checkpoint azure.Checkpoint, stalePartitionInfoThreshold int64) int64 {
unprocessedEventCount := int64(0)

// If checkpoint.Offset is empty that means no messages has been processed from an event hub partition
// And since partitionInfo.LastSequenceNumber = 0 for the very first message hence
// total unprocessed message will be partitionInfo.LastSequenceNumber + 1
if checkpoint.Offset == "" {
unprocessedEventCount = partitionInfo.LastSequenceNumber + 1
return unprocessedEventCount
}

if partitionInfo.LastSequenceNumber >= checkpoint.SequenceNumber {
unprocessedEventCount = partitionInfo.LastSequenceNumber - checkpoint.SequenceNumber
} else {
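
The tail of calculateUnprocessedEvents is collapsed in the view above. The self-contained sketch below shows how a sequence-number-only comparison with a wrap-around fallback and a stale-partition guard can behave; it reproduces the expectations in the test table that follows, but the stale-partition handling and the threshold value are approximations rather than a verbatim copy of the KEDA implementation.

```go
// Sketch (approximation, not the verbatim KEDA code): count unprocessed events
// from sequence numbers alone, with a wrap-around fallback and a guard that
// treats implausibly large counts as stale partition metadata.
package main

import (
	"fmt"
	"math"
)

type checkpoint struct{ SequenceNumber int64 }

type partitionRuntimeInfo struct{ LastSequenceNumber int64 }

func unprocessedEvents(p partitionRuntimeInfo, c checkpoint, staleThreshold int64) int64 {
	var count int64
	if p.LastSequenceNumber >= c.SequenceNumber {
		count = p.LastSequenceNumber - c.SequenceNumber
	} else {
		// The partition behaves like a circular buffer, so its last sequence
		// number can legitimately be smaller than the checkpointed one.
		count = (math.MaxInt64 - c.SequenceNumber) + p.LastSequenceNumber
	}
	// A count within staleThreshold of MaxInt64 is far more likely to be stale
	// partition metadata than a real backlog.
	if count > math.MaxInt64-staleThreshold {
		return 0
	}
	return count
}

func main() {
	// Mirrors two rows of the test table below: a plain difference and a wrap.
	fmt.Println(unprocessedEvents(partitionRuntimeInfo{10}, checkpoint{5}, 10000))                  // 5
	fmt.Println(unprocessedEvents(partitionRuntimeInfo{5}, checkpoint{9223372036854775797}, 10000)) // 15
}
```
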
38 changes: 19 additions & 19 deletions pkg/scalers/azure_eventhub_scaler_test.go
@@ -211,51 +211,51 @@ var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestDat

var calculateUnprocessedEventsDataset = []calculateUnprocessedEventsTestData{
{
checkpoint: azure.NewCheckpoint("1", 5),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10, LastEnqueuedOffset: "2"},
checkpoint: azure.NewCheckpoint(5),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10},
unprocessedEvents: 5,
},
{
checkpoint: azure.NewCheckpoint("1002", 4611686018427387903),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905, LastEnqueuedOffset: "1000"},
checkpoint: azure.NewCheckpoint(4611686018427387903),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905},
unprocessedEvents: 2,
},
{
checkpoint: azure.NewCheckpoint("900", 4611686018427387900),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905, LastEnqueuedOffset: "1000"},
checkpoint: azure.NewCheckpoint(4611686018427387900),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905},
unprocessedEvents: 5,
},
{
checkpoint: azure.NewCheckpoint("800", 4000000000000200000),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4000000000000000000, LastEnqueuedOffset: "750"},
checkpoint: azure.NewCheckpoint(4000000000000200000),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4000000000000000000},
unprocessedEvents: 9223372036854575807,
},
// Empty checkpoint
{
checkpoint: azure.NewCheckpoint("", 0),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 1, LastEnqueuedOffset: "1"},
unprocessedEvents: 2,
checkpoint: azure.NewCheckpoint(0),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 1},
unprocessedEvents: 1,
},
// Stale PartitionInfo
{
checkpoint: azure.NewCheckpoint("5", 15),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10, LastEnqueuedOffset: "2"},
checkpoint: azure.NewCheckpoint(15),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10},
unprocessedEvents: 0,
},
{
checkpoint: azure.NewCheckpoint("1000", 4611686018427387910),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905, LastEnqueuedOffset: "900"},
checkpoint: azure.NewCheckpoint(4611686018427387910),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905},
unprocessedEvents: 0,
},
{
checkpoint: azure.NewCheckpoint("1", 5),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 9223372036854775797, LastEnqueuedOffset: "10000"},
checkpoint: azure.NewCheckpoint(5),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 9223372036854775797},
unprocessedEvents: 0,
},
// Circular buffer reset
{
checkpoint: azure.NewCheckpoint("100000", 9223372036854775797),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 5, LastEnqueuedOffset: "1"},
checkpoint: azure.NewCheckpoint(9223372036854775797),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 5},
unprocessedEvents: 15,
},
}
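
As a quick sanity check on the wrap-around row above that expects 9223372036854575807 unprocessed events, the value follows directly from the circular-buffer arithmetic. The snippet below is illustrative and not part of the test file.

```go
// Illustrative arithmetic check (not part of the test file) for the dataset row
// expecting 9223372036854575807 unprocessed events: the checkpoint is "ahead"
// of the partition, so the count wraps around MaxInt64.
package main

import (
	"fmt"
	"math"
)

func main() {
	var checkpointSeq int64 = 4000000000000200000
	var lastSeq int64 = 4000000000000000000

	unprocessed := (math.MaxInt64 - checkpointSeq) + lastSeq
	fmt.Println(unprocessed) // 9223372036854575807, matching the expected value in the dataset
}
```
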
