From e70fea9b1b9c2348c2348e821c0f95b76c422c49 Mon Sep 17 00:00:00 2001 From: Philipp Dallig Date: Tue, 31 Jan 2023 11:51:56 +0100 Subject: [PATCH] spec.executor.instances and spec.dynamicAllocation are optional. Support a flexible number of executors. For example with a mounted ConfigMap inside the Spark-Operator. Signed-off-by: Philipp Dallig --- .../SparkApplication/health.lua | 27 +++++++++++----- .../SparkApplication/health_test.yaml | 4 +++ ...thy_dynamic_alloc_without_spec_config.yaml | 31 +++++++++++++++++++ 3 files changed, 54 insertions(+), 8 deletions(-) create mode 100644 resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc_without_spec_config.yaml diff --git a/resource_customizations/sparkoperator.k8s.io/SparkApplication/health.lua b/resource_customizations/sparkoperator.k8s.io/SparkApplication/health.lua index 5a504602eb83c..07fbf3a4dc64d 100644 --- a/resource_customizations/sparkoperator.k8s.io/SparkApplication/health.lua +++ b/resource_customizations/sparkoperator.k8s.io/SparkApplication/health.lua @@ -5,10 +5,10 @@ infinity = 2^1024-1 local function executor_range_api() min_executor_instances = 0 max_executor_instances = infinity - if obj.spec.dynamicAllocation.maxExecutors then + if obj.spec.dynamicAllocation.maxExecutors then max_executor_instances = obj.spec.dynamicAllocation.maxExecutors end - if obj.spec.dynamicAllocation.minExecutors then + if obj.spec.dynamicAllocation.minExecutors then min_executor_instances = obj.spec.dynamicAllocation.minExecutors end return min_executor_instances, max_executor_instances @@ -17,7 +17,7 @@ end local function maybe_executor_range_spark_conf() min_executor_instances = 0 max_executor_instances = infinity - if obj.spec.sparkConf["spark.streaming.dynamicAllocation.enabled"] ~= nil and + if obj.spec.sparkConf["spark.streaming.dynamicAllocation.enabled"] ~= nil and obj.spec.sparkConf["spark.streaming.dynamicAllocation.enabled"] == "true" then if(obj.spec.sparkConf["spark.streaming.dynamicAllocation.maxExecutors"] ~= nil) then max_executor_instances = tonumber(obj.spec.sparkConf["spark.streaming.dynamicAllocation.maxExecutors"]) @@ -26,7 +26,7 @@ local function maybe_executor_range_spark_conf() min_executor_instances = tonumber(obj.spec.sparkConf["spark.streaming.dynamicAllocation.minExecutors"]) end return min_executor_instances, max_executor_instances - elseif obj.spec.sparkConf["spark.dynamicAllocation.enabled"] ~= nil and + elseif obj.spec.sparkConf["spark.dynamicAllocation.enabled"] ~= nil and obj.spec.sparkConf["spark.dynamicAllocation.enabled"] == "true" then if(obj.spec.sparkConf["spark.dynamicAllocation.maxExecutors"] ~= nil) then max_executor_instances = tonumber(obj.spec.sparkConf["spark.dynamicAllocation.maxExecutors"]) @@ -45,11 +45,19 @@ local function maybe_executor_range() return executor_range_api() elseif obj.spec["sparkConf"] ~= nil then return maybe_executor_range_spark_conf() - else + else return nil end end +local function dynamic_executors_without_spec_config() + if obj.spec.dynamicAllocation == nil and obj.spec.executor.instances == nil then + return true + else + return false + end +end + if obj.status ~= nil then if obj.status.applicationState.state ~= nil then if obj.status.applicationState.state == "" then @@ -60,23 +68,26 @@ if obj.status ~= nil then if obj.status.applicationState.state == "RUNNING" then if obj.status.executorState ~= nil then count=0 - executor_instances = obj.spec.executor.instances for i, executorState in pairs(obj.status.executorState) do if executorState == "RUNNING" then count=count+1 end end - if executor_instances == count then + if obj.spec.executor.instances ~= nil and obj.spec.executor.instances == count then health_status.status = "Healthy" health_status.message = "SparkApplication is Running" return health_status elseif maybe_executor_range() then min_executor_instances, max_executor_instances = maybe_executor_range() - if count >= min_executor_instances and count <= max_executor_instances then + if count >= min_executor_instances and count <= max_executor_instances then health_status.status = "Healthy" health_status.message = "SparkApplication is Running" return health_status end + elseif dynamic_executors_without_spec_config() and count >= 1 then + health_status.status = "Healthy" + health_status.message = "SparkApplication is Running" + return health_status end end end diff --git a/resource_customizations/sparkoperator.k8s.io/SparkApplication/health_test.yaml b/resource_customizations/sparkoperator.k8s.io/SparkApplication/health_test.yaml index 582b446eca324..e0ad7dfdf387d 100644 --- a/resource_customizations/sparkoperator.k8s.io/SparkApplication/health_test.yaml +++ b/resource_customizations/sparkoperator.k8s.io/SparkApplication/health_test.yaml @@ -23,3 +23,7 @@ tests: status: Healthy message: "SparkApplication is Running" inputPath: testdata/healthy_dynamic_alloc_operator_api.yaml +- healthStatus: + status: Healthy + message: "SparkApplication is Running" + inputPath: testdata/healthy_dynamic_alloc_without_spec_config.yaml diff --git a/resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc_without_spec_config.yaml b/resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc_without_spec_config.yaml new file mode 100644 index 0000000000000..a2ab7b85b5c50 --- /dev/null +++ b/resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc_without_spec_config.yaml @@ -0,0 +1,31 @@ +apiVersion: sparkoperator.k8s.io/v1beta2 +kind: SparkApplication +metadata: + generation: 4 + labels: + argocd.argoproj.io/instance: spark-job + name: spark-job-app + namespace: spark-cluster + resourceVersion: "31812990" + uid: bfee52b0-74ca-4465-8005-f6643097ed64 +spec: + executor: {} +status: + applicationState: + state: RUNNING + driverInfo: + podName: ingestion-datalake-news-app-driver + webUIAddress: 172.20.207.161:4040 + webUIPort: 4040 + webUIServiceName: ingestion-datalake-news-app-ui-svc + executionAttempts: 13 + executorState: + ingestion-datalake-news-app-1591613851251-exec-1: RUNNING + ingestion-datalake-news-app-1591613851251-exec-2: RUNNING + ingestion-datalake-news-app-1591613851251-exec-4: RUNNING + ingestion-datalake-news-app-1591613851251-exec-5: RUNNING + lastSubmissionAttemptTime: "2020-06-08T10:57:32Z" + sparkApplicationId: spark-a5920b2a5aa04d22a737c60759b5bf82 + submissionAttempts: 1 + submissionID: 3e713ec8-9f6c-4e78-ac28-749797c846f0 + terminationTime: null