From 61cb7314f00ae319cc7b14dd91edc24144d9cbc5 Mon Sep 17 00:00:00 2001 From: Vighnesh Shenoy Date: Wed, 30 Mar 2022 14:33:44 +0530 Subject: [PATCH 1/3] Refactor Azure Blob Scaler code. Signed-off-by: Vighnesh Shenoy --- pkg/scalers/azure/azure_blob.go | 22 +++++++--- pkg/scalers/azure/azure_blob_test.go | 7 ++- pkg/scalers/azure_blob_scaler.go | 66 ++++++++++------------------ 3 files changed, 44 insertions(+), 51 deletions(-) diff --git a/pkg/scalers/azure/azure_blob.go b/pkg/scalers/azure/azure_blob.go index 36d3323504a..600652519e8 100644 --- a/pkg/scalers/azure/azure_blob.go +++ b/pkg/scalers/azure/azure_blob.go @@ -25,21 +25,33 @@ import ( "github.com/kedacore/keda/v2/pkg/util" ) +type BlobMetadata struct { + TargetBlobCount int64 + BlobContainerName string + BlobDelimiter string + BlobPrefix string + Connection string + AccountName string + MetricName string + EndpointSuffix string + ScalerIndex int +} + // GetAzureBlobListLength returns the count of the blobs in blob container in int -func GetAzureBlobListLength(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.PodIdentityProvider, connectionString, blobContainerName string, accountName string, blobDelimiter string, blobPrefix string, endpointSuffix string) (int64, error) { - credential, endpoint, err := ParseAzureStorageBlobConnection(ctx, httpClient, podIdentity, connectionString, accountName, endpointSuffix) +func GetAzureBlobListLength(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.PodIdentityProvider, meta *BlobMetadata) (int64, error) { + credential, endpoint, err := ParseAzureStorageBlobConnection(ctx, httpClient, podIdentity, meta.Connection, meta.AccountName, meta.EndpointSuffix) if err != nil { return -1, err } listBlobsSegmentOptions := azblob.ListBlobsSegmentOptions{ - Prefix: blobPrefix, + Prefix: meta.BlobPrefix, } p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) serviceURL := azblob.NewServiceURL(*endpoint, p) - containerURL := serviceURL.NewContainerURL(blobContainerName) + containerURL := serviceURL.NewContainerURL(meta.BlobContainerName) - props, err := containerURL.ListBlobsHierarchySegment(ctx, azblob.Marker{}, blobDelimiter, listBlobsSegmentOptions) + props, err := containerURL.ListBlobsHierarchySegment(ctx, azblob.Marker{}, meta.BlobDelimiter, listBlobsSegmentOptions) if err != nil { return -1, err } diff --git a/pkg/scalers/azure/azure_blob_test.go b/pkg/scalers/azure/azure_blob_test.go index baec439496a..d1961386931 100644 --- a/pkg/scalers/azure/azure_blob_test.go +++ b/pkg/scalers/azure/azure_blob_test.go @@ -9,7 +9,9 @@ import ( func TestGetBlobLength(t *testing.T) { httpClient := http.DefaultClient - length, err := GetAzureBlobListLength(context.TODO(), httpClient, "", "", "blobContainerName", "", "", "", "") + + meta := BlobMetadata{Connection: "", BlobContainerName: "blobContainerName", AccountName: "", BlobDelimiter: "", BlobPrefix: "", EndpointSuffix: ""} + length, err := GetAzureBlobListLength(context.TODO(), httpClient, "", &meta) if length != -1 { t.Error("Expected length to be -1, but got", length) } @@ -22,7 +24,8 @@ func TestGetBlobLength(t *testing.T) { t.Error("Expected error to contain parsing error message, but got", err.Error()) } - length, err = GetAzureBlobListLength(context.TODO(), httpClient, "", "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net", "blobContainerName", "", "", "", "") + meta.Connection = "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net" + length, err = GetAzureBlobListLength(context.TODO(), httpClient, "", &meta) if length != -1 { t.Error("Expected length to be -1, but got", length) diff --git a/pkg/scalers/azure_blob_scaler.go b/pkg/scalers/azure_blob_scaler.go index c0fc9a19df4..6db92f1afb0 100644 --- a/pkg/scalers/azure_blob_scaler.go +++ b/pkg/scalers/azure_blob_scaler.go @@ -43,23 +43,11 @@ const ( type azureBlobScaler struct { metricType v2beta2.MetricTargetType - metadata *azureBlobMetadata + metadata *azure.BlobMetadata podIdentity kedav1alpha1.PodIdentityProvider httpClient *http.Client } -type azureBlobMetadata struct { - targetBlobCount int64 - blobContainerName string - blobDelimiter string - blobPrefix string - connection string - accountName string - metricName string - endpointSuffix string - scalerIndex int -} - var azureBlobLog = logf.Log.WithName("azure_blob_scaler") // NewAzureBlobScaler creates a new azureBlobScaler @@ -82,11 +70,11 @@ func NewAzureBlobScaler(config *ScalerConfig) (Scaler, error) { }, nil } -func parseAzureBlobMetadata(config *ScalerConfig) (*azureBlobMetadata, kedav1alpha1.PodIdentityProvider, error) { - meta := azureBlobMetadata{} - meta.targetBlobCount = defaultTargetBlobCount - meta.blobDelimiter = defaultBlobDelimiter - meta.blobPrefix = defaultBlobPrefix +func parseAzureBlobMetadata(config *ScalerConfig) (*azure.BlobMetadata, kedav1alpha1.PodIdentityProvider, error) { + meta := azure.BlobMetadata{} + meta.TargetBlobCount = defaultTargetBlobCount + meta.BlobDelimiter = defaultBlobDelimiter + meta.BlobPrefix = defaultBlobPrefix if val, ok := config.TriggerMetadata[blobCountMetricName]; ok { blobCount, err := strconv.ParseInt(val, 10, 64) @@ -95,21 +83,21 @@ func parseAzureBlobMetadata(config *ScalerConfig) (*azureBlobMetadata, kedav1alp return nil, "", fmt.Errorf("error parsing azure blob metadata %s: %s", blobCountMetricName, err.Error()) } - meta.targetBlobCount = blobCount + meta.TargetBlobCount = blobCount } if val, ok := config.TriggerMetadata["blobContainerName"]; ok && val != "" { - meta.blobContainerName = val + meta.BlobContainerName = val } else { return nil, "", fmt.Errorf("no blobContainerName given") } if val, ok := config.TriggerMetadata["blobDelimiter"]; ok && val != "" { - meta.blobDelimiter = val + meta.BlobDelimiter = val } if val, ok := config.TriggerMetadata["blobPrefix"]; ok && val != "" { - meta.blobPrefix = val + meta.blobDelimiter + meta.BlobPrefix = val + meta.BlobDelimiter } endpointSuffix, err := azure.ParseAzureStorageEndpointSuffix(config.TriggerMetadata, azure.BlobEndpoint) @@ -117,7 +105,7 @@ func parseAzureBlobMetadata(config *ScalerConfig) (*azureBlobMetadata, kedav1alp return nil, "", err } - meta.endpointSuffix = endpointSuffix + meta.EndpointSuffix = endpointSuffix // before triggerAuthentication CRD, pod identity was configured using this property if val, ok := config.TriggerMetadata["useAAdPodIdentity"]; ok && config.PodIdentity == "" && val == "true" { @@ -125,9 +113,9 @@ func parseAzureBlobMetadata(config *ScalerConfig) (*azureBlobMetadata, kedav1alp } if val, ok := config.TriggerMetadata["metricName"]; ok { - meta.metricName = kedautil.NormalizeString(fmt.Sprintf("azure-blob-%s", val)) + meta.MetricName = kedautil.NormalizeString(fmt.Sprintf("azure-blob-%s", val)) } else { - meta.metricName = kedautil.NormalizeString(fmt.Sprintf("azure-blob-%s", meta.blobContainerName)) + meta.MetricName = kedautil.NormalizeString(fmt.Sprintf("azure-blob-%s", meta.BlobContainerName)) } // If the Use AAD Pod Identity is not present, or set to "none" @@ -137,18 +125,18 @@ func parseAzureBlobMetadata(config *ScalerConfig) (*azureBlobMetadata, kedav1alp // Azure Blob Scaler expects a "connection" parameter in the metadata // of the scaler or in a TriggerAuthentication object if config.AuthParams["connection"] != "" { - meta.connection = config.AuthParams["connection"] + meta.Connection = config.AuthParams["connection"] } else if config.TriggerMetadata["connectionFromEnv"] != "" { - meta.connection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]] + meta.Connection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]] } - if len(meta.connection) == 0 { + if len(meta.Connection) == 0 { return nil, "", fmt.Errorf("no connection setting given") } case kedav1alpha1.PodIdentityProviderAzure: // If the Use AAD Pod Identity is present then check account name if val, ok := config.TriggerMetadata["accountName"]; ok && val != "" { - meta.accountName = val + meta.AccountName = val } else { return nil, "", fmt.Errorf("no accountName given") } @@ -156,7 +144,7 @@ func parseAzureBlobMetadata(config *ScalerConfig) (*azureBlobMetadata, kedav1alp return nil, "", fmt.Errorf("pod identity %s not supported for azure storage blobs", config.PodIdentity) } - meta.scalerIndex = config.ScalerIndex + meta.ScalerIndex = config.ScalerIndex return &meta, config.PodIdentity, nil } @@ -167,12 +155,7 @@ func (s *azureBlobScaler) IsActive(ctx context.Context) (bool, error) { ctx, s.httpClient, s.podIdentity, - s.metadata.connection, - s.metadata.blobContainerName, - s.metadata.accountName, - s.metadata.blobDelimiter, - s.metadata.blobPrefix, - s.metadata.endpointSuffix, + s.metadata, ) if err != nil { @@ -190,9 +173,9 @@ func (s *azureBlobScaler) Close(context.Context) error { func (s *azureBlobScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { externalMetric := &v2beta2.ExternalMetricSource{ Metric: v2beta2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, s.metadata.metricName), + Name: GenerateMetricNameWithIndex(s.metadata.ScalerIndex, s.metadata.MetricName), }, - Target: GetMetricTarget(s.metricType, s.metadata.targetBlobCount), + Target: GetMetricTarget(s.metricType, s.metadata.TargetBlobCount), } metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType} return []v2beta2.MetricSpec{metricSpec} @@ -204,12 +187,7 @@ func (s *azureBlobScaler) GetMetrics(ctx context.Context, metricName string, met ctx, s.httpClient, s.podIdentity, - s.metadata.connection, - s.metadata.blobContainerName, - s.metadata.accountName, - s.metadata.blobDelimiter, - s.metadata.blobPrefix, - s.metadata.endpointSuffix, + s.metadata, ) if err != nil { From dd906ee7b17ce5f25f541300b2c5ab81035cf7eb Mon Sep 17 00:00:00 2001 From: Vighnesh Shenoy Date: Wed, 30 Mar 2022 18:24:44 +0530 Subject: [PATCH 2/3] Add globPattern, recursive parameters to Azure Blob Storage Scaler. Signed-off-by: Vighnesh Shenoy --- CHANGELOG.md | 1 + go.mod | 1 + go.sum | 2 ++ pkg/scalers/azure/azure_blob.go | 18 ++++++++++++++++++ pkg/scalers/azure_blob_scaler.go | 20 ++++++++++++++++++++ 5 files changed, 42 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 19bac9ea39c..25db5336d27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ - **General:** Synchronize HPA annotations from ScaledObject ([#2659](https://github.com/kedacore/keda/pull/2659)) - **General:** Updated HTTPClient to be proxy-aware, if available, from environment variables. ([#2577](https://github.com/kedacore/keda/issues/2577)) - **Azure Application Insights Scaler:** Provide support for non-public clouds ([#2735](https://github.com/kedacore/keda/issues/2735)) +- **Azure Blob Storage Scaler:** Add optional parameters for counting blobs recursively ([#1789](https://github.com/kedacore/keda/issues/1789)) - **Azure Event Hub Scaler:** Improve logging when blob container not found ([#2363](https://github.com/kedacore/keda/issues/2363)) - **Azure Event Hub Scaler:** Provide support for non-public clouds ([#1915](https://github.com/kedacore/keda/issues/1915)) - **Azure Log Analytics Scaler:** Provide support for non-public clouds ([#1916](https://github.com/kedacore/keda/issues/1916)) diff --git a/go.mod b/go.mod index a4bf4f8e376..235d3be9309 100644 --- a/go.mod +++ b/go.mod @@ -131,6 +131,7 @@ require ( github.com/go-playground/locales v0.14.0 // indirect github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-stack/stack v1.8.0 // indirect + github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.2.0 // indirect github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect diff --git a/go.sum b/go.sum index a80d7802c92..5f7ec09198f 100644 --- a/go.sum +++ b/go.sum @@ -389,6 +389,8 @@ github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg78 github.com/go-test/deep v1.0.2 h1:onZX1rnHT3Wv6cqNgYyFOOlgVKJrksuCMCRvJStbMYw= github.com/go-test/deep v1.0.2/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/gobuffalo/flect v0.2.4/go.mod h1:1ZyCLIbg0YD7sDkzvFdPoOydPtD8y9JQnrOROolUcM8= +github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= +github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= diff --git a/pkg/scalers/azure/azure_blob.go b/pkg/scalers/azure/azure_blob.go index 600652519e8..93cd69e8d64 100644 --- a/pkg/scalers/azure/azure_blob.go +++ b/pkg/scalers/azure/azure_blob.go @@ -20,6 +20,7 @@ import ( "context" "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/gobwas/glob" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/util" @@ -35,6 +36,7 @@ type BlobMetadata struct { MetricName string EndpointSuffix string ScalerIndex int + GlobPattern *glob.Glob } // GetAzureBlobListLength returns the count of the blobs in blob container in int @@ -51,6 +53,22 @@ func GetAzureBlobListLength(ctx context.Context, httpClient util.HTTPDoer, podId serviceURL := azblob.NewServiceURL(*endpoint, p) containerURL := serviceURL.NewContainerURL(meta.BlobContainerName) + if meta.GlobPattern != nil { + props, err := containerURL.ListBlobsFlatSegment(ctx, azblob.Marker{}, azblob.ListBlobsSegmentOptions{}) + if err != nil { + return -1, err + } + + var count int64 + globPattern := *meta.GlobPattern + for _, blobItem := range props.Segment.BlobItems { + if globPattern.Match(blobItem.Name) { + count++ + } + } + return count, nil + } + props, err := containerURL.ListBlobsHierarchySegment(ctx, azblob.Marker{}, meta.BlobDelimiter, listBlobsSegmentOptions) if err != nil { return -1, err diff --git a/pkg/scalers/azure_blob_scaler.go b/pkg/scalers/azure_blob_scaler.go index 6db92f1afb0..5f6318cfca3 100644 --- a/pkg/scalers/azure_blob_scaler.go +++ b/pkg/scalers/azure_blob_scaler.go @@ -29,6 +29,7 @@ import ( "k8s.io/metrics/pkg/apis/external_metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" + "github.com/gobwas/glob" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/scalers/azure" kedautil "github.com/kedacore/keda/v2/pkg/util" @@ -96,6 +97,25 @@ func parseAzureBlobMetadata(config *ScalerConfig) (*azure.BlobMetadata, kedav1al meta.BlobDelimiter = val } + if val, ok := config.TriggerMetadata["recursive"]; ok && val != "" { + recursive, err := strconv.ParseBool(val) + if err != nil { + return nil, "", err + } + + if recursive { + meta.BlobDelimiter = "" + } + } + + if val, ok := config.TriggerMetadata["globPattern"]; ok && val != "" { + glob, err := glob.Compile(val) + if err != nil { + return nil, "", fmt.Errorf("invalid glob pattern - %s", err.Error()) + } + meta.GlobPattern = &glob + } + if val, ok := config.TriggerMetadata["blobPrefix"]; ok && val != "" { meta.BlobPrefix = val + meta.BlobDelimiter } From 533fa81680f3fe9e55dcb0a7467dec48a6a3b4f1 Mon Sep 17 00:00:00 2001 From: Vighnesh Shenoy Date: Wed, 30 Mar 2022 23:14:33 +0530 Subject: [PATCH 3/3] Add unit tests. Signed-off-by: Vighnesh Shenoy --- pkg/scalers/azure_blob_scaler_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/scalers/azure_blob_scaler_test.go b/pkg/scalers/azure_blob_scaler_test.go index e3d378138ba..33bf3b6871d 100644 --- a/pkg/scalers/azure_blob_scaler_test.go +++ b/pkg/scalers/azure_blob_scaler_test.go @@ -71,6 +71,16 @@ var testAzBlobMetadata = []parseAzBlobMetadataTestData{ {map[string]string{"accountName": "sample_acc", "blobContainerName": "sample_container", "cloud": "", "endpointSuffix": "ignored"}, false, testAzBlobResolvedEnv, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure}, // connection from authParams {map[string]string{"blobContainerName": "sample_container", "blobCount": "5"}, false, testAzBlobResolvedEnv, map[string]string{"connection": "value"}, kedav1alpha1.PodIdentityProviderNone}, + // with globPattern + {map[string]string{"connectionFromEnv": "CONNECTION", "blobContainerName": "sample", "blobCount": "5", "globPattern": "foo**"}, false, testAzBlobResolvedEnv, map[string]string{}, ""}, + // with recursive true + {map[string]string{"connectionFromEnv": "CONNECTION", "blobContainerName": "sample", "blobCount": "5", "recursive": "true"}, false, testAzBlobResolvedEnv, map[string]string{}, ""}, + // with recursive false + {map[string]string{"connectionFromEnv": "CONNECTION", "blobContainerName": "sample", "blobCount": "5", "recursive": "false"}, false, testAzBlobResolvedEnv, map[string]string{}, ""}, + // with invalid value for recursive + {map[string]string{"connectionFromEnv": "CONNECTION", "blobContainerName": "sample", "blobCount": "5", "recursive": "invalid"}, true, testAzBlobResolvedEnv, map[string]string{}, ""}, + // with invalid glob pattern + {map[string]string{"connectionFromEnv": "CONNECTION", "blobContainerName": "sample", "blobCount": "5", "globPattern": "[\\]"}, true, testAzBlobResolvedEnv, map[string]string{}, ""}, } var azBlobMetricIdentifiers = []azBlobMetricIdentifier{