From 4776d09c8fd761814c1eb9ba7e964ceace651152 Mon Sep 17 00:00:00 2001 From: Jorge Turrado Date: Wed, 21 Aug 2024 00:31:28 +0200 Subject: [PATCH 1/2] fix: Azure EventHub shows checkpointer error properly Signed-off-by: Jorge Turrado --- pkg/scalers/azure_eventhub_scaler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index de262e38b1e..a185d1c681d 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -276,9 +276,9 @@ func (s *azureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Co checkpoint, err = azure.GetCheckpointFromBlobStorage(ctx, s.blobStorageClient, s.metadata.eventHubInfo, partitionInfo.PartitionID) if err != nil { // if blob not found return the total partition event count - err = errors.Unwrap(err) - if bloberror.HasCode(err, bloberror.BlobNotFound, bloberror.ContainerNotFound) { - s.logger.V(1).Error(err, fmt.Sprintf("Blob container : %s not found to use checkpoint strategy, getting unprocessed event count without checkpoint", s.metadata.eventHubInfo.BlobContainer)) + unWrapped := errors.Unwrap(err) + if bloberror.HasCode(unWrapped, bloberror.BlobNotFound, bloberror.ContainerNotFound) { + s.logger.V(1).Error(unWrapped, fmt.Sprintf("Blob container : %s not found to use checkpoint strategy, getting unprocessed event count without checkpoint", s.metadata.eventHubInfo.BlobContainer)) return GetUnprocessedEventCountWithoutCheckpoint(partitionInfo), azure.Checkpoint{}, nil } return -1, azure.Checkpoint{}, fmt.Errorf("unable to get checkpoint from storage: %w", err) From 9b8be4868a27c304646cf8cb0735357eb272bd38 Mon Sep 17 00:00:00 2001 From: Jorge Turrado Date: Wed, 11 Sep 2024 00:57:58 +0200 Subject: [PATCH 2/2] . Signed-off-by: Jorge Turrado --- CHANGELOG.md | 2 +- pkg/scalers/azure_eventhub_scaler.go | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e7d28889e2..0570be63bae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,7 +71,7 @@ Here is an overview of all new **experimental** features: ### Fixes -- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **Azure Event Hub Scaler**: Checkpointer errors are correctly handled ([#6084](https://github.com/kedacore/keda/issues/6084)) ### Deprecations diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index a185d1c681d..e967cc08454 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -18,7 +18,6 @@ limitations under the License. import ( "context" - "errors" "fmt" "math" "strconv" @@ -276,9 +275,8 @@ func (s *azureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Co checkpoint, err = azure.GetCheckpointFromBlobStorage(ctx, s.blobStorageClient, s.metadata.eventHubInfo, partitionInfo.PartitionID) if err != nil { // if blob not found return the total partition event count - unWrapped := errors.Unwrap(err) - if bloberror.HasCode(unWrapped, bloberror.BlobNotFound, bloberror.ContainerNotFound) { - s.logger.V(1).Error(unWrapped, fmt.Sprintf("Blob container : %s not found to use checkpoint strategy, getting unprocessed event count without checkpoint", s.metadata.eventHubInfo.BlobContainer)) + if bloberror.HasCode(err, bloberror.BlobNotFound, bloberror.ContainerNotFound) { + s.logger.V(1).Error(err, fmt.Sprintf("Blob container : %s not found to use checkpoint strategy, getting unprocessed event count without checkpoint", s.metadata.eventHubInfo.BlobContainer)) return GetUnprocessedEventCountWithoutCheckpoint(partitionInfo), azure.Checkpoint{}, nil } return -1, azure.Checkpoint{}, fmt.Errorf("unable to get checkpoint from storage: %w", err)