-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Apache Pulsar Scaler #3021
Add Apache Pulsar Scaler #3021
Conversation
Signed-off-by: Mark Mussett <mmussett@tibco.com>
Signed-off-by: Mark Mussett <mmussett@tibco.com>
Signed-off-by: Mark Mussett <mmussett@tibco.com>
Signed-off-by: Lan Liang <gcslyp@gmail.com>
…gger metadata Signed-off-by: Lan Liang <gcslyp@gmail.com>
e2e CI failed and this is error:
Whant can i do for it and how can i check it myself, thanks! |
hey @liangyuanpeng , |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for this new scaler ❤️ ❤️ ❤️
I have left some comments inline
pkg/scalers/pulsar_scaler.go
Outdated
func (s *pulsarScaler) GetStats() (*pulsarStats, error) { | ||
stats := new(pulsarStats) | ||
|
||
topic := strings.ReplaceAll(s.metadata.topic, "persistent://", "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need the full name topic in any case? I mean, inside the scaler the only usage I have seen is this and we replace the given value every time. Maybe if the scaler requests the topic with the prefix "persistent://" (because it's the common language in pulsar for instance) , maybe we can parse in during parsePulsarMetadata
to do it only once.
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you absolutely right, we just need to parse it once.
pkg/scalers/pulsar_scaler.go
Outdated
return nil, fmt.Errorf("error parsing pulsar metadata: %s", err) | ||
} | ||
|
||
client := &http.Client{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, use the internal function to generate the httpClient, we use it to unify all the configurations. You can find an example here
pkg/scalers/pulsar_scaler.go
Outdated
topic := strings.ReplaceAll(s.metadata.topic, "persistent://", "") | ||
statsURL := s.metadata.adminURL + "/admin/v2/persistent/" + topic + "/stats" | ||
|
||
req, err := http.NewRequest("GET", statsURL, nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, propagate the context and use it here
pkg/scalers/pulsar_scaler.go
Outdated
} | ||
|
||
// IsActive determines if we need to scale from zero | ||
func (s *pulsarScaler) IsActive(_ context.Context) (bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, use this context and propagate it to http request using http.NewRequestWithContext
pkg/scalers/pulsar_scaler.go
Outdated
} | ||
|
||
// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric | ||
func (s *pulsarScaler) GetMetrics(_ context.Context, metricName string, _ labels.Selector) ([]external_metrics.ExternalMetricValue, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, use this context and propagate it to http request using http.NewRequestWithContext
pkg/scalers/pulsar_scaler.go
Outdated
targetMetricValue := resource.NewQuantity(s.metadata.msgBacklogThreshold, resource.DecimalSI) | ||
externalMetric := &v2beta2.ExternalMetricSource{ | ||
Metric: v2beta2.MetricIdentifier{ | ||
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "pulsar", s.metadata.topic, s.metadata.subscription)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For names, we use the internal function that ensures the metric name is unique inside the scaler, you can find an example here. The whole process is defined in the docs about GetMetricsSpecForScaling
Signed-off-by: Lan Liang <gcslyp@gmail.com>
…lete unuse log Signed-off-by: Lan Liang <gcslyp@gmail.com>
I have not experience for ts, but i'm really working for pulsar scaler e2e test. Will push it on this PR or next PR. |
We prefer to merge together new scalers with the e2e tests but no worries, no rush with the PR, next release will be in 3 months 😄 |
The changes look good, only some nits (apart from e2e tests):
Thanks a lot! |
@liangyuanpeng we are migrating e2e tests to Go, you can see this inital PR: #3079 Once that PR is merged you can start writing e2e tests based on that and not deal with TS. |
@zroubalik Thanks, Actually i'm finished the e2e test, but i need more time to ready for this PR. I'm happy of i can write e2e test with go , so i will working when the #3079 is merged. |
@liangyuanpeng the initial PR with Golang based e2e tests has been merged, feel free to use it as a base for Pulsar e2e tests. |
I will work for this on this week. |
@liangyuanpeng Any thought if we can close this PR soon? We are planning to release in 2 weeks. |
@tomkerkhove Sorry for my late, I am busy with my job bu i still have interested int finish this PR and I will finish the e2e test asap. :) |
Signed-off-by: Lan Liang <gcslyp@gmail.com>
Signed-off-by: Lan Liang <gcslyp@gmail.com>
@liangyuanpeng could you please add activation property as part of this PR, we have started to adding this existing scalers as you can see in these PRs: |
Absolutely, push it today. |
Signed-off-by: Lan Liang <gcslyp@gmail.com>
@liangyuanpeng could you please fixt he problems found in the Static Checks? https://github.com/kedacore/keda/runs/7393791519?check_suite_focus=true |
Signed-off-by: Lan Liang <gcslyp@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please rebase your branch and fix the outstanding issues?
tests/scalers_go/pulsar/pulsar_test.go:222:17: undeclared name: `WaitForDeploymentReplicaCount` (typecheck)
assert.True(t, WaitForDeploymentReplicaCount(t, kc, consumerDeploymentName, testNamespace, 0, 60, 1),
^
tests/scalers_go/pulsar/pulsar_test.go:249:17: undeclared name: `WaitForDeploymentReplicaCount` (typecheck)
assert.True(t, WaitForDeploymentReplicaCount(t, kc, consumerDeploymentName, testNamespace, 5, 300, 1),
^
tests/scalers_go/pulsar/pulsar_test.go:256:17: undeclared name: `WaitForDeploymentReplicaCount` (typecheck)
assert.True(t, WaitForDeploymentReplicaCount(t, kc, consumerDeploymentName, testNamespace, 0, 300, 1),
-> replace WaitForDeploymentReplicaCount()
with WaitForDeploymentReplicaReadyCount()
.
pkg/scalers/kafka_scaler.go:132:13: string `enable` has 3 occurrences, make it a constant (goconst)
if val == "enable" {
-> make it constant
pkg/scalers/pulsar_scaler.go
Outdated
if val, ok := config.TriggerMetadata["tls"]; ok { | ||
val = strings.TrimSpace(val) | ||
|
||
if val == "enable" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this constant pls, so it doesn't fail the static check
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this constant pls, so it doesn't fail the static check
Seems like the CI message is talking about kafka_scaler
and this PR is working for pulsar_scaler
, should i change it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just a confusing warning message, it is caused by the "enabled"
added in this new code in pulsar scaler.
…const Signed-off-by: Lan Liang <gcslyp@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey,
Thanks for this awesome improvement! ❤️
I have left some comments inline and I'd like to request also to update the changelog adding an entry for this scaler. You can see the conventions for the changelog here
pkg/scalers/pulsar_scaler.go
Outdated
ca string | ||
|
||
statsURL string | ||
activationTargetQueryValue int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess that this variable is for the activation feature, please use the same name as the target/threashold value with the prefix activation
, I think that in this case it would be activationMsgBacklogThreshold
pkg/scalers/pulsar_scaler.go
Outdated
} | ||
|
||
meta.activationTargetQueryValue = 0 | ||
if val, ok := config.TriggerMetadata["activationTargetQueryValue"]; ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the target value is msgBacklog
, based on the convention defined, the key should be activationMsgBacklog
pkg/scalers/pulsar_scaler.go
Outdated
} | ||
return stats, nil | ||
case 404: | ||
return nil, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we return error here?
pkg/scalers/pulsar_scaler.go
Outdated
} | ||
|
||
if !found { | ||
return nil, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have to return an error here, otherwise we will have a panic in the code which calls here
pkg/scalers/pulsar_scaler.go
Outdated
metric := external_metrics.ExternalMetricValue{ | ||
MetricName: metricName, | ||
Value: *resource.NewQuantity(msgBacklog, resource.DecimalSI), | ||
Timestamp: metav1.Now(), | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have a helper to generate this, it's the func GenerateMetricMili
, you can see a sample here
pkg/scalers/pulsar_scaler.go
Outdated
func (s *pulsarScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { | ||
targetMetricValue := resource.NewQuantity(s.metadata.msgBacklogThreshold, resource.DecimalSI) | ||
|
||
metricName := fmt.Sprintf("%s-%s-%s", "pulsar", s.metadata.topic, s.metadata.subscription) | ||
|
||
externalMetric := &v2beta2.ExternalMetricSource{ | ||
Metric: v2beta2.MetricIdentifier{ | ||
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(metricName)), | ||
}, | ||
Target: v2beta2.MetricTarget{ | ||
Type: v2beta2.AverageValueMetricType, | ||
AverageValue: targetMetricValue, | ||
}, | ||
} | ||
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: pulsarMetricType} | ||
return []v2beta2.MetricSpec{metricSpec} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some considerations here:
- Use the function
GetMetricTarget
to generate the target section (a sample here) - I'd move the
metricName
generation to the parsing section to avoid parsing it every time
"replica count should be 0 after a minute") | ||
|
||
testActivation(t, kc, data) | ||
KubectlDeleteWithTemplate(t, data, "publishJobTemplate", publishJobTemplate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd move this inside testActivation
func because it's related with cleaning up the activation test
Simplify the logic. Co-authored-by: Jorge Turrado <jorge_turrado@hotmail.es> Co-authored-by: Jorge Turrado Ferrero <Jorge_turrado@hotmail.es>
…d add some error to return Signed-off-by: Lan Liang <gcslyp@gmail.com>
Signed-off-by: Lan Liang <gcslyp@gmail.com>
Signed-off-by: Lan Liang <gcslyp@gmail.com>
/run-e2e pulsar* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good!
Only one more thing, could you add an entry in the changelog?
@liangyuanpeng any update on this please? We would like to complete this by the end of the week. So we can ship a new release the next one. |
@liangyuanpeng any updates please? we would need to merge it today in order to get it into the next release, if not that we will target 2.9. |
/run-e2e pulsar* |
Provide a description of what has been changed
Checklist
Fixes #853
This PR is base on #1666
doc PR --> kedacore/keda-docs#761