Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: spec.executor.instances is Optional, Support a flexible number of executors #11877

Merged
merged 1 commit into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ infinity = 2^1024-1
local function executor_range_api()
min_executor_instances = 0
max_executor_instances = infinity
if obj.spec.dynamicAllocation.maxExecutors then
if obj.spec.dynamicAllocation.maxExecutors then
max_executor_instances = obj.spec.dynamicAllocation.maxExecutors
end
if obj.spec.dynamicAllocation.minExecutors then
if obj.spec.dynamicAllocation.minExecutors then
min_executor_instances = obj.spec.dynamicAllocation.minExecutors
end
return min_executor_instances, max_executor_instances
Expand All @@ -17,7 +17,7 @@ end
local function maybe_executor_range_spark_conf()
min_executor_instances = 0
max_executor_instances = infinity
if obj.spec.sparkConf["spark.streaming.dynamicAllocation.enabled"] ~= nil and
if obj.spec.sparkConf["spark.streaming.dynamicAllocation.enabled"] ~= nil and
obj.spec.sparkConf["spark.streaming.dynamicAllocation.enabled"] == "true" then
if(obj.spec.sparkConf["spark.streaming.dynamicAllocation.maxExecutors"] ~= nil) then
max_executor_instances = tonumber(obj.spec.sparkConf["spark.streaming.dynamicAllocation.maxExecutors"])
Expand All @@ -26,7 +26,7 @@ local function maybe_executor_range_spark_conf()
min_executor_instances = tonumber(obj.spec.sparkConf["spark.streaming.dynamicAllocation.minExecutors"])
end
return min_executor_instances, max_executor_instances
elseif obj.spec.sparkConf["spark.dynamicAllocation.enabled"] ~= nil and
elseif obj.spec.sparkConf["spark.dynamicAllocation.enabled"] ~= nil and
obj.spec.sparkConf["spark.dynamicAllocation.enabled"] == "true" then
if(obj.spec.sparkConf["spark.dynamicAllocation.maxExecutors"] ~= nil) then
max_executor_instances = tonumber(obj.spec.sparkConf["spark.dynamicAllocation.maxExecutors"])
Expand All @@ -45,11 +45,19 @@ local function maybe_executor_range()
return executor_range_api()
elseif obj.spec["sparkConf"] ~= nil then
return maybe_executor_range_spark_conf()
else
else
return nil
end
end

local function dynamic_executors_without_spec_config()
if obj.spec.dynamicAllocation == nil and obj.spec.executor.instances == nil then
return true
else
return false
end
end

if obj.status ~= nil then
if obj.status.applicationState.state ~= nil then
if obj.status.applicationState.state == "" then
Expand All @@ -60,23 +68,26 @@ if obj.status ~= nil then
if obj.status.applicationState.state == "RUNNING" then
if obj.status.executorState ~= nil then
count=0
executor_instances = obj.spec.executor.instances
for i, executorState in pairs(obj.status.executorState) do
if executorState == "RUNNING" then
count=count+1
end
end
if executor_instances == count then
if obj.spec.executor.instances ~= nil and obj.spec.executor.instances == count then
health_status.status = "Healthy"
health_status.message = "SparkApplication is Running"
return health_status
elseif maybe_executor_range() then
min_executor_instances, max_executor_instances = maybe_executor_range()
if count >= min_executor_instances and count <= max_executor_instances then
if count >= min_executor_instances and count <= max_executor_instances then
health_status.status = "Healthy"
health_status.message = "SparkApplication is Running"
return health_status
end
elseif dynamic_executors_without_spec_config() and count >= 1 then
health_status.status = "Healthy"
health_status.message = "SparkApplication is Running"
return health_status
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,7 @@ tests:
status: Healthy
message: "SparkApplication is Running"
inputPath: testdata/healthy_dynamic_alloc_operator_api.yaml
- healthStatus:
status: Healthy
message: "SparkApplication is Running"
inputPath: testdata/healthy_dynamic_alloc_without_spec_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
generation: 4
labels:
argocd.argoproj.io/instance: spark-job
name: spark-job-app
namespace: spark-cluster
resourceVersion: "31812990"
uid: bfee52b0-74ca-4465-8005-f6643097ed64
spec:
executor: {}
status:
applicationState:
state: RUNNING
driverInfo:
podName: ingestion-datalake-news-app-driver
webUIAddress: 172.20.207.161:4040
webUIPort: 4040
webUIServiceName: ingestion-datalake-news-app-ui-svc
executionAttempts: 13
executorState:
ingestion-datalake-news-app-1591613851251-exec-1: RUNNING
ingestion-datalake-news-app-1591613851251-exec-2: RUNNING
ingestion-datalake-news-app-1591613851251-exec-4: RUNNING
ingestion-datalake-news-app-1591613851251-exec-5: RUNNING
lastSubmissionAttemptTime: "2020-06-08T10:57:32Z"
sparkApplicationId: spark-a5920b2a5aa04d22a737c60759b5bf82
submissionAttempts: 1
submissionID: 3e713ec8-9f6c-4e78-ac28-749797c846f0
terminationTime: null