From b25f7b03504cf48ec0e702e0850f2af22cce616f Mon Sep 17 00:00:00 2001 From: xavier geerinck Date: Mon, 16 Nov 2020 12:49:46 +0100 Subject: [PATCH 1/8] Wrong aws_iam_authorization access key id setting (#1332) * Wrong aws_iam_authorization access key id setting When getting the ACCESS_KEY_ID for AWS from Environment variables (which is currently the only working option), the `aws_iam_authorization` scaler would check if metadata.awsAccessKeyIDFromEnv was set, but it would then utilize the metadata.awsAccessKeyId for the name... while in the secretAccessKey it would then use the `FromEnv` one. This fixes that Signed-off-by: Xavier Geerinck --- CHANGELOG.md | 2 ++ pkg/scalers/aws_iam_authorization.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c58175825c..6ce2f3f2991 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,8 @@ ### Improvements +- Bug fix in aws_iam_authorization to utilize correct secret from env key name ([PR #1332](https://github.com/kedacore/keda/pull/1332)) + ### Breaking Changes ### Other diff --git a/pkg/scalers/aws_iam_authorization.go b/pkg/scalers/aws_iam_authorization.go index bdd33d830d7..e542ad01cad 100644 --- a/pkg/scalers/aws_iam_authorization.go +++ b/pkg/scalers/aws_iam_authorization.go @@ -30,7 +30,7 @@ func getAwsAuthorization(authParams, metadata, resolvedEnv map[string]string) (a if metadata["awsAccessKeyID"] != "" { meta.awsAccessKeyID = metadata["awsAccessKeyID"] } else if metadata["awsAccessKeyIDFromEnv"] != "" { - meta.awsAccessKeyID = resolvedEnv[metadata["awsAccessKeyID"]] + meta.awsAccessKeyID = resolvedEnv[metadata["awsAccessKeyIDFromEnv"]] } if len(meta.awsAccessKeyID) == 0 { From 9a17d10b3f6a70d7b144b463b288ec45b60f9a33 Mon Sep 17 00:00:00 2001 From: boris Date: Mon, 16 Nov 2020 19:56:30 +0800 Subject: [PATCH 2/8] add ScaledJob's label to its job (#1322) * add ScaledJob's label to its job Signed-off-by: mosesyou * Changelog: Add ScaledJob's label to its job Signed-off-by: mosesyou Co-authored-by: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com> --- CHANGELOG.md | 1 + pkg/scaling/executor/scale_jobs.go | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ce2f3f2991..63365106deb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ ### Improvements +- Support add ScaledJob's label to its job ([#1311](https://github.com/kedacore/keda/issues/1311)) - Bug fix in aws_iam_authorization to utilize correct secret from env key name ([PR #1332](https://github.com/kedacore/keda/pull/1332)) ### Breaking Changes diff --git a/pkg/scaling/executor/scale_jobs.go b/pkg/scaling/executor/scale_jobs.go index 81eac699629..6b976973ff3 100644 --- a/pkg/scaling/executor/scale_jobs.go +++ b/pkg/scaling/executor/scale_jobs.go @@ -63,18 +63,23 @@ func (e *scaleExecutor) createJobs(logger logr.Logger, scaledJob *kedav1alpha1.S } logger.Info("Creating jobs", "Number of jobs", scaleTo) + labels := map[string]string{ + "app.kubernetes.io/name": scaledJob.GetName(), + "app.kubernetes.io/version": version.Version, + "app.kubernetes.io/part-of": scaledJob.GetName(), + "app.kubernetes.io/managed-by": "keda-operator", + "scaledjob": scaledJob.GetName(), + } + for key, value := range scaledJob.ObjectMeta.Labels { + labels[key] = value + } + for i := 0; i < int(scaleTo); i++ { job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ GenerateName: scaledJob.GetName() + "-", Namespace: scaledJob.GetNamespace(), - Labels: map[string]string{ - "app.kubernetes.io/name": scaledJob.GetName(), - "app.kubernetes.io/version": version.Version, - "app.kubernetes.io/part-of": scaledJob.GetName(), - "app.kubernetes.io/managed-by": "keda-operator", - "scaledjob": scaledJob.GetName(), - }, + Labels: labels, }, Spec: *scaledJob.Spec.JobTargetRef.DeepCopy(), } From fb5b60361cd3af061b3f7abd8e159d3761dc9175 Mon Sep 17 00:00:00 2001 From: Girish Ramnani Date: Mon, 16 Nov 2020 20:54:09 +0530 Subject: [PATCH 3/8] use the latest extension id for golang in devcontainer.json (#1334) Signed-off-by: Girish Ramnani --- .devcontainer/devcontainer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index ac16988e453..4c295ad7c37 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -29,6 +29,6 @@ // Add the IDs of extensions you want installed when the container is created in the array below. "extensions": [ - "ms-vscode.go" + "golang.go" ] } From e07bc0cb744dc61907dd485204647cef79bca868 Mon Sep 17 00:00:00 2001 From: Girish Ramnani Date: Mon, 16 Nov 2020 20:55:24 +0530 Subject: [PATCH 4/8] resolve broken link to community standup section in contributing.md (#1335) Signed-off-by: Girish Ramnani --- CONTRIBUTING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 0768ee0339b..4a4570943c6 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -28,7 +28,7 @@ would love to become a maintainer to support our project please create an issue ## Getting Help -If you have a question about KEDA or how best to contribute, the [#KEDA](https://kubernetes.slack.com/archives/CKZJ36A5D) channel on the Kubernetes slack channel ([get an invite if you don't have one already](https://slack.k8s.io/)) is a good place to start. We also have regular [community stand-ups](https://github.com/kedacore/keda#community-standup) to track ongoing work and discuss areas of contribution. For any issues with the product you can [create an issue](https://github.com/kedacore/keda/issues/new) in this repo. +If you have a question about KEDA or how best to contribute, the [#KEDA](https://kubernetes.slack.com/archives/CKZJ36A5D) channel on the Kubernetes slack channel ([get an invite if you don't have one already](https://slack.k8s.io/)) is a good place to start. We also have regular [community stand-ups](https://github.com/kedacore/keda#community) to track ongoing work and discuss areas of contribution. For any issues with the product you can [create an issue](https://github.com/kedacore/keda/issues/new) in this repo. ## Contributing Scalers From f7fab4087de24464382e70f11baebe939fbf0af0 Mon Sep 17 00:00:00 2001 From: Girish Ramnani Date: Tue, 17 Nov 2020 02:11:02 +0530 Subject: [PATCH 5/8] update the setup-go set from v2-beta to v2 (#1337) Signed-off-by: Girish Ramnani --- .github/workflows/pr-validation.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr-validation.yml b/.github/workflows/pr-validation.yml index 0699ffa0c29..def9df85e7d 100644 --- a/.github/workflows/pr-validation.yml +++ b/.github/workflows/pr-validation.yml @@ -38,7 +38,7 @@ jobs: steps: - uses: actions/checkout@v2 - uses: actions/setup-python@v1 - - uses: actions/setup-go@v2-beta + - uses: actions/setup-go@v2 with: go-version: 1.15.3 - name: Get golangci From d217b934d3b194ac438ae5782a316a2c19ae5f73 Mon Sep 17 00:00:00 2001 From: Emad Alashi Date: Tue, 17 Nov 2020 08:19:18 +1100 Subject: [PATCH 6/8] Eventhub podidentity (#1305) Signed-off-by: Emad Alashi --- CHANGELOG.md | 2 +- go.mod | 10 +++-- go.sum | 14 +++++++ pkg/scalers/azure/azure_eventhub.go | 45 ++++++++++++++++++----- pkg/scalers/azure_eventhub_scaler.go | 45 +++++++++++++++++------ pkg/scalers/azure_eventhub_scaler_test.go | 25 ++++++++++++- 6 files changed, 116 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 63365106deb..b57ddadcde3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,9 +17,9 @@ ## Unreleased ### New +- Can use Pod Identity with Azure Event Hub scaler ([#994](https://github.com/kedacore/keda/issues/994)) ### Improvements - - Support add ScaledJob's label to its job ([#1311](https://github.com/kedacore/keda/issues/1311)) - Bug fix in aws_iam_authorization to utilize correct secret from env key name ([PR #1332](https://github.com/kedacore/keda/pull/1332)) diff --git a/go.mod b/go.mod index ce293d448d2..5b67228a069 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,14 @@ go 1.15 require ( cloud.google.com/go v0.65.0 - github.com/Azure/azure-amqp-common-go/v3 v3.0.1 - github.com/Azure/azure-event-hubs-go v1.3.1 - github.com/Azure/azure-sdk-for-go v47.0.0+incompatible + github.com/Azure/azure-amqp-common-go/v3 v3.1.0 + github.com/Azure/azure-event-hubs-go v1.3.1 // indirect + github.com/Azure/azure-event-hubs-go/v3 v3.3.2 + github.com/Azure/azure-sdk-for-go v48.0.0+incompatible github.com/Azure/azure-service-bus-go v0.10.6 github.com/Azure/azure-storage-blob-go v0.10.0 github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd + github.com/Azure/go-autorest/autorest v0.11.10 github.com/Azure/go-autorest/autorest/azure/auth v0.5.3 github.com/Huawei/gophercloud v1.0.21 github.com/Shopify/sarama v1.27.1 @@ -34,6 +36,8 @@ require ( github.com/stretchr/testify v1.6.1 github.com/tidwall/gjson v1.6.1 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c + golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect + golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 // indirect google.golang.org/api v0.31.0 google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d google.golang.org/grpc v1.31.1 diff --git a/go.sum b/go.sum index 06b1dc4b172..849597f785b 100644 --- a/go.sum +++ b/go.sum @@ -50,8 +50,12 @@ github.com/Azure/azure-amqp-common-go/v2 v2.1.0 h1:+QbFgmWCnPzdaRMfsI0Yb6GrRdBj5 github.com/Azure/azure-amqp-common-go/v2 v2.1.0/go.mod h1:R8rea+gJRuJR6QxTir/XuEd+YuKoUiazDC/N96FiDEU= github.com/Azure/azure-amqp-common-go/v3 v3.0.1 h1:mXh+eyOxGLBfqDtfmbtby0l7XfG/6b2NkuZ3B7i6zHA= github.com/Azure/azure-amqp-common-go/v3 v3.0.1/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0= +github.com/Azure/azure-amqp-common-go/v3 v3.1.0 h1:1N4YSkWYWffOpQHromYdOucBSQXhNRKzqtgICy6To8Q= +github.com/Azure/azure-amqp-common-go/v3 v3.1.0/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0= github.com/Azure/azure-event-hubs-go v1.3.1 h1:vKw7tLOFJ8kVMkhNvOXZWz+3purRQ/WTe60+bavZ5qc= github.com/Azure/azure-event-hubs-go v1.3.1/go.mod h1:me2m3+0WC7G7JRBTWI5SQ81s2TYyOqgV3JIpYg86jZA= +github.com/Azure/azure-event-hubs-go/v3 v3.3.2 h1:R3HoM9QiZ2uBcxfMPROBJPSsCXUL31TiV5vQ3dRsRNg= +github.com/Azure/azure-event-hubs-go/v3 v3.3.2/go.mod h1:sszMsQpFy8Au2s2NColbnJY8lRVm1koW0XxBJ3rN5TY= github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= github.com/Azure/azure-pipeline-go v0.2.1/go.mod h1:UGSo8XybXnIGZ3epmeBw7Jdz+HiUVpqIlpz/HKHylF4= @@ -73,12 +77,15 @@ github.com/Azure/azure-sdk-for-go v46.0.0+incompatible h1:4qlEOCDcDQZTGczYGzbGYC github.com/Azure/azure-sdk-for-go v46.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go v47.0.0+incompatible h1:Hn9OhJUtoLjm27f17/JKw38KBQny0cjpnsBHn7kPpTI= github.com/Azure/azure-sdk-for-go v47.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/azure-sdk-for-go v48.0.0+incompatible h1:adRBpSbkY3IAgqBA83nSDN8yXDsy48zJNPqSwZabDNQ= +github.com/Azure/azure-sdk-for-go v48.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-service-bus-go v0.9.1 h1:G1qBLQvHCFDv9pcpgwgFkspzvnGknJRR0PYJ9ytY/JA= github.com/Azure/azure-service-bus-go v0.9.1/go.mod h1:yzBx6/BUGfjfeqbRZny9AQIbIe3AcV9WZbAdpkoXOa0= github.com/Azure/azure-service-bus-go v0.10.6 h1:xjxJf6rnEoX5yCCoKiXe5VPJAxdXZU1e2zgSkHMBRR8= github.com/Azure/azure-service-bus-go v0.10.6/go.mod h1:1tX7Ap1oTgovUJ9iUtMOjM9I/xcvjyZ0VMWzNF0cBf0= github.com/Azure/azure-storage-blob-go v0.0.0-20181023070848-cf01652132cc/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y= github.com/Azure/azure-storage-blob-go v0.0.0-20190123011202-457680cc0804/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y= +github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y= github.com/Azure/azure-storage-blob-go v0.8.0 h1:53qhf0Oxa0nOjgbDeeYPUeyiNmafAFEY95rZLK0Tj6o= github.com/Azure/azure-storage-blob-go v0.8.0/go.mod h1:lPI3aLPpuLTeUwh1sViKXFxwl2B6teiRqI0deQUvsw0= github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0SzsJiXQVRGsRXxs= @@ -112,6 +119,8 @@ github.com/Azure/go-autorest/autorest v0.11.3 h1:fyYnmYujkIXUgv88D9/Wo2ybE4Zwd/T github.com/Azure/go-autorest/autorest v0.11.3/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw= github.com/Azure/go-autorest/autorest v0.11.9 h1:P0ZF0dEYoUPUVDQo3mA1CvH5b8mKev7DDcmTwauuNME= github.com/Azure/go-autorest/autorest v0.11.9/go.mod h1:eipySxLmqSyC5s5k1CLupqet0PSENBEDP93LQ9a8QYw= +github.com/Azure/go-autorest/autorest v0.11.10 h1:j5sGbX7uj1ieYYkQ3Mpvewd4DCsEQ+ZeJpqnSM9pjnM= +github.com/Azure/go-autorest/autorest v0.11.10/go.mod h1:eipySxLmqSyC5s5k1CLupqet0PSENBEDP93LQ9a8QYw= github.com/Azure/go-autorest/autorest/adal v0.1.0/go.mod h1:MeS4XhScH55IST095THyTxElntu7WqB7pNbZo8Q5G3E= github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0= github.com/Azure/go-autorest/autorest/adal v0.8.0/go.mod h1:Z6vX6WXXuyieHAXwMj0S6HY6e6wcHn37qQMBQlvY3lc= @@ -1499,6 +1508,8 @@ golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rB golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1586,6 +1597,8 @@ golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOL golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 h1:wBouT66WTYFXdxfVdz9sVWARVd/2vfGcmI45D2gj45M= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw= +golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/oauth2 v0.0.0-20180724155351-3d292e4d0cdc/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1836,6 +1849,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogR gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= diff --git a/pkg/scalers/azure/azure_eventhub.go b/pkg/scalers/azure/azure_eventhub.go index 03edd66900f..872a0b1c53c 100644 --- a/pkg/scalers/azure/azure_eventhub.go +++ b/pkg/scalers/azure/azure_eventhub.go @@ -11,9 +11,10 @@ import ( "github.com/imdario/mergo" - eventhub "github.com/Azure/azure-event-hubs-go" + "github.com/Azure/azure-amqp-common-go/v3/aad" + eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/Azure/azure-storage-blob-go/azblob" - + "github.com/Azure/go-autorest/autorest/azure" kedav1alpha1 "github.com/kedacore/keda/api/v1alpha1" ) @@ -46,16 +47,35 @@ type EventHubInfo struct { EventHubConsumerGroup string StorageConnection string BlobContainer string + Namespace string + EventHubName string } // GetEventHubClient returns eventhub client func GetEventHubClient(info EventHubInfo) (*eventhub.Hub, error) { - hub, err := eventhub.NewHubFromConnectionString(info.EventHubConnection) - if err != nil { - return nil, fmt.Errorf("failed to create hub client: %s", err) + // The user wants to use a connectionstring, not a pod identity + if info.EventHubConnection != "" { + hub, err := eventhub.NewHubFromConnectionString(info.EventHubConnection) + if err != nil { + return nil, fmt.Errorf("failed to create hub client: %s", err) + } + return hub, nil } - return hub, nil + // Since there is no connectionstring, then user wants to use pod identity + // Internally, the JWTProvider will use Managed Service Identity to authenticate if no Service Principal info supplied + provider, aadErr := aad.NewJWTProvider(func(config *aad.TokenProviderConfiguration) error { + if config.Env == nil { + config.Env = &azure.PublicCloud + } + return nil + }) + + if aadErr == nil { + return eventhub.NewHub(info.Namespace, info.EventHubName, provider) + } + + return nil, aadErr } // GetCheckpointFromBlobStorage accesses Blob storage and gets checkpoint information of a partition @@ -65,9 +85,16 @@ func GetCheckpointFromBlobStorage(ctx context.Context, info EventHubInfo, partit return Checkpoint{}, err } - eventHubNamespace, eventHubName, err := ParseAzureEventHubConnectionString(info.EventHubConnection) - if err != nil { - return Checkpoint{}, err + var eventHubNamespace string + var eventHubName string + if info.EventHubConnection != "" { + eventHubNamespace, eventHubName, err = ParseAzureEventHubConnectionString(info.EventHubConnection) + if err != nil { + return Checkpoint{}, err + } + } else { + eventHubNamespace = info.Namespace + eventHubName = info.EventHubName } // TODO: add more ways to read from different types of storage and read checkpoints/leases written in different JSON formats diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index 1dc50eed33f..aaabb6ec2a3 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -7,9 +7,10 @@ import ( "math" "strconv" + "github.com/kedacore/keda/api/v1alpha1" "github.com/kedacore/keda/pkg/scalers/azure" - eventhub "github.com/Azure/azure-event-hubs-go" + eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/Azure/azure-storage-blob-go/azblob" "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" @@ -85,16 +86,6 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) return nil, fmt.Errorf("no storage connection string given") } - if config.AuthParams["connection"] != "" { - meta.eventHubInfo.EventHubConnection = config.AuthParams["connection"] - } else if config.TriggerMetadata["connectionFromEnv"] != "" { - meta.eventHubInfo.EventHubConnection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]] - } - - if len(meta.eventHubInfo.EventHubConnection) == 0 { - return nil, fmt.Errorf("no event hub connection string given") - } - meta.eventHubInfo.EventHubConsumerGroup = defaultEventHubConsumerGroup if val, ok := config.TriggerMetadata["consumerGroup"]; ok { meta.eventHubInfo.EventHubConsumerGroup = val @@ -105,6 +96,38 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) meta.eventHubInfo.BlobContainer = val } + if config.PodIdentity == "" || config.PodIdentity == v1alpha1.PodIdentityProviderNone { + if config.AuthParams["connection"] != "" { + meta.eventHubInfo.EventHubConnection = config.AuthParams["connection"] + } else if config.TriggerMetadata["connectionFromEnv"] != "" { + meta.eventHubInfo.EventHubConnection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]] + } + + if len(meta.eventHubInfo.EventHubConnection) == 0 { + return nil, fmt.Errorf("no event hub connection string given") + } + } else { + if config.TriggerMetadata["eventHubNamespace"] != "" { + meta.eventHubInfo.Namespace = config.TriggerMetadata["eventHubNamespace"] + } else if config.TriggerMetadata["eventHubNamespaceFromEnv"] != "" { + meta.eventHubInfo.Namespace = config.ResolvedEnv[config.TriggerMetadata["eventHubNamespaceFromEnv"]] + } + + if len(meta.eventHubInfo.Namespace) == 0 { + return nil, fmt.Errorf("no event hub namespace string given") + } + + if config.TriggerMetadata["eventHubName"] != "" { + meta.eventHubInfo.EventHubName = config.TriggerMetadata["eventHubName"] + } else if config.TriggerMetadata["eventHubNameFromEnv"] != "" { + meta.eventHubInfo.EventHubName = config.ResolvedEnv[config.TriggerMetadata["eventHubNameFromEnv"]] + } + + if len(meta.eventHubInfo.EventHubName) == 0 { + return nil, fmt.Errorf("no event hub name string given") + } + } + return &meta, nil } diff --git a/pkg/scalers/azure_eventhub_scaler_test.go b/pkg/scalers/azure_eventhub_scaler_test.go index f760aae04f0..cd2dfa3761e 100644 --- a/pkg/scalers/azure_eventhub_scaler_test.go +++ b/pkg/scalers/azure_eventhub_scaler_test.go @@ -9,7 +9,7 @@ import ( "github.com/kedacore/keda/pkg/scalers/azure" - eventhub "github.com/Azure/azure-event-hubs-go" + eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/Azure/azure-storage-blob-go/azblob" ) @@ -51,6 +51,18 @@ var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{ {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName}, false}, } +var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestData{ + {map[string]string{}, true}, + // Even though connection string is provided, this should fail because the eventhub Namespace is not provided explicitly when using Pod Identity + {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, true}, + // properly formed event hub metadata with Pod Identity + {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false}, + // missing eventHubname + {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubNamespace": testEventHubNamespace}, true}, + // missing eventHubNamespace + {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName}, true}, +} + var eventHubMetricIdentifiers = []eventHubMetricIdentifier{ {&parseEventHubMetadataDataset[1], "azure-eventhub-none-testEventHubConsumerGroup"}, } @@ -76,6 +88,17 @@ func TestParseEventHubMetadata(t *testing.T) { t.Error("Expected error and got success") } } + + for _, testData := range parseEventHubMetadataDatasetWithPodIdentity { + _, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}, PodIdentity: "Azure"}) + + if err != nil && !testData.isError { + t.Errorf("Expected success but got error: %s", err) + } + if testData.isError && err == nil { + t.Error("Expected error and got success") + } + } } func TestGetUnprocessedEventCountInPartition(t *testing.T) { From 3f1e980865880ce86f4ff434a25c0470890af091 Mon Sep 17 00:00:00 2001 From: Girish Ramnani Date: Wed, 18 Nov 2020 01:02:15 +0530 Subject: [PATCH 7/8] enable gocyclo in golintCI and resolve the warnings (#1336) Signed-off-by: Girish Ramnani --- .golangci.yml | 2 +- pkg/scalers/azure_log_analytics_scaler.go | 163 ++++++++++------------ pkg/scalers/azure_monitor_scaler.go | 37 ++--- 3 files changed, 95 insertions(+), 107 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 8406ad0eece..c414388f9f4 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -34,7 +34,7 @@ linters: #- funlen #- goconst #- gocritic - #- gocyclo + - gocyclo - gosimple - stylecheck - unused diff --git a/pkg/scalers/azure_log_analytics_scaler.go b/pkg/scalers/azure_log_analytics_scaler.go index d3a8bae688a..38b9eb9b0ff 100644 --- a/pkg/scalers/azure_log_analytics_scaler.go +++ b/pkg/scalers/azure_log_analytics_scaler.go @@ -106,38 +106,26 @@ func parseAzureLogAnalyticsMetadata(config *ScalerConfig) (*azureLogAnalyticsMet meta := azureLogAnalyticsMetadata{} if config.PodIdentity == "" || config.PodIdentity == kedav1alpha1.PodIdentityProviderNone { - //Getting tenantId - if val, ok := config.AuthParams["tenantId"]; ok && val != "" { - meta.tenantID = val - } else if val, ok := config.TriggerMetadata["tenantId"]; ok && val != "" { - meta.tenantID = val - } else if val, ok := config.TriggerMetadata["tenantIdFromEnv"]; ok && val != "" { - meta.tenantID = config.ResolvedEnv[config.TriggerMetadata["tenantIdFromEnv"]] - } else { - return nil, fmt.Errorf("error parsing metadata. Details: tenantId was not found in metadata. Check your ScaledObject configuration") + // Getting tenantId + tenantID, err := getParameterFromConfig(config, "tenantId", true) + if err != nil { + return nil, err } + meta.tenantID = tenantID - //Getting clientId - if val, ok := config.AuthParams["clientId"]; ok && val != "" { - meta.clientID = val - } else if val, ok := config.TriggerMetadata["clientId"]; ok && val != "" { - meta.clientID = val - } else if val, ok := config.TriggerMetadata["clientIdFromEnv"]; ok && val != "" { - meta.clientID = config.ResolvedEnv[config.TriggerMetadata["clientIdFromEnv"]] - } else { - return nil, fmt.Errorf("error parsing metadata. Details: clientId was not found in metadata. Check your ScaledObject configuration") + // Getting clientId + clientID, err := getParameterFromConfig(config, "clientId", true) + if err != nil { + return nil, err } + meta.clientID = clientID - //Getting clientSecret - if val, ok := config.AuthParams["clientSecret"]; ok && val != "" { - meta.clientSecret = val - } else if val, ok := config.TriggerMetadata["clientSecret"]; ok && val != "" { - meta.clientSecret = val - } else if val, ok := config.TriggerMetadata["clientSecretFromEnv"]; ok && val != "" { - meta.clientSecret = config.ResolvedEnv[config.TriggerMetadata["clientSecretFromEnv"]] - } else { - return nil, fmt.Errorf("error parsing metadata. Details: clientSecret was not found in metadata. Check your ScaledObject configuration") + // Getting clientSecret + clientSecret, err := getParameterFromConfig(config, "clientSecret", true) + if err != nil { + return nil, err } + meta.clientSecret = clientSecret meta.podIdentity = "" } else if config.PodIdentity == kedav1alpha1.PodIdentityProviderAzure { @@ -146,46 +134,47 @@ func parseAzureLogAnalyticsMetadata(config *ScalerConfig) (*azureLogAnalyticsMet return nil, fmt.Errorf("error parsing metadata. Details: Log Analytics Scaler doesn't support pod identity %s", config.PodIdentity) } - //Getting workspaceId - if val, ok := config.AuthParams["workspaceId"]; ok && val != "" { - meta.workspaceID = val - } else if val, ok := config.TriggerMetadata["workspaceId"]; ok && val != "" { - meta.workspaceID = val - } else if val, ok := config.TriggerMetadata["workspaceIdFromEnv"]; ok && val != "" { - meta.workspaceID = config.ResolvedEnv[config.TriggerMetadata["workspaceIdFromEnv"]] - } else { - return nil, fmt.Errorf("error parsing metadata. Details: workspaceId was not found in metadata. Check your ScaledObject configuration") + // Getting workspaceId + workspaceID, err := getParameterFromConfig(config, "workspaceId", true) + if err != nil { + return nil, err } + meta.workspaceID = workspaceID - //Getting query - if val, ok := config.TriggerMetadata["query"]; ok && val != "" { - meta.query = val - } else if val, ok := config.TriggerMetadata["queryFromEnv"]; ok && val != "" { - meta.query = config.ResolvedEnv[config.TriggerMetadata["queryFromEnv"]] - } else { - return nil, fmt.Errorf("error parsing metadata. Details: query was not found in metadata. Check your ScaledObject configuration") + // Getting query, observe that we dont check AuthParams for query + query, err := getParameterFromConfig(config, "query", false) + if err != nil { + return nil, err } + meta.query = query - //Getting threshold - if val, ok := config.TriggerMetadata["threshold"]; ok && val != "" { - threshold, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("error parsing metadata. Details: can't parse threshold. Inner Error: %v", err) - } - meta.threshold = threshold - } else if val, ok := config.TriggerMetadata["thresholdFromEnv"]; ok && val != "" { - threshold, err := strconv.ParseInt(config.ResolvedEnv[config.TriggerMetadata["thresholdFromEnv"]], 10, 64) - if err != nil { - return nil, fmt.Errorf("error parsing metadata. Details: can't parse threshold. Inner Error: %v", err) - } - meta.threshold = threshold - } else { - return nil, fmt.Errorf("error parsing metadata. Details: threshold was not found in metadata. Check your ScaledObject configuration") + // Getting threshold, observe that we dont check AuthParams for threshold + val, err := getParameterFromConfig(config, "threshold", false) + if err != nil { + return nil, err } + threshold, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return nil, fmt.Errorf("error parsing metadata. Details: can't parse threshold. Inner Error: %v", err) + } + meta.threshold = threshold return &meta, nil } +// getParameterFromConfig gets the parameter from the configs, if checkAuthParams is true +// then AuthParams is also check for the parameter +func getParameterFromConfig(config *ScalerConfig, parameter string, checkAuthParams bool) (string, error) { + if val, ok := config.AuthParams[parameter]; checkAuthParams && ok && val != "" { + return val, nil + } else if val, ok := config.TriggerMetadata[parameter]; ok && val != "" { + return val, nil + } else if val, ok := config.TriggerMetadata[fmt.Sprintf("%sFromEnv", parameter)]; ok && val != "" { + return config.ResolvedEnv[config.TriggerMetadata[fmt.Sprintf("%sFromEnv", parameter)]], nil + } + return "", fmt.Errorf("error parsing metadata. Details: %s was not found in metadata. Check your ScaledObject configuration", parameter) +} + // IsActive determines if we need to scale from zero func (s *azureLogAnalyticsScaler) IsActive(ctx context.Context) (bool, error) { err := s.updateCache() @@ -369,45 +358,21 @@ func (s *azureLogAnalyticsScaler) executeQuery(query string, tokenInfo tokenData if len(queryData.Tables[0].Rows[0]) > 0 { metricDataType := queryData.Tables[0].Columns[0].Type metricVal := queryData.Tables[0].Rows[0][0] - - if metricVal != nil { - //type can be: real, int, long - if metricDataType == "real" || metricDataType == "int" || metricDataType == "long" { - metricValue, isConverted := metricVal.(float64) - if !isConverted { - return metricsData{}, fmt.Errorf("error validating Log Analytics request. Details: can not convert result to type float64. HTTP code: %d. Body: %s", statusCode, string(body)) - } - if metricValue < 0 { - return metricsData{}, fmt.Errorf("error validating Log Analytics request. Details: metric value should be >=0, but received %f. HTTP code: %d. Body: %s", metricValue, statusCode, string(body)) - } - metricsInfo.value = int64(metricValue) - } else { - return metricsData{}, fmt.Errorf("error validating Log Analytics request. Details: metric value data type should be real, int or long, but received %s. HTTP code: %d Body: %s", metricDataType, statusCode, string(body)) - } + parsedMetricVal, err := parseTableValueToInt64(metricVal, metricDataType) + if err != nil { + return metricsData{}, fmt.Errorf("%s. HTTP code: %d. Body: %s", err.Error(), statusCode, string(body)) } + metricsInfo.value = parsedMetricVal } if len(queryData.Tables[0].Rows[0]) > 1 { thresholdDataType := queryData.Tables[0].Columns[1].Type thresholdVal := queryData.Tables[0].Rows[0][1] - - if thresholdVal != nil { - //type can be: real, int, long - if thresholdDataType == "real" || thresholdDataType == "int" || thresholdDataType == "long" { - thresholdValue, isConverted := thresholdVal.(float64) - if !isConverted { - return metricsData{}, fmt.Errorf("error validating Log Analytics request. Details: cannot convert threshold result to type float64. HTTP code: %d. Body: %s", statusCode, string(body)) - } - if thresholdValue < 0 { - return metricsData{}, fmt.Errorf("error validating Log Analytics request. Details: threshold value should be >=0, but received %f. HTTP code: %d. Body: %s", thresholdValue, statusCode, string(body)) - } - metricsInfo.threshold = int64(thresholdValue) - } else { - return metricsData{}, fmt.Errorf("error validating Log Analytics request. Details: threshold value data type should be real, int or long, but received %s. HTTP code: %d. Body: %s", thresholdDataType, statusCode, string(body)) - } - } else { - return metricsData{}, fmt.Errorf("error validating Log Analytics request. Details: threshold value is empty, check your query. HTTP code: %d. Body: %s", statusCode, string(body)) + parsedThresholdVal, err := parseTableValueToInt64(thresholdVal, thresholdDataType) + if err != nil { + return metricsData{}, fmt.Errorf("%s. HTTP code: %d. Body: %s", err.Error(), statusCode, string(body)) } + metricsInfo.threshold = parsedThresholdVal } else { metricsInfo.threshold = -1 } @@ -418,6 +383,24 @@ func (s *azureLogAnalyticsScaler) executeQuery(query string, tokenInfo tokenData return metricsData{}, fmt.Errorf("error processing Log Analytics request. Details: unknown error. HTTP code: %d. Body: %s", statusCode, string(body)) } +func parseTableValueToInt64(value interface{}, dataType string) (int64, error) { + if value != nil { + //type can be: real, int, long + if dataType == "real" || dataType == "int" || dataType == "long" { + convertedValue, isConverted := value.(float64) + if !isConverted { + return 0, fmt.Errorf("error validating Log Analytics request. Details: cannot convert result to type float64") + } + if convertedValue < 0 { + return 0, fmt.Errorf("error validating Log Analytics request. Details: value should be >=0, but received %f", value) + } + return int64(convertedValue), nil + } + return 0, fmt.Errorf("error validating Log Analytics request. Details: value data type should be real, int or long, but received %s", dataType) + } + return 0, fmt.Errorf("error validating Log Analytics request. Details: value is empty, check your query") +} + func (s *azureLogAnalyticsScaler) refreshAccessToken() (tokenData, error) { tokenInfo, err := s.getAuthorizationToken() diff --git a/pkg/scalers/azure_monitor_scaler.go b/pkg/scalers/azure_monitor_scaler.go index 5d667424f97..0fc7e50e100 100644 --- a/pkg/scalers/azure_monitor_scaler.go +++ b/pkg/scalers/azure_monitor_scaler.go @@ -119,33 +119,38 @@ func parseAzureMonitorMetadata(config *ScalerConfig) (*azureMonitorMetadata, err return nil, fmt.Errorf("no tenantId given") } - if config.PodIdentity == "" || config.PodIdentity == kedav1alpha1.PodIdentityProviderNone { - if config.AuthParams["activeDirectoryClientId"] != "" { - meta.azureMonitorInfo.ClientID = config.AuthParams["activeDirectoryClientId"] - } else if config.TriggerMetadata["activeDirectoryClientId"] != "" { - meta.azureMonitorInfo.ClientID = config.TriggerMetadata["activeDirectoryClientId"] - } else if config.TriggerMetadata["activeDirectoryClientIdFromEnv"] != "" { - meta.azureMonitorInfo.ClientID = config.ResolvedEnv[config.TriggerMetadata["activeDirectoryClientIdFromEnv"]] - } + clientID, clientPassword, err := parseAzurePodIdentityParams(config) + if err != nil { + return nil, err + } + meta.azureMonitorInfo.ClientID = clientID + meta.azureMonitorInfo.ClientPassword = clientPassword - if len(meta.azureMonitorInfo.ClientID) == 0 { - return nil, fmt.Errorf("no activeDirectoryClientId given") + return &meta, nil +} + +// parseAzurePodIdentityParams gets the activeDirectory clientID and password +func parseAzurePodIdentityParams(config *ScalerConfig) (clientID string, clientPassword string, err error) { + if config.PodIdentity == "" || config.PodIdentity == kedav1alpha1.PodIdentityProviderNone { + clientID, err = getParameterFromConfig(config, "activeDirectoryClientId", true) + if err != nil || clientID == "" { + return "", "", fmt.Errorf("no activeDirectoryClientId given") } if config.AuthParams["activeDirectoryClientPassword"] != "" { - meta.azureMonitorInfo.ClientPassword = config.AuthParams["activeDirectoryClientPassword"] + clientPassword = config.AuthParams["activeDirectoryClientPassword"] } else if config.TriggerMetadata["activeDirectoryClientPasswordFromEnv"] != "" { - meta.azureMonitorInfo.ClientPassword = config.ResolvedEnv[config.TriggerMetadata["activeDirectoryClientPasswordFromEnv"]] + clientPassword = config.ResolvedEnv[config.TriggerMetadata["activeDirectoryClientPasswordFromEnv"]] } - if len(meta.azureMonitorInfo.ClientPassword) == 0 { - return nil, fmt.Errorf("no activeDirectoryClientPassword given") + if len(clientPassword) == 0 { + return "", "", fmt.Errorf("no activeDirectoryClientPassword given") } } else if config.PodIdentity != kedav1alpha1.PodIdentityProviderAzure { - return nil, fmt.Errorf("azure Monitor doesn't support pod identity %s", config.PodIdentity) + return "", "", fmt.Errorf("azure Monitor doesn't support pod identity %s", config.PodIdentity) } - return &meta, nil + return clientID, clientPassword, nil } // Returns true if the Azure Monitor metric value is greater than zero From b31d4b66258b340a1fbacf5866ef530ce02698f2 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Wed, 18 Nov 2020 03:15:16 -0500 Subject: [PATCH 8/8] Resolve authenticationRef for CRD target ref (#1339) Signed-off-by: Lionel Villard --- pkg/scaling/scale_handler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 3ce57b8a274..87a198a7267 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -349,6 +349,9 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod } config.AuthParams = authParams config.PodIdentity = podIdentity + } else { + authParams, _ := resolver.ResolveAuthRef(h.client, logger, trigger.AuthenticationRef, nil, withTriggers.Namespace) + config.AuthParams = authParams } scaler, err := buildScaler(trigger.Type, config)