fix(health): Handling SparkApplication CRD health status if dynamic allocation is enabled (#7557) (#11522)

Signed-off-by: Yevgeniy Fridland <yevg.mord@gmail.com>
Co-authored-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com>
eugen-fried and crenshaw-dev authored Jan 27, 2023
1 parent a825aad commit eaac2c6
Showing 5 changed files with 181 additions and 1 deletion.
resource_customizations/sparkoperator.k8s.io/SparkApplication/health.lua
@@ -1,4 +1,55 @@
health_status = {}
-- Can't use standard lib, math.huge equivalent
infinity = 2^1024-1

local function executor_range_api()
  min_executor_instances = 0
  max_executor_instances = infinity
  if obj.spec.dynamicAllocation.maxExecutors then
    max_executor_instances = obj.spec.dynamicAllocation.maxExecutors
  end
  if obj.spec.dynamicAllocation.minExecutors then
    min_executor_instances = obj.spec.dynamicAllocation.minExecutors
  end
  return min_executor_instances, max_executor_instances
end

local function maybe_executor_range_spark_conf()
  min_executor_instances = 0
  max_executor_instances = infinity
  if obj.spec.sparkConf["spark.streaming.dynamicAllocation.enabled"] ~= nil and
     obj.spec.sparkConf["spark.streaming.dynamicAllocation.enabled"] == "true" then
    if(obj.spec.sparkConf["spark.streaming.dynamicAllocation.maxExecutors"] ~= nil) then
      max_executor_instances = tonumber(obj.spec.sparkConf["spark.streaming.dynamicAllocation.maxExecutors"])
    end
    if(obj.spec.sparkConf["spark.streaming.dynamicAllocation.minExecutors"] ~= nil) then
      min_executor_instances = tonumber(obj.spec.sparkConf["spark.streaming.dynamicAllocation.minExecutors"])
    end
    return min_executor_instances, max_executor_instances
  elseif obj.spec.sparkConf["spark.dynamicAllocation.enabled"] ~= nil and
         obj.spec.sparkConf["spark.dynamicAllocation.enabled"] == "true" then
    if(obj.spec.sparkConf["spark.dynamicAllocation.maxExecutors"] ~= nil) then
      max_executor_instances = tonumber(obj.spec.sparkConf["spark.dynamicAllocation.maxExecutors"])
    end
    if(obj.spec.sparkConf["spark.dynamicAllocation.minExecutors"] ~= nil) then
      min_executor_instances = tonumber(obj.spec.sparkConf["spark.dynamicAllocation.minExecutors"])
    end
    return min_executor_instances, max_executor_instances
  else
    return nil
  end
end

local function maybe_executor_range()
  if obj.spec["dynamicAllocation"] and obj.spec.dynamicAllocation.enabled then
    return executor_range_api()
  elseif obj.spec["sparkConf"] ~= nil then
    return maybe_executor_range_spark_conf()
  else
    return nil
  end
end

if obj.status ~= nil then
  if obj.status.applicationState.state ~= nil then
    if obj.status.applicationState.state == "" then
@@ -19,6 +70,13 @@ if obj.status ~= nil then
health_status.status = "Healthy"
health_status.message = "SparkApplication is Running"
return health_status
elseif maybe_executor_range() then
min_executor_instances, max_executor_instances = maybe_executor_range()
if count >= min_executor_instances and count <= max_executor_instances then
health_status.status = "Healthy"
health_status.message = "SparkApplication is Running"
return health_status
end
end
end
end
@@ -72,4 +130,4 @@ if obj.status ~= nil then
end
health_status.status = "Progressing"
health_status.message = "Waiting for Executor pods"
return health_status
return health_status
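
For context, Argo CD evaluates this Lua with the live SparkApplication bound to obj and reads back the health_status table. The snippet below is a hypothetical standalone walkthrough of the new range check, not part of the commit; the obj stub and variable names are illustrative, with values taken from testdata/healthy_dynamic_alloc.yaml (min 2, max 10, five RUNNING executors, spec.executor.instances of 4).

-- Illustrative harness only: mimics maybe_executor_range_spark_conf() with a stubbed obj.
obj = {
  spec = {
    executor = { instances = 4 },
    sparkConf = {
      ["spark.dynamicAllocation.enabled"] = "true",
      ["spark.dynamicAllocation.minExecutors"] = "2",
      ["spark.dynamicAllocation.maxExecutors"] = "10",
    },
  },
}

local min_instances = tonumber(obj.spec.sparkConf["spark.dynamicAllocation.minExecutors"]) or 0
local max_instances = tonumber(obj.spec.sparkConf["spark.dynamicAllocation.maxExecutors"]) or (2^1024 - 1)

-- Five executors report RUNNING in the fixture, even though spec.executor.instances is 4.
local running = 5
print(running >= min_instances and running <= max_instances)  --> true, so the check returns Healthy

Before this change, the RUNNING branch only accepted an executor count equal to the fixed spec.executor.instances, so dynamically allocated applications could sit in Progressing indefinitely; the elseif above accepts any count inside the configured [min, max] range.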
resource_customizations/sparkoperator.k8s.io/SparkApplication/health_test.yaml
@@ -11,3 +11,15 @@ tests:
    status: Healthy
    message: "SparkApplication is Running"
  inputPath: testdata/healthy.yaml
- healthStatus:
    status: Healthy
    message: "SparkApplication is Running"
  inputPath: testdata/healthy_dynamic_alloc.yaml
- healthStatus:
    status: Healthy
    message: "SparkApplication is Running"
  inputPath: testdata/healthy_dynamic_alloc_dstream.yaml
- healthStatus:
    status: Healthy
    message: "SparkApplication is Running"
  inputPath: testdata/healthy_dynamic_alloc_operator_api.yaml
resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc.yaml
@@ -0,0 +1,37 @@
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  generation: 4
  labels:
    argocd.argoproj.io/instance: spark-job
  name: spark-job-app
  namespace: spark-cluster
  resourceVersion: "31812990"
  uid: bfee52b0-74ca-4465-8005-f6643097ed64
spec:
  executor:
    instances: 4
  sparkConf:
    spark.dynamicAllocation.enabled: 'true'
    spark.dynamicAllocation.maxExecutors: '10'
    spark.dynamicAllocation.minExecutors: '2'
status:
  applicationState:
    state: RUNNING
  driverInfo:
    podName: ingestion-datalake-news-app-driver
    webUIAddress: 172.20.207.161:4040
    webUIPort: 4040
    webUIServiceName: ingestion-datalake-news-app-ui-svc
  executionAttempts: 13
  executorState:
    ingestion-datalake-news-app-1591613851251-exec-1: RUNNING
    ingestion-datalake-news-app-1591613851251-exec-2: RUNNING
    ingestion-datalake-news-app-1591613851251-exec-4: RUNNING
    ingestion-datalake-news-app-1591613851251-exec-5: RUNNING
    ingestion-datalake-news-app-1591613851251-exec-6: RUNNING
  lastSubmissionAttemptTime: "2020-06-08T10:57:32Z"
  sparkApplicationId: spark-a5920b2a5aa04d22a737c60759b5bf82
  submissionAttempts: 1
  submissionID: 3e713ec8-9f6c-4e78-ac28-749797c846f0
  terminationTime: null
resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc_dstream.yaml
@@ -0,0 +1,35 @@
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  generation: 4
  labels:
    argocd.argoproj.io/instance: spark-job
  name: spark-job-app
  namespace: spark-cluster
  resourceVersion: "31812990"
  uid: bfee52b0-74ca-4465-8005-f6643097ed64
spec:
  executor:
    instances: 4
  sparkConf:
    spark.streaming.dynamicAllocation.enabled: 'true'
    spark.streaming.dynamicAllocation.maxExecutors: '10'
    spark.streaming.dynamicAllocation.minExecutors: '2'
status:
  applicationState:
    state: RUNNING
  driverInfo:
    podName: ingestion-datalake-news-app-driver
    webUIAddress: 172.20.207.161:4040
    webUIPort: 4040
    webUIServiceName: ingestion-datalake-news-app-ui-svc
  executionAttempts: 13
  executorState:
    ingestion-datalake-news-app-1591613851251-exec-1: RUNNING
    ingestion-datalake-news-app-1591613851251-exec-4: RUNNING
    ingestion-datalake-news-app-1591613851251-exec-6: RUNNING
  lastSubmissionAttemptTime: "2020-06-08T10:57:32Z"
  sparkApplicationId: spark-a5920b2a5aa04d22a737c60759b5bf82
  submissionAttempts: 1
  submissionID: 3e713ec8-9f6c-4e78-ac28-749797c846f0
  terminationTime: null
resource_customizations/sparkoperator.k8s.io/SparkApplication/testdata/healthy_dynamic_alloc_operator_api.yaml
@@ -0,0 +1,38 @@
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  generation: 4
  labels:
    argocd.argoproj.io/instance: spark-job
  name: spark-job-app
  namespace: spark-cluster
  resourceVersion: "31812990"
  uid: bfee52b0-74ca-4465-8005-f6643097ed64
spec:
  executor:
    instances: 4
  dynamicAllocation:
    enabled: true
    initialExecutors: 2
    minExecutors: 2
    maxExecutors: 10
status:
  applicationState:
    state: RUNNING
  driverInfo:
    podName: ingestion-datalake-news-app-driver
    webUIAddress: 172.20.207.161:4040
    webUIPort: 4040
    webUIServiceName: ingestion-datalake-news-app-ui-svc
  executionAttempts: 13
  executorState:
    ingestion-datalake-news-app-1591613851251-exec-1: RUNNING
    ingestion-datalake-news-app-1591613851251-exec-2: RUNNING
    ingestion-datalake-news-app-1591613851251-exec-4: RUNNING
    ingestion-datalake-news-app-1591613851251-exec-5: RUNNING
    ingestion-datalake-news-app-1591613851251-exec-6: RUNNING
  lastSubmissionAttemptTime: "2020-06-08T10:57:32Z"
  sparkApplicationId: spark-a5920b2a5aa04d22a737c60759b5bf82
  submissionAttempts: 1
  submissionID: 3e713ec8-9f6c-4e78-ac28-749797c846f0
  terminationTime: null
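
Not part of this commit, but a usage note: because this script ships inside Argo CD's built-in resource_customizations, the fix only takes effect once a release containing it is deployed. On older releases, roughly equivalent Lua can be supplied per-instance through the argocd-cm ConfigMap; a minimal sketch, assuming the documented resource.customizations.health.<group>_<kind> key, with the Lua body abbreviated:

apiVersion: v1
kind: ConfigMap
metadata:
  name: argocd-cm
  namespace: argocd
  labels:
    app.kubernetes.io/part-of: argocd
data:
  resource.customizations.health.sparkoperator.k8s.io_SparkApplication: |
    health_status = {}
    -- ...paste the health.lua body from this commit...
    return health_status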
