Skip to content

Commit

Permalink
KEDA task count query should ignore k8s queue
Browse files Browse the repository at this point in the history
CeleryKubernetesExecutor lets us use both celery and kubernetes executors.
KEDA lets us scale down to zero when there are no celery tasks running.
If we have no celery tasks running, and we run a k8s task, then KEDA will
launch a worker even though there are still no celery tasks.  We can prevent
this from happening by ignoring the kubernetes queue in the KEDA query.
  • Loading branch information
dstandish committed Aug 5, 2021
1 parent 4dae4ec commit ecfa75c
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 3 deletions.
4 changes: 3 additions & 1 deletion chart/templates/workers/worker-kedaautoscaler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,7 @@ spec:
query: >-
SELECT ceil(COUNT(*)::decimal / {{ .Values.config.celery.worker_concurrency }})
FROM task_instance
WHERE state='running' OR state='queued'
WHERE (state='running' OR state='queued')
{{ $k8s_queue := default (printf "kubernetes") .Values.config.celery_kubernetes_executor.kubernetes_queue -}}
{{ eq .Values.executor "CeleryKubernetesExecutor" | ternary (printf "AND queue != '%s'" $k8s_queue) (print "") | indent 14 }}
{{- end }}
36 changes: 34 additions & 2 deletions chart/tests/test_keda.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_keda_enabled(self, executor, is_created):
)
def test_keda_concurrency(self, executor, concurrency):
"""
ScaledObject should only be created when set to enabled and executor is Celery or CeleryKubernetes
Verify keda sql query is uses configured concurrency
"""
docs = render_chart(
values={
Expand All @@ -73,8 +73,40 @@ def test_keda_concurrency(self, executor, concurrency):
)
expected_query = (
f"SELECT ceil(COUNT(*)::decimal / {concurrency}) "
"FROM task_instance WHERE state='running' OR state='queued'"
"FROM task_instance WHERE (state='running' OR state='queued')"
)
assert jmespath.search("spec.triggers[0].metadata.query", docs[0]) == expected_query

@parameterized.expand(
[
("CeleryExecutor", None, False),
("CeleryExecutor", 'my_queue', False),
("CeleryKubernetesExecutor", None, True),
("CeleryKubernetesExecutor", 'my_queue', True),
]
)
def test_keda_query_kubernetes_queue(self, executor, queue, should_filter):
"""
Verify keda sql query ignores kubernetes queue when CKE is used.
Sometimes a user might want to use a different queue name for k8s executor tasks,
and we also verify here that we use the configured queue name in that case.
"""
values = {
"workers": {"keda": {"enabled": True}, "persistence": {"enabled": False}},
"executor": executor,
}
if queue:
values.update({'config': {'celery_kubernetes_executor': {'kubernetes_queue': queue}}})
docs = render_chart(
values=values,
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
)
expected_query = (
"SELECT ceil(COUNT(*)::decimal / 16) "
"FROM task_instance WHERE (state='running' OR state='queued')"
)
if should_filter:
expected_query += f" AND queue != '{queue or 'kubernetes'}'"
assert jmespath.search("spec.triggers[0].metadata.query", docs[0]) == expected_query

@parameterized.expand(
Expand Down
2 changes: 2 additions & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,8 @@ config:
reinit_frequency: '{{ .Values.kerberos.reinitFrequency }}'
principal: '{{ .Values.kerberos.principal }}'
ccache: '{{ .Values.kerberos.ccacheMountPath }}/{{ .Values.kerberos.ccacheFileName }}'
celery_kubernetes_executor:
kubernetes_queue: 'kubernetes'
kubernetes:
namespace: '{{ .Release.Namespace }}'
airflow_configmap: '{{ include "airflow_config" . }}'
Expand Down

0 comments on commit ecfa75c

Please sign in to comment.