From 4b52ee424a127641bc9c9a256e4c7c9e75198347 Mon Sep 17 00:00:00 2001
From: yanghua
Date: Thu, 2 Mar 2023 16:14:25 +0800
Subject: [PATCH] [Addon #579] Refactor the spark-workload parameter definition
 and add spark-py example

Signed-off-by: yanghua
---
 .../sparkapp-py.yaml                          | 49 +++++++++++++++++++
 .../spark-kubernetes-operator/sparkapp.yaml   | 10 ++++
 .../definitions/spark-workload.cue            | 49 ++++++++++++++++++-
 3 files changed, 106 insertions(+), 2 deletions(-)
 create mode 100644 examples/spark-kubernetes-operator/sparkapp-py.yaml

diff --git a/examples/spark-kubernetes-operator/sparkapp-py.yaml b/examples/spark-kubernetes-operator/sparkapp-py.yaml
new file mode 100644
index 000000000..1731b23ea
--- /dev/null
+++ b/examples/spark-kubernetes-operator/sparkapp-py.yaml
@@ -0,0 +1,49 @@
+apiVersion: core.oam.dev/v1beta1
+kind: Application
+metadata:
+  name: spark-app-v1
+  namespace: spark-cluster
+spec:
+  components:
+    - name: spark-workload-component
+      type: spark-workload
+      properties:
+        name: my-spark-py-app
+        namespace: spark-cluster
+        type: Python
+        pythonVersion: "3"
+        mode: cluster
+        image: "gcr.io/spark-operator/spark-py:v3.1.1"
+        imagePullPolicy: Always
+        mainClass: org.apache.spark.examples.streaming.JavaQueueStream
+        mainApplicationFile: "local:///opt/spark/examples/src/main/python/pi.py"
+        sparkVersion: "3.1.1"
+        restartPolicy:
+          type: OnFailure
+          onFailureRetries: 3
+          onFailureRetryInterval: 10
+          onSubmissionFailureRetries: 5
+          onSubmissionFailureRetryInterval: 20
+        volumes:
+          - name: "test-volume"
+            hostPath:
+              path: "/tmp"
+              type: Directory
+        driver:
+          cores: 1
+          coreLimit: "1200m"
+          memory: "1024m"
+          labels:
+            version: 3.1.1
+          volumeMounts:
+            - name: "test-volume"
+              mountPath: "/tmp"
+        executor:
+          cores: 1
+          instances: 1
+          memory: "1024m"
+          labels:
+            version: 3.1.1
+          volumeMounts:
+            - name: "test-volume"
+              mountPath: "/tmp"
diff --git a/examples/spark-kubernetes-operator/sparkapp.yaml b/examples/spark-kubernetes-operator/sparkapp.yaml
index 34966ac44..920b68d1c 100644
--- a/examples/spark-kubernetes-operator/sparkapp.yaml
+++ b/examples/spark-kubernetes-operator/sparkapp.yaml
@@ -17,6 +17,8 @@ spec:
         mainClass: org.apache.spark.examples.streaming.JavaQueueStream
         mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
         sparkVersion: "3.1.1"
+        restartPolicy:
+          type: Never
         volumes:
           - name: "test-volume"
             hostPath:
@@ -24,11 +26,19 @@ spec:
               path: "/tmp"
               type: Directory
         driver:
           cores: 1
+          coreLimit: "1200m"
+          memory: "1024m"
+          labels:
+            version: 3.1.1
           volumeMounts:
             - name: "test-volume"
               mountPath: "/tmp"
         executor:
           cores: 1
+          instances: 1
+          memory: "1024m"
+          labels:
+            version: 3.1.1
           volumeMounts:
             - name: "test-volume"
               mountPath: "/tmp"
diff --git a/experimental/addons/spark-kubernetes-operator/definitions/spark-workload.cue b/experimental/addons/spark-kubernetes-operator/definitions/spark-workload.cue
index f7959eb89..b3efe9b04 100644
--- a/experimental/addons/spark-kubernetes-operator/definitions/spark-workload.cue
+++ b/experimental/addons/spark-kubernetes-operator/definitions/spark-workload.cue
@@ -28,9 +28,30 @@ template: {
     mainApplicationFile: string
     // +usage=Specify the version of Spark the application uses
     sparkVersion: string
+    // +usage=Specify the policy that defines whether and in which conditions the controller should restart the application
+    restartPolicy?: {
+      // +usage=Specify the type of the restart policy, one of "Always", "Never" or "OnFailure"
+      type: string
+      // +usage=Specify the number of times to retry submitting an application before giving up. This is best effort and the actual number of retry attempts can be >= the value specified due to caching. Required if the restart policy type is OnFailure
+      onSubmissionFailureRetries?: int
+      // +usage=Specify the number of times to retry running an application before giving up
+      onFailureRetries?: int
+      // +usage=Specify the interval in seconds between retries on failed submissions
+      onSubmissionFailureRetryInterval?: int
+      // +usage=Specify the interval in seconds between retries on failed runs
+      onFailureRetryInterval?: int
+    }
     // +usage=Specify the driver sepc request for the driver pod
     driver: {
-      cores: int
+      // +usage=Specify the number of CPU cores to request, which maps to spark.driver.cores or spark.executor.cores for the driver and the executors respectively
+      cores?: int
+      // +usage=Specify a hard limit on CPU cores for the pod
+      coreLimit?: string
+      // +usage=Specify the amount of memory to request for the pod
+      memory?: string
+      // +usage=Specify the Kubernetes labels to be added to the pod
+      labels?: [string]: string
+      // +usage=Specify the volumes listed in ".spec.volumes" to mount into the main container's filesystem
       volumeMounts?: [...{
         name:      string
         mountPath: string
@@ -38,7 +59,16 @@ template: {
     }
     // +usage=Specify the executor spec request for the executor pod
     executor: {
-      cores: int
+      // +usage=Specify the number of CPU cores to request, which maps to spark.driver.cores or spark.executor.cores for the driver and the executors respectively
+      cores?: int
+      // +usage=Specify a hard limit on CPU cores for the pod
+      coreLimit?: string
+      // +usage=Specify the amount of memory to request for the pod
+      memory?: string
+      instances?: int
+      // +usage=Specify the Kubernetes labels to be added to the pod
+      labels?: [string]: string
+      // +usage=Specify the volumes listed in ".spec.volumes" to mount into the main container's filesystem
       volumeMounts?: [...{
         name:      string
         mountPath: string
@@ -62,6 +92,21 @@ template: {
           type: *"Directory" | string
         }
       }]
+    // +usage=Specify the dependencies of the Spark application; this captures all possible types of dependencies
+    deps?: {
+      // +usage=Specify a list of JAR files the Spark application depends on
+      jars?: [...string]
+      // +usage=Specify a list of files the Spark application depends on
+      files?: [...string]
+      // +usage=Specify a list of Python files the Spark application depends on
+      pyFiles?: [...string]
+      // +usage=Specify a list of Maven coordinates of jars to include on the driver and executor classpaths. This will search the local Maven repository, then Maven Central and any additional remote repositories given by the "repositories" option. Each package should be of the form "groupId:artifactId:version"
+      packages?: [...string]
+      // +usage=Specify a list of "groupId:artifactId" entries to exclude while resolving the dependencies provided in "packages", in order to avoid dependency conflicts
+      excludePackages?: [...string]
+      // +usage=Specify a list of additional remote repositories to search for the Maven coordinates given with the "packages" option
+      repositories?: [...string]
+    }
   }
 
   output: {