Skip to content

Commit

Permalink
fix(health): spec.executor.instances is Optional, Support a flexible…
Browse files Browse the repository at this point in the history
… number of executors (argoproj#11877)

Support a flexible number of executors. For example with a mounted ConfigMap inside the Spark-Operator.

Signed-off-by: Philipp Dallig <philipp.dallig@gmail.com>
Signed-off-by: Jimmy Neville <jimmyeneville@gmail.com>
  • Loading branch information
Reamer authored and Jneville0815 committed Sep 30, 2023
1 parent 514776d commit f230b2e
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ infinity = 2^1024-1
local function executor_range_api()
min_executor_instances = 0
max_executor_instances = infinity
if obj.spec.dynamicAllocation.maxExecutors then
if obj.spec.dynamicAllocation.maxExecutors then
max_executor_instances = obj.spec.dynamicAllocation.maxExecutors
end
if obj.spec.dynamicAllocation.minExecutors then
if obj.spec.dynamicAllocation.minExecutors then
min_executor_instances = obj.spec.dynamicAllocation.minExecutors
end
return min_executor_instances, max_executor_instances
Expand All @@ -17,7 +17,7 @@ end
local function maybe_executor_range_spark_conf()
min_executor_instances = 0
max_executor_instances = infinity
if obj.spec.sparkConf["spark.streaming.dynamicAllocation.enabled"] ~= nil and
if obj.spec.sparkConf["spark.streaming.dynamicAllocation.enabled"] ~= nil and
obj.spec.sparkConf["spark.streaming.dynamicAllocation.enabled"] == "true" then
if(obj.spec.sparkConf["spark.streaming.dynamicAllocation.maxExecutors"] ~= nil) then
max_executor_instances = tonumber(obj.spec.sparkConf["spark.streaming.dynamicAllocation.maxExecutors"])
Expand All @@ -26,7 +26,7 @@ local function maybe_executor_range_spark_conf()
min_executor_instances = tonumber(obj.spec.sparkConf["spark.streaming.dynamicAllocation.minExecutors"])
end
return min_executor_instances, max_executor_instances
elseif obj.spec.sparkConf["spark.dynamicAllocation.enabled"] ~= nil and
elseif obj.spec.sparkConf["spark.dynamicAllocation.enabled"] ~= nil and
obj.spec.sparkConf["spark.dynamicAllocation.enabled"] == "true" then
if(obj.spec.sparkConf["spark.dynamicAllocation.maxExecutors"] ~= nil) then
max_executor_instances = tonumber(obj.spec.sparkConf["spark.dynamicAllocation.maxExecutors"])
Expand All @@ -45,11 +45,19 @@ local function maybe_executor_range()
return executor_range_api()
elseif obj.spec["sparkConf"] ~= nil then
return maybe_executor_range_spark_conf()
else
else
return nil
end
end

local function dynamic_executors_without_spec_config()
if obj.spec.dynamicAllocation == nil and obj.spec.executor.instances == nil then
return true
else
return false
end
end

if obj.status ~= nil then
if obj.status.applicationState.state ~= nil then
if obj.status.applicationState.state == "" then
Expand All @@ -60,23 +68,26 @@ if obj.status ~= nil then
if obj.status.applicationState.state == "RUNNING" then
if obj.status.executorState ~= nil then
count=0
executor_instances = obj.spec.executor.instances
for i, executorState in pairs(obj.status.executorState) do
if executorState == "RUNNING" then
count=count+1
end
end
if executor_instances == count then
if obj.spec.executor.instances ~= nil and obj.spec.executor.instances == count then
health_status.status = "Healthy"
health_status.message = "SparkApplication is Running"
return health_status
elseif maybe_executor_range() then
min_executor_instances, max_executor_instances = maybe_executor_range()
if count >= min_executor_instances and count <= max_executor_instances then
if count >= min_executor_instances and count <= max_executor_instances then
health_status.status = "Healthy"
health_status.message = "SparkApplication is Running"
return health_status
end
elseif dynamic_executors_without_spec_config() and count >= 1 then
health_status.status = "Healthy"
health_status.message = "SparkApplication is Running"
return health_status
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,7 @@ tests:
status: Healthy
message: "SparkApplication is Running"
inputPath: testdata/healthy_dynamic_alloc_operator_api.yaml
- healthStatus:
status: Healthy
message: "SparkApplication is Running"
inputPath: testdata/healthy_dynamic_alloc_without_spec_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
generation: 4
labels:
argocd.argoproj.io/instance: spark-job
name: spark-job-app
namespace: spark-cluster
resourceVersion: "31812990"
uid: bfee52b0-74ca-4465-8005-f6643097ed64
spec:
executor: {}
status:
applicationState:
state: RUNNING
driverInfo:
podName: ingestion-datalake-news-app-driver
webUIAddress: 172.20.207.161:4040
webUIPort: 4040
webUIServiceName: ingestion-datalake-news-app-ui-svc
executionAttempts: 13
executorState:
ingestion-datalake-news-app-1591613851251-exec-1: RUNNING
ingestion-datalake-news-app-1591613851251-exec-2: RUNNING
ingestion-datalake-news-app-1591613851251-exec-4: RUNNING
ingestion-datalake-news-app-1591613851251-exec-5: RUNNING
lastSubmissionAttemptTime: "2020-06-08T10:57:32Z"
sparkApplicationId: spark-a5920b2a5aa04d22a737c60759b5bf82
submissionAttempts: 1
submissionID: 3e713ec8-9f6c-4e78-ac28-749797c846f0
terminationTime: null

0 comments on commit f230b2e

Please sign in to comment.