diff --git a/api/v1beta2/sparkapplication_types.go b/api/v1beta2/sparkapplication_types.go
index c56891187..7bf80ab87 100644
--- a/api/v1beta2/sparkapplication_types.go
+++ b/api/v1beta2/sparkapplication_types.go
@@ -19,6 +19,7 @@ package v1beta2
import (
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
+ policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@@ -549,6 +550,9 @@ type DriverSpec struct {
// PriorityClassName is the name of the PriorityClass for the driver pod.
// +optional
PriorityClassName *string `json:"priorityClassName,omitempty"`
+ // PodDisruptionBudgetSpec is the PodDisruptionBudget specification for the Spark Driver.
+ // +optional
+ PodDisruptionBudgetSpec *policyv1.PodDisruptionBudgetSpec `json:"podDisruptionBudgetSpec,omitempty"`
}
// ExecutorSpec is specification of the executor.
@@ -579,6 +583,9 @@ type ExecutorSpec struct {
// PriorityClassName is the name of the PriorityClass for the executor pod.
// +optional
PriorityClassName *string `json:"priorityClassName,omitempty"`
+ // PodDisruptionBudgetSpec is the PodDisruptionBudget specification for the Spark Executors.
+ // +optional
+ PodDisruptionBudgetSpec *policyv1.PodDisruptionBudgetSpec `json:"podDisruptionBudgetSpec,omitempty"`
}
// NamePath is a pair of a name and a path to which the named objects should be mounted to.
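
Usage note: the two new fields expose the upstream policy/v1 PodDisruptionBudgetSpec verbatim on the driver and executor specs. A minimal sketch of a SparkApplication manifest using them follows; the name, main application file, and spark-role pod labels are illustrative assumptions, and note that, per the upstream field documentation reproduced in the CRDs below, a null selector matches no pods, so a selector targeting the Spark pods is presumably needed for the budget to take effect.

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pi                 # illustrative
      namespace: default
    spec:
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar   # illustrative
      driver:
        podDisruptionBudgetSpec:
          minAvailable: 1
          selector:
            matchLabels:
              spark-role: driver     # assumed pod label; match the labels on the driver pod
      executor:
        instances: 3
        podDisruptionBudgetSpec:
          maxUnavailable: "25%"
          selector:
            matchLabels:
              spark-role: executor   # assumed pod label; match the labels on the executor pods
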
diff --git a/api/v1beta2/zz_generated.deepcopy.go b/api/v1beta2/zz_generated.deepcopy.go
index 635e19af5..9c0901b6c 100644
--- a/api/v1beta2/zz_generated.deepcopy.go
+++ b/api/v1beta2/zz_generated.deepcopy.go
@@ -23,6 +23,7 @@ package v1beta2
import (
"k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
+ policyv1 "k8s.io/api/policy/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)
@@ -249,6 +250,11 @@ func (in *DriverSpec) DeepCopyInto(out *DriverSpec) {
*out = new(string)
**out = **in
}
+ if in.PodDisruptionBudgetSpec != nil {
+ in, out := &in.PodDisruptionBudgetSpec, &out.PodDisruptionBudgetSpec
+ *out = new(policyv1.PodDisruptionBudgetSpec)
+ (*in).DeepCopyInto(*out)
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DriverSpec.
@@ -335,6 +341,11 @@ func (in *ExecutorSpec) DeepCopyInto(out *ExecutorSpec) {
*out = new(string)
**out = **in
}
+ if in.PodDisruptionBudgetSpec != nil {
+ in, out := &in.PodDisruptionBudgetSpec, &out.PodDisruptionBudgetSpec
+ *out = new(policyv1.PodDisruptionBudgetSpec)
+ (*in).DeepCopyInto(*out)
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExecutorSpec.
diff --git a/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml b/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml
index 7aa9c4af2..957e7b4f8 100644
--- a/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml
+++ b/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml
@@ -2981,6 +2981,112 @@ spec:
NodeSelector is the Kubernetes node selector to be added to the driver and executor pods.
This field is mutually exclusive with nodeSelector at SparkApplication level (which will be deprecated).
type: object
+ podDisruptionBudgetSpec:
+ description: PodDisruptionBudgetSpec is the PodDisruptionBudget
+ specification for the Spark Driver.
+ properties:
+ maxUnavailable:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ An eviction is allowed if at most "maxUnavailable" pods selected by
+ "selector" are unavailable after the eviction, i.e. even in absence of
+ the evicted pod. For example, one can prevent all voluntary evictions
+ by specifying 0. This is a mutually exclusive setting with "minAvailable".
+ x-kubernetes-int-or-string: true
+ minAvailable:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ An eviction is allowed if at least "minAvailable" pods selected by
+ "selector" will still be available after the eviction, i.e. even in the
+ absence of the evicted pod. So for example you can prevent all voluntary
+ evictions by specifying "100%".
+ x-kubernetes-int-or-string: true
+ selector:
+ description: |-
+ Label query over pods whose evictions are managed by the disruption
+ budget.
+ A null selector will match no pods, while an empty ({}) selector will select
+ all pods within the namespace.
+ properties:
+ matchExpressions:
+ description: matchExpressions is a list of label selector
+ requirements. The requirements are ANDed.
+ items:
+ description: |-
+ A label selector requirement is a selector that contains values, a key, and an operator that
+ relates the key and values.
+ properties:
+ key:
+ description: key is the label key that the selector
+ applies to.
+ type: string
+ operator:
+ description: |-
+ operator represents a key's relationship to a set of values.
+ Valid operators are In, NotIn, Exists and DoesNotExist.
+ type: string
+ values:
+ description: |-
+ values is an array of string values. If the operator is In or NotIn,
+ the values array must be non-empty. If the operator is Exists or DoesNotExist,
+ the values array must be empty. This array is replaced during a strategic
+ merge patch.
+ items:
+ type: string
+ type: array
+ required:
+ - key
+ - operator
+ type: object
+ type: array
+ matchLabels:
+ additionalProperties:
+ type: string
+ description: |-
+ matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels
+ map is equivalent to an element of matchExpressions, whose key field is "key", the
+ operator is "In", and the values array contains only "value". The requirements are ANDed.
+ type: object
+ type: object
+ x-kubernetes-map-type: atomic
+ unhealthyPodEvictionPolicy:
+ description: |-
+ UnhealthyPodEvictionPolicy defines the criteria for when unhealthy pods
+ should be considered for eviction. Current implementation considers healthy pods,
+ as pods that have status.conditions item with type="Ready",status="True".
+
+
+ Valid policies are IfHealthyBudget and AlwaysAllow.
+ If no policy is specified, the default behavior will be used,
+ which corresponds to the IfHealthyBudget policy.
+
+
+ IfHealthyBudget policy means that running pods (status.phase="Running"),
+ but not yet healthy can be evicted only if the guarded application is not
+ disrupted (status.currentHealthy is at least equal to status.desiredHealthy).
+ Healthy pods will be subject to the PDB for eviction.
+
+
+ AlwaysAllow policy means that all running pods (status.phase="Running"),
+ but not yet healthy are considered disrupted and can be evicted regardless
+ of whether the criteria in a PDB is met. This means perspective running
+ pods of a disrupted application might not get a chance to become healthy.
+ Healthy pods will be subject to the PDB for eviction.
+
+
+ Additional policies may be added in the future.
+ Clients making eviction decisions should disallow eviction of unhealthy pods
+ if they encounter an unrecognized policy in this field.
+
+
+ This field is beta-level. The eviction API uses this field when
+ the feature gate PDBUnhealthyPodEvictionPolicy is enabled (enabled by default).
+ type: string
+ type: object
podName:
description: |-
PodName is the name of the driver pod that the user creates. This is used for the
@@ -7767,6 +7873,112 @@ spec:
NodeSelector is the Kubernetes node selector to be added to the driver and executor pods.
This field is mutually exclusive with nodeSelector at SparkApplication level (which will be deprecated).
type: object
+ podDisruptionBudgetSpec:
+ description: PodDisruptionBudgetSpec is the PodDisruptionBudget
+ specification for the Spark Executors.
+ properties:
+ maxUnavailable:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ An eviction is allowed if at most "maxUnavailable" pods selected by
+ "selector" are unavailable after the eviction, i.e. even in absence of
+ the evicted pod. For example, one can prevent all voluntary evictions
+ by specifying 0. This is a mutually exclusive setting with "minAvailable".
+ x-kubernetes-int-or-string: true
+ minAvailable:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ An eviction is allowed if at least "minAvailable" pods selected by
+ "selector" will still be available after the eviction, i.e. even in the
+ absence of the evicted pod. So for example you can prevent all voluntary
+ evictions by specifying "100%".
+ x-kubernetes-int-or-string: true
+ selector:
+ description: |-
+ Label query over pods whose evictions are managed by the disruption
+ budget.
+ A null selector will match no pods, while an empty ({}) selector will select
+ all pods within the namespace.
+ properties:
+ matchExpressions:
+ description: matchExpressions is a list of label selector
+ requirements. The requirements are ANDed.
+ items:
+ description: |-
+ A label selector requirement is a selector that contains values, a key, and an operator that
+ relates the key and values.
+ properties:
+ key:
+ description: key is the label key that the selector
+ applies to.
+ type: string
+ operator:
+ description: |-
+ operator represents a key's relationship to a set of values.
+ Valid operators are In, NotIn, Exists and DoesNotExist.
+ type: string
+ values:
+ description: |-
+ values is an array of string values. If the operator is In or NotIn,
+ the values array must be non-empty. If the operator is Exists or DoesNotExist,
+ the values array must be empty. This array is replaced during a strategic
+ merge patch.
+ items:
+ type: string
+ type: array
+ required:
+ - key
+ - operator
+ type: object
+ type: array
+ matchLabels:
+ additionalProperties:
+ type: string
+ description: |-
+ matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels
+ map is equivalent to an element of matchExpressions, whose key field is "key", the
+ operator is "In", and the values array contains only "value". The requirements are ANDed.
+ type: object
+ type: object
+ x-kubernetes-map-type: atomic
+ unhealthyPodEvictionPolicy:
+ description: |-
+ UnhealthyPodEvictionPolicy defines the criteria for when unhealthy pods
+ should be considered for eviction. Current implementation considers healthy pods,
+ as pods that have status.conditions item with type="Ready",status="True".
+
+
+ Valid policies are IfHealthyBudget and AlwaysAllow.
+ If no policy is specified, the default behavior will be used,
+ which corresponds to the IfHealthyBudget policy.
+
+
+ IfHealthyBudget policy means that running pods (status.phase="Running"),
+ but not yet healthy can be evicted only if the guarded application is not
+ disrupted (status.currentHealthy is at least equal to status.desiredHealthy).
+ Healthy pods will be subject to the PDB for eviction.
+
+
+ AlwaysAllow policy means that all running pods (status.phase="Running"),
+ but not yet healthy are considered disrupted and can be evicted regardless
+ of whether the criteria in a PDB is met. This means perspective running
+ pods of a disrupted application might not get a chance to become healthy.
+ Healthy pods will be subject to the PDB for eviction.
+
+
+ Additional policies may be added in the future.
+ Clients making eviction decisions should disallow eviction of unhealthy pods
+ if they encounter an unrecognized policy in this field.
+
+
+ This field is beta-level. The eviction API uses this field when
+ the feature gate PDBUnhealthyPodEvictionPolicy is enabled (enabled by default).
+ type: string
+ type: object
podSecurityContext:
description: PodSecurityContext specifies the PodSecurityContext
to apply.
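
Usage note: the same fields are exposed through ScheduledSparkApplication, where the driver and executor specs sit under spec.template. A brief sketch, with the schedule and application file as illustrative assumptions:

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: ScheduledSparkApplication
    metadata:
      name: spark-pi-scheduled       # illustrative
    spec:
      schedule: "@every 1h"          # illustrative
      template:
        mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar   # illustrative
        driver:
          podDisruptionBudgetSpec:
            minAvailable: 1          # selector omitted for brevity; see the previous sketch
        executor:
          podDisruptionBudgetSpec:
            maxUnavailable: "25%"
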
diff --git a/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml b/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml
index 4c839e36e..e4ca4bca6 100644
--- a/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml
+++ b/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml
@@ -2930,6 +2930,112 @@ spec:
NodeSelector is the Kubernetes node selector to be added to the driver and executor pods.
This field is mutually exclusive with nodeSelector at SparkApplication level (which will be deprecated).
type: object
+ podDisruptionBudgetSpec:
+ description: PodDisruptionBudgetSpec is the PodDisruptionBudget
+ specification for the Spark Driver.
+ properties:
+ maxUnavailable:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ An eviction is allowed if at most "maxUnavailable" pods selected by
+ "selector" are unavailable after the eviction, i.e. even in absence of
+ the evicted pod. For example, one can prevent all voluntary evictions
+ by specifying 0. This is a mutually exclusive setting with "minAvailable".
+ x-kubernetes-int-or-string: true
+ minAvailable:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ An eviction is allowed if at least "minAvailable" pods selected by
+ "selector" will still be available after the eviction, i.e. even in the
+ absence of the evicted pod. So for example you can prevent all voluntary
+ evictions by specifying "100%".
+ x-kubernetes-int-or-string: true
+ selector:
+ description: |-
+ Label query over pods whose evictions are managed by the disruption
+ budget.
+ A null selector will match no pods, while an empty ({}) selector will select
+ all pods within the namespace.
+ properties:
+ matchExpressions:
+ description: matchExpressions is a list of label selector
+ requirements. The requirements are ANDed.
+ items:
+ description: |-
+ A label selector requirement is a selector that contains values, a key, and an operator that
+ relates the key and values.
+ properties:
+ key:
+ description: key is the label key that the selector
+ applies to.
+ type: string
+ operator:
+ description: |-
+ operator represents a key's relationship to a set of values.
+ Valid operators are In, NotIn, Exists and DoesNotExist.
+ type: string
+ values:
+ description: |-
+ values is an array of string values. If the operator is In or NotIn,
+ the values array must be non-empty. If the operator is Exists or DoesNotExist,
+ the values array must be empty. This array is replaced during a strategic
+ merge patch.
+ items:
+ type: string
+ type: array
+ required:
+ - key
+ - operator
+ type: object
+ type: array
+ matchLabels:
+ additionalProperties:
+ type: string
+ description: |-
+ matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels
+ map is equivalent to an element of matchExpressions, whose key field is "key", the
+ operator is "In", and the values array contains only "value". The requirements are ANDed.
+ type: object
+ type: object
+ x-kubernetes-map-type: atomic
+ unhealthyPodEvictionPolicy:
+ description: |-
+ UnhealthyPodEvictionPolicy defines the criteria for when unhealthy pods
+ should be considered for eviction. Current implementation considers healthy pods,
+ as pods that have status.conditions item with type="Ready",status="True".
+
+
+ Valid policies are IfHealthyBudget and AlwaysAllow.
+ If no policy is specified, the default behavior will be used,
+ which corresponds to the IfHealthyBudget policy.
+
+
+ IfHealthyBudget policy means that running pods (status.phase="Running"),
+ but not yet healthy can be evicted only if the guarded application is not
+ disrupted (status.currentHealthy is at least equal to status.desiredHealthy).
+ Healthy pods will be subject to the PDB for eviction.
+
+
+ AlwaysAllow policy means that all running pods (status.phase="Running"),
+ but not yet healthy are considered disrupted and can be evicted regardless
+ of whether the criteria in a PDB is met. This means perspective running
+ pods of a disrupted application might not get a chance to become healthy.
+ Healthy pods will be subject to the PDB for eviction.
+
+
+ Additional policies may be added in the future.
+ Clients making eviction decisions should disallow eviction of unhealthy pods
+ if they encounter an unrecognized policy in this field.
+
+
+ This field is beta-level. The eviction API uses this field when
+ the feature gate PDBUnhealthyPodEvictionPolicy is enabled (enabled by default).
+ type: string
+ type: object
podName:
description: |-
PodName is the name of the driver pod that the user creates. This is used for the
@@ -7686,6 +7792,112 @@ spec:
NodeSelector is the Kubernetes node selector to be added to the driver and executor pods.
This field is mutually exclusive with nodeSelector at SparkApplication level (which will be deprecated).
type: object
+ podDisruptionBudgetSpec:
+ description: PodDisruptionBudgetSpec is the PodDisruptionBudget
+ specification for the Spark Executors.
+ properties:
+ maxUnavailable:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ An eviction is allowed if at most "maxUnavailable" pods selected by
+ "selector" are unavailable after the eviction, i.e. even in absence of
+ the evicted pod. For example, one can prevent all voluntary evictions
+ by specifying 0. This is a mutually exclusive setting with "minAvailable".
+ x-kubernetes-int-or-string: true
+ minAvailable:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ An eviction is allowed if at least "minAvailable" pods selected by
+ "selector" will still be available after the eviction, i.e. even in the
+ absence of the evicted pod. So for example you can prevent all voluntary
+ evictions by specifying "100%".
+ x-kubernetes-int-or-string: true
+ selector:
+ description: |-
+ Label query over pods whose evictions are managed by the disruption
+ budget.
+ A null selector will match no pods, while an empty ({}) selector will select
+ all pods within the namespace.
+ properties:
+ matchExpressions:
+ description: matchExpressions is a list of label selector
+ requirements. The requirements are ANDed.
+ items:
+ description: |-
+ A label selector requirement is a selector that contains values, a key, and an operator that
+ relates the key and values.
+ properties:
+ key:
+ description: key is the label key that the selector
+ applies to.
+ type: string
+ operator:
+ description: |-
+ operator represents a key's relationship to a set of values.
+ Valid operators are In, NotIn, Exists and DoesNotExist.
+ type: string
+ values:
+ description: |-
+ values is an array of string values. If the operator is In or NotIn,
+ the values array must be non-empty. If the operator is Exists or DoesNotExist,
+ the values array must be empty. This array is replaced during a strategic
+ merge patch.
+ items:
+ type: string
+ type: array
+ required:
+ - key
+ - operator
+ type: object
+ type: array
+ matchLabels:
+ additionalProperties:
+ type: string
+ description: |-
+ matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels
+ map is equivalent to an element of matchExpressions, whose key field is "key", the
+ operator is "In", and the values array contains only "value". The requirements are ANDed.
+ type: object
+ type: object
+ x-kubernetes-map-type: atomic
+ unhealthyPodEvictionPolicy:
+ description: |-
+ UnhealthyPodEvictionPolicy defines the criteria for when unhealthy pods
+ should be considered for eviction. Current implementation considers healthy pods,
+ as pods that have status.conditions item with type="Ready",status="True".
+
+
+ Valid policies are IfHealthyBudget and AlwaysAllow.
+ If no policy is specified, the default behavior will be used,
+ which corresponds to the IfHealthyBudget policy.
+
+
+ IfHealthyBudget policy means that running pods (status.phase="Running"),
+ but not yet healthy can be evicted only if the guarded application is not
+ disrupted (status.currentHealthy is at least equal to status.desiredHealthy).
+ Healthy pods will be subject to the PDB for eviction.
+
+
+ AlwaysAllow policy means that all running pods (status.phase="Running"),
+ but not yet healthy are considered disrupted and can be evicted regardless
+ of whether the criteria in a PDB is met. This means perspective running
+ pods of a disrupted application might not get a chance to become healthy.
+ Healthy pods will be subject to the PDB for eviction.
+
+
+ Additional policies may be added in the future.
+ Clients making eviction decisions should disallow eviction of unhealthy pods
+ if they encounter an unrecognized policy in this field.
+
+
+ This field is beta-level. The eviction API uses this field when
+ the feature gate PDBUnhealthyPodEvictionPolicy is enabled (enabled by default).
+ type: string
+ type: object
podSecurityContext:
description: PodSecurityContext specifies the PodSecurityContext
to apply.
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 4a9d1d526..cb040a499 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -76,6 +76,18 @@ rules:
- patch
- update
- watch
+- apiGroups:
+ - policy
+ resources:
+ - poddisruptionbudgets
+ verbs:
+ - create
+ - delete
+ - get
+ - list
+ - patch
+ - update
+ - watch
- apiGroups:
- sparkoperator.k8s.io
resources:
diff --git a/docs/api-docs.md b/docs/api-docs.md
index 08f8d42d8..1cac3ca7f 100644
--- a/docs/api-docs.md
+++ b/docs/api-docs.md
@@ -649,6 +649,20 @@ string
PriorityClassName is the name of the PriorityClass for the driver pod.
+<tr>
+<td>
+<code>podDisruptionBudgetSpec</code><br/>
+<em>
+Kubernetes policy/v1.PodDisruptionBudgetSpec
+</em>
+</td>
+<td>
+<em>(Optional)</em>
+<p>PodDisruptionBudgetSpec is the PodDisruptionBudget specification
+for the Spark Driver.</p>
+</td>
+</tr>
+
DriverState
@@ -876,6 +890,20 @@ string
PriorityClassName is the name of the PriorityClass for the executor pod.
+<tr>
+<td>
+<code>podDisruptionBudgetSpec</code><br/>
+<em>
+Kubernetes policy/v1.PodDisruptionBudgetSpec
+</em>
+</td>
+<td>
+<em>(Optional)</em>
+<p>PodDisruptionBudgetSpec is the PodDisruptionBudget specification
+for the Spark Executors.</p>
+</td>
+</tr>
+
ExecutorState
diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go
index e4cb78d24..8dc790c10 100644
--- a/internal/controller/sparkapplication/controller.go
+++ b/internal/controller/sparkapplication/controller.go
@@ -28,6 +28,7 @@ import (
corev1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
networkingv1 "k8s.io/api/networking/v1"
+ policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -42,7 +43,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"github.com/kubeflow/spark-operator/api/v1beta2"
- "github.com/kubeflow/spark-operator/internal/metrics"
"github.com/kubeflow/spark-operator/internal/scheduler"
"github.com/kubeflow/spark-operator/internal/scheduler/kubescheduler"
"github.com/kubeflow/spark-operator/internal/scheduler/volcano"
@@ -55,22 +55,6 @@ var (
logger = log.Log.WithName("")
)
-// Options defines the options of the controller.
-type Options struct {
- Namespaces []string
- EnableUIService bool
- IngressClassName string
- IngressURLFormat string
- DefaultBatchScheduler string
-
- KubeSchedulerNames []string
-
- SparkApplicationMetrics *metrics.SparkApplicationMetrics
- SparkExecutorMetrics *metrics.SparkExecutorMetrics
-
- MaxTrackedExecutorPerApp int
-}
-
// Reconciler reconciles a SparkApplication object.
type Reconciler struct {
manager ctrl.Manager
@@ -111,6 +95,7 @@ func NewReconciler(
// +kubebuilder:rbac:groups=,resources=resourcequotas,verbs=get;list;watch
// +kubebuilder:rbac:groups=extensions,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get
// +kubebuilder:rbac:groups=sparkoperator.k8s.io,resources=sparkapplications,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=sparkoperator.k8s.io,resources=sparkapplications/status,verbs=get;update;patch
@@ -165,7 +150,7 @@ func NewReconciler(
// +--------------------------------------------------------------------------------------------------------------------+
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
key := req.NamespacedName
- app, err := r.getSparkApplication(key)
+ app, err := r.getSparkApplication(ctx, key)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
@@ -232,7 +217,7 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, options controller.Optio
func (r *Reconciler) handleSparkApplicationDeletion(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
key := req.NamespacedName
- app, err := r.getSparkApplication(key)
+ app, err := r.getSparkApplication(ctx, key)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
@@ -252,7 +237,7 @@ func (r *Reconciler) reconcileNewSparkApplication(ctx context.Context, req ctrl.
retryErr := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
- old, err := r.getSparkApplication(key)
+ old, err := r.getSparkApplication(ctx, key)
if err != nil {
return err
}
@@ -261,7 +246,7 @@ func (r *Reconciler) reconcileNewSparkApplication(ctx context.Context, req ctrl.
}
app := old.DeepCopy()
- _ = r.submitSparkApplication(app)
+ _ = r.submitSparkApplication(ctx, app)
if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
return err
}
@@ -280,7 +265,7 @@ func (r *Reconciler) reconcileSubmittedSparkApplication(ctx context.Context, req
retryErr := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
- old, err := r.getSparkApplication(key)
+ old, err := r.getSparkApplication(ctx, key)
if err != nil {
return err
}
@@ -313,7 +298,7 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte
retryErr := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
- old, err := r.getSparkApplication(key)
+ old, err := r.getSparkApplication(ctx, key)
if err != nil {
return err
}
@@ -329,7 +314,7 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte
}
if timeUntilNextRetryDue <= 0 {
if r.validateSparkResourceDeletion(ctx, app) {
- _ = r.submitSparkApplication(app)
+ _ = r.submitSparkApplication(ctx, app)
} else {
if err := r.deleteSparkResources(ctx, app); err != nil {
logger.Error(err, "failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace)
@@ -364,7 +349,7 @@ func (r *Reconciler) reconcileRunningSparkApplication(ctx context.Context, req c
retryErr := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
- old, err := r.getSparkApplication(key)
+ old, err := r.getSparkApplication(ctx, key)
if err != nil {
return err
}
@@ -396,7 +381,7 @@ func (r *Reconciler) reconcilePendingRerunSparkApplication(ctx context.Context,
retryErr := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
- old, err := r.getSparkApplication(key)
+ old, err := r.getSparkApplication(ctx, key)
if err != nil {
return err
}
@@ -410,7 +395,7 @@ func (r *Reconciler) reconcilePendingRerunSparkApplication(ctx context.Context,
logger.Info("Successfully deleted resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
r.recordSparkApplicationEvent(app)
r.resetSparkApplicationStatus(app)
- _ = r.submitSparkApplication(app)
+ _ = r.submitSparkApplication(ctx, app)
}
if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
return err
@@ -430,7 +415,7 @@ func (r *Reconciler) reconcileInvalidatingSparkApplication(ctx context.Context,
retryErr := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
- old, err := r.getSparkApplication(key)
+ old, err := r.getSparkApplication(ctx, key)
if err != nil {
return err
}
@@ -464,7 +449,7 @@ func (r *Reconciler) reconcileSucceedingSparkApplication(ctx context.Context, re
retryErr := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
- old, err := r.getSparkApplication(key)
+ old, err := r.getSparkApplication(ctx, key)
if err != nil {
return err
}
@@ -503,7 +488,7 @@ func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req c
retryErr := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
- old, err := r.getSparkApplication(key)
+ old, err := r.getSparkApplication(ctx, key)
if err != nil {
return err
}
@@ -553,7 +538,7 @@ func (r *Reconciler) reconcileFailedSparkApplication(ctx context.Context, req ct
func (r *Reconciler) reconcileTerminatedSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
key := req.NamespacedName
- old, err := r.getSparkApplication(key)
+ old, err := r.getSparkApplication(ctx, key)
if err != nil {
return ctrl.Result{Requeue: true}, err
}
@@ -607,7 +592,7 @@ func (r *Reconciler) reconcileUnknownSparkApplication(ctx context.Context, req c
retryErr := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
- old, err := r.getSparkApplication(key)
+ old, err := r.getSparkApplication(ctx, key)
if err != nil {
return err
}
@@ -633,16 +618,16 @@ func (r *Reconciler) reconcileUnknownSparkApplication(ctx context.Context, req c
}
// getSparkApplication gets the SparkApplication with the given name and namespace.
-func (r *Reconciler) getSparkApplication(key types.NamespacedName) (*v1beta2.SparkApplication, error) {
+func (r *Reconciler) getSparkApplication(ctx context.Context, key types.NamespacedName) (*v1beta2.SparkApplication, error) {
app := &v1beta2.SparkApplication{}
- if err := r.client.Get(context.TODO(), key, app); err != nil {
+ if err := r.client.Get(ctx, key, app); err != nil {
return nil, err
}
return app, nil
}
// submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit.
-func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (submitErr error) {
+func (r *Reconciler) submitSparkApplication(ctx context.Context, app *v1beta2.SparkApplication) (submitErr error) {
logger.Info("Submitting SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
// SubmissionID must be set before creating any resources to ensure all the resources are labeled.
@@ -739,20 +724,17 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (subm
}
}
+ if err := r.createPodDisruptionBudgets(ctx, app); err != nil {
+ return fmt.Errorf("failed to create pod disruption budget: %v", err)
+ }
+
defer func() {
if err := r.cleanUpPodTemplateFiles(app); err != nil {
logger.Error(fmt.Errorf("failed to clean up pod template files: %v", err), "name", app.Name, "namespace", app.Namespace)
}
}()
- sparkSubmitArgs, err := buildSparkSubmitArgs(app)
- if err != nil {
- return fmt.Errorf("failed to build spark-submit arguments: %v", err)
- }
-
- // Try submitting the application by running spark-submit.
- logger.Info("Running spark-submit for SparkApplication", "name", app.Name, "namespace", app.Namespace, "arguments", sparkSubmitArgs)
- if err := runSparkSubmit(newSubmission(sparkSubmitArgs, app)); err != nil {
+ if err := r.options.getSparkSubmitter().submit(app); err != nil {
r.recordSparkApplicationEvent(app)
return fmt.Errorf("failed to run spark-submit: %v", err)
}
@@ -761,13 +743,13 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (subm
// updateDriverState finds the driver pod of the application
// and updates the driver state based on the current phase of the pod.
-func (r *Reconciler) updateDriverState(_ context.Context, app *v1beta2.SparkApplication) error {
+func (r *Reconciler) updateDriverState(ctx context.Context, app *v1beta2.SparkApplication) error {
// Either the driver pod doesn't exist yet or its name has not been updated.
if app.Status.DriverInfo.PodName == "" {
return fmt.Errorf("empty driver pod name with application state %s", app.Status.AppState.State)
}
- driverPod, err := r.getDriverPod(app)
+ driverPod, err := r.getDriverPod(ctx, app)
if err != nil {
return err
}
@@ -808,8 +790,8 @@ func (r *Reconciler) updateDriverState(_ context.Context, app *v1beta2.SparkAppl
// updateExecutorState lists the executor pods of the application
// and updates the executor state based on the current phase of the pods.
-func (r *Reconciler) updateExecutorState(_ context.Context, app *v1beta2.SparkApplication) error {
- podList, err := r.getExecutorPods(app)
+func (r *Reconciler) updateExecutorState(ctx context.Context, app *v1beta2.SparkApplication) error {
+ podList, err := r.getExecutorPods(ctx, app)
if err != nil {
return err
}
@@ -884,22 +866,22 @@ func (r *Reconciler) updateExecutorState(_ context.Context, app *v1beta2.SparkAp
return nil
}
-func (r *Reconciler) getExecutorPods(app *v1beta2.SparkApplication) (*corev1.PodList, error) {
+func (r *Reconciler) getExecutorPods(ctx context.Context, app *v1beta2.SparkApplication) (*corev1.PodList, error) {
matchLabels := util.GetResourceLabels(app)
matchLabels[common.LabelSparkRole] = common.SparkRoleExecutor
pods := &corev1.PodList{}
- if err := r.client.List(context.TODO(), pods, client.InNamespace(app.Namespace), client.MatchingLabels(matchLabels)); err != nil {
+ if err := r.client.List(ctx, pods, client.InNamespace(app.Namespace), client.MatchingLabels(matchLabels)); err != nil {
return nil, fmt.Errorf("failed to get pods for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
}
return pods, nil
}
-func (r *Reconciler) getDriverPod(app *v1beta2.SparkApplication) (*corev1.Pod, error) {
+func (r *Reconciler) getDriverPod(ctx context.Context, app *v1beta2.SparkApplication) (*corev1.Pod, error) {
pod := &corev1.Pod{}
var err error
key := types.NamespacedName{Namespace: app.Namespace, Name: app.Status.DriverInfo.PodName}
- err = r.client.Get(context.TODO(), key, pod)
+ err = r.client.Get(ctx, key, pod)
if err == nil {
return pod, nil
}
@@ -944,6 +926,10 @@ func (r *Reconciler) deleteSparkResources(ctx context.Context, app *v1beta2.Spar
return err
}
+ if err := r.deletePodDisruptionBudgets(ctx, app); err != nil {
+ return err
+ }
+
return nil
}
@@ -1021,7 +1007,7 @@ func (r *Reconciler) deleteWebUIIngress(ctx context.Context, app *v1beta2.SparkA
if util.IngressCapabilities.Has("extensions/v1beta1") {
logger.V(1).Info("Deleting extensions/v1beta1 Spark UI Ingress", "name", ingressName, "namespace", app.Namespace)
if err := r.client.Delete(
- context.TODO(),
+ ctx,
&extensionsv1beta1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: ingressName,
@@ -1247,3 +1233,69 @@ func (r *Reconciler) cleanUpPodTemplateFiles(app *v1beta2.SparkApplication) erro
logger.V(1).Info("Deleted pod template files", "path", path)
return nil
}
+
+func (r *Reconciler) createPodDisruptionBudgets(ctx context.Context, app *v1beta2.SparkApplication) error {
+ for role, spec := range getPodDisruptionBudgetRoles(app) {
+ labels := util.GetResourceLabels(app)
+ labels[common.LabelSparkRole] = role
+
+ name := getPodDisruptionBudgetName(app.Name, role)
+ logger.Info("Creating Spark PodDisruptionBudget", "name", name, "namespace", app.Namespace)
+
+ if err := r.client.Create(ctx, &policyv1.PodDisruptionBudget{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: app.Namespace,
+ Labels: labels,
+ OwnerReferences: []metav1.OwnerReference{util.GetOwnerReference(app)},
+ },
+ Spec: *spec,
+ }); err != nil && !errors.IsAlreadyExists(err) {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (r *Reconciler) deletePodDisruptionBudgets(ctx context.Context, app *v1beta2.SparkApplication) error {
+ for role := range getPodDisruptionBudgetRoles(app) {
+ name := getPodDisruptionBudgetName(app.Name, role)
+ logger.Info("Deleting Spark PodDisruptionBudget", "name", name, "namespace", app.Namespace)
+
+ if err := r.client.Delete(
+ ctx,
+ &policyv1.PodDisruptionBudget{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: app.Namespace,
+ },
+ },
+ &client.DeleteOptions{
+ GracePeriodSeconds: util.Int64Ptr(0),
+ },
+ ); err != nil && !errors.IsNotFound(err) {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func getPodDisruptionBudgetRoles(app *v1beta2.SparkApplication) map[string]*policyv1.PodDisruptionBudgetSpec {
+ roles := make(map[string]*policyv1.PodDisruptionBudgetSpec)
+
+ if app.Spec.Driver.PodDisruptionBudgetSpec != nil {
+ roles[common.SparkRoleDriver] = app.Spec.Driver.PodDisruptionBudgetSpec
+ }
+
+ if app.Spec.Executor.PodDisruptionBudgetSpec != nil {
+ roles[common.SparkRoleExecutor] = app.Spec.Executor.PodDisruptionBudgetSpec
+ }
+
+ return roles
+}
+
+func getPodDisruptionBudgetName(appName, role string) string {
+ return appName + "-" + role
+}
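
For reference, a sketch of the PodDisruptionBudget the controller creates for the driver role of an application named spark-pi, following the code above: the name comes from getPodDisruptionBudgetName (app name plus role), the labels are util.GetResourceLabels(app) plus the Spark role label, the object is owned by the SparkApplication, and the spec is copied verbatim from spec.driver.podDisruptionBudgetSpec. The literal label keys are assumptions about what GetResourceLabels and common.LabelSparkRole expand to; owner reference uid and related fields are omitted.

    apiVersion: policy/v1
    kind: PodDisruptionBudget
    metadata:
      name: spark-pi-driver                        # getPodDisruptionBudgetName(app.Name, role)
      namespace: default
      labels:
        sparkoperator.k8s.io/app-name: spark-pi    # assumed expansion of util.GetResourceLabels(app)
        spark-role: driver                         # assumed value of common.LabelSparkRole / SparkRoleDriver
      ownerReferences:
      - apiVersion: sparkoperator.k8s.io/v1beta2
        kind: SparkApplication
        name: spark-pi
    spec:                                          # copied verbatim from spec.driver.podDisruptionBudgetSpec
      minAvailable: 1
      selector:
        matchLabels:
          spark-role: driver
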
diff --git a/internal/controller/sparkapplication/controller_test.go b/internal/controller/sparkapplication/controller_test.go
index 66afe0ca4..aa4faba95 100644
--- a/internal/controller/sparkapplication/controller_test.go
+++ b/internal/controller/sparkapplication/controller_test.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package sparkapplication_test
+package sparkapplication
import (
"context"
@@ -23,20 +23,29 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
-
corev1 "k8s.io/api/core/v1"
+ policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"github.com/kubeflow/spark-operator/api/v1beta2"
- "github.com/kubeflow/spark-operator/internal/controller/sparkapplication"
"github.com/kubeflow/spark-operator/pkg/common"
"github.com/kubeflow/spark-operator/pkg/util"
)
+type fakeSparkSubmitter struct {
+}
+
+var _ sparkSubmitter = &fakeSparkSubmitter{}
+
+func (s *fakeSparkSubmitter) submit(*v1beta2.SparkApplication) error {
+ return nil
+}
+
var _ = Describe("SparkApplication Controller", func() {
Context("When reconciling a new SparkApplication", func() {
ctx := context.Background()
@@ -117,13 +126,16 @@ var _ = Describe("SparkApplication Controller", func() {
It("Should successfully reconcile a completed SparkApplication", func() {
By("Reconciling the created test SparkApplication")
- reconciler := sparkapplication.NewReconciler(
+ reconciler := NewReconciler(
nil,
k8sClient.Scheme(),
k8sClient,
nil,
nil,
- sparkapplication.Options{Namespaces: []string{appNamespace}},
+ Options{
+ Namespaces: []string{appNamespace},
+ sparkSubmitter: &fakeSparkSubmitter{},
+ },
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expect(err).NotTo(HaveOccurred())
@@ -176,13 +188,13 @@ var _ = Describe("SparkApplication Controller", func() {
Expect(k8sClient.Status().Update(ctx, app)).To(Succeed())
By("Reconciling the expired SparkApplication")
- reconciler := sparkapplication.NewReconciler(
+ reconciler := NewReconciler(
nil,
k8sClient.Scheme(),
k8sClient,
nil,
nil,
- sparkapplication.Options{Namespaces: []string{appNamespace}},
+ Options{Namespaces: []string{appNamespace}},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expect(err).NotTo(HaveOccurred())
@@ -230,13 +242,13 @@ var _ = Describe("SparkApplication Controller", func() {
It("Should successfully reconcile a failed SparkApplication", func() {
By("Reconciling the created test SparkApplication")
- reconciler := sparkapplication.NewReconciler(
+ reconciler := NewReconciler(
nil,
k8sClient.Scheme(),
k8sClient,
nil,
nil,
- sparkapplication.Options{Namespaces: []string{appNamespace}},
+ Options{Namespaces: []string{appNamespace}},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expect(err).NotTo(HaveOccurred())
@@ -289,13 +301,13 @@ var _ = Describe("SparkApplication Controller", func() {
Expect(k8sClient.Status().Update(ctx, app)).To(Succeed())
By("Reconciling the expired SparkApplication")
- reconciler := sparkapplication.NewReconciler(
+ reconciler := NewReconciler(
nil,
k8sClient.Scheme(),
k8sClient,
nil,
nil,
- sparkapplication.Options{Namespaces: []string{appNamespace}},
+ Options{Namespaces: []string{appNamespace}},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expect(err).NotTo(HaveOccurred())
@@ -371,13 +383,13 @@ var _ = Describe("SparkApplication Controller", func() {
It("Should add the executors to the SparkApplication", func() {
By("Reconciling the running SparkApplication")
- reconciler := sparkapplication.NewReconciler(
+ reconciler := NewReconciler(
nil,
k8sClient.Scheme(),
k8sClient,
record.NewFakeRecorder(3),
nil,
- sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 10},
+ Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 10},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expect(err).NotTo(HaveOccurred())
@@ -390,13 +402,13 @@ var _ = Describe("SparkApplication Controller", func() {
It("Should only add 1 executor to the SparkApplication", func() {
By("Reconciling the running SparkApplication")
- reconciler := sparkapplication.NewReconciler(
+ reconciler := NewReconciler(
nil,
k8sClient.Scheme(),
k8sClient,
record.NewFakeRecorder(3),
nil,
- sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 1},
+ Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 1},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expect(err).NotTo(HaveOccurred())
@@ -407,6 +419,84 @@ var _ = Describe("SparkApplication Controller", func() {
Expect(app.Status.ExecutorState).To(HaveLen(1))
})
})
+
+ Context("When reconciling a SparkApplication with PodDisruptionBudget", func() {
+ ctx := context.Background()
+ appName := "test-pdb"
+ appNamespace := "default"
+ key := types.NamespacedName{
+ Name: appName,
+ Namespace: appNamespace,
+ }
+ pdbMinAvailable := &intstr.IntOrString{Type: intstr.Int, IntVal: 1}
+ recorder := record.NewFakeRecorder(1024)
+
+ BeforeEach(func() {
+ By("Creating a test SparkApplication")
+ app := &v1beta2.SparkApplication{}
+ if err := k8sClient.Get(ctx, key, app); err != nil && errors.IsNotFound(err) {
+ app = &v1beta2.SparkApplication{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: appName,
+ Namespace: appNamespace,
+ Labels: map[string]string{
+ common.LabelSparkAppName: appName,
+ },
+ },
+ Spec: v1beta2.SparkApplicationSpec{
+ MainApplicationFile: util.StringPtr("local:///dummy.jar"),
+ Driver: v1beta2.DriverSpec{
+ PodDisruptionBudgetSpec: &policyv1.PodDisruptionBudgetSpec{
+ MinAvailable: pdbMinAvailable,
+ },
+ },
+ Executor: v1beta2.ExecutorSpec{
+ PodDisruptionBudgetSpec: &policyv1.PodDisruptionBudgetSpec{
+ MinAvailable: pdbMinAvailable,
+ },
+ },
+ },
+ }
+ v1beta2.SetSparkApplicationDefaults(app)
+ Expect(k8sClient.Create(ctx, app)).To(Succeed())
+ }
+ })
+
+ BeforeEach(func() {
+ By("Reconciling the created test SparkApplication")
+ reconciler := NewReconciler(
+ nil,
+ k8sClient.Scheme(),
+ k8sClient,
+ recorder,
+ nil,
+ Options{Namespaces: []string{appNamespace}, sparkSubmitter: &fakeSparkSubmitter{}},
+ )
+ result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
+ Expect(err).NotTo(HaveOccurred())
+ Expect(result.Requeue).To(BeFalse())
+ })
+
+ AfterEach(func() {
+ app := &v1beta2.SparkApplication{}
+ Expect(k8sClient.Get(ctx, key, app)).To(Succeed())
+
+ By("Deleting the created test SparkApplication")
+ Expect(k8sClient.Delete(ctx, app)).To(Succeed())
+ })
+
+ It("Should create a driver PodDisruptionBudget", func() {
+ pdb := &policyv1.PodDisruptionBudget{}
+ Expect(k8sClient.Get(ctx, types.NamespacedName{Name: appName + "-driver", Namespace: appNamespace}, pdb)).To(Succeed())
+ Expect(pdb.Spec.MinAvailable).To(Equal(pdbMinAvailable))
+ })
+
+ It("Should create an executor PodDisruptionBudget", func() {
+ pdb := &policyv1.PodDisruptionBudget{}
+ Expect(k8sClient.Get(ctx, types.NamespacedName{Name: appName + "-executor", Namespace: appNamespace}, pdb)).To(Succeed())
+ Expect(pdb.Spec.MinAvailable).To(Equal(pdbMinAvailable))
+ })
+ })
})
func getDriverNamespacedName(appName string, appNamespace string) types.NamespacedName {
diff --git a/internal/controller/sparkapplication/monitoring_config_test.go b/internal/controller/sparkapplication/monitoring_config_test.go
index 2b83bb141..bb50f0607 100644
--- a/internal/controller/sparkapplication/monitoring_config_test.go
+++ b/internal/controller/sparkapplication/monitoring_config_test.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package sparkapplication_test
+package sparkapplication
// func TestConfigPrometheusMonitoring(t *testing.T) {
// type testcase struct {
diff --git a/internal/controller/sparkapplication/options.go b/internal/controller/sparkapplication/options.go
new file mode 100644
index 000000000..b980736d7
--- /dev/null
+++ b/internal/controller/sparkapplication/options.go
@@ -0,0 +1,28 @@
+package sparkapplication
+
+import "github.com/kubeflow/spark-operator/internal/metrics"
+
+// Options defines the options of the controller.
+type Options struct {
+ Namespaces []string
+ EnableUIService bool
+ IngressClassName string
+ IngressURLFormat string
+ DefaultBatchScheduler string
+
+ KubeSchedulerNames []string
+
+ SparkApplicationMetrics *metrics.SparkApplicationMetrics
+ SparkExecutorMetrics *metrics.SparkExecutorMetrics
+
+ MaxTrackedExecutorPerApp int
+
+ sparkSubmitter sparkSubmitter
+}
+
+func (o *Options) getSparkSubmitter() sparkSubmitter {
+ if o.sparkSubmitter == nil {
+ o.sparkSubmitter = newSparkSubmitterHandler()
+ }
+ return o.sparkSubmitter
+}
diff --git a/internal/controller/sparkapplication/submitter.go b/internal/controller/sparkapplication/submitter.go
new file mode 100644
index 000000000..7e724dfc2
--- /dev/null
+++ b/internal/controller/sparkapplication/submitter.go
@@ -0,0 +1,29 @@
+package sparkapplication
+
+import (
+ "fmt"
+
+ "github.com/kubeflow/spark-operator/api/v1beta2"
+)
+
+type sparkSubmitter interface {
+ submit(*v1beta2.SparkApplication) error
+}
+
+type sparkSubmitterHandler struct {
+}
+
+func newSparkSubmitterHandler() sparkSubmitter {
+ return &sparkSubmitterHandler{}
+}
+
+func (s *sparkSubmitterHandler) submit(app *v1beta2.SparkApplication) error {
+ sparkSubmitArgs, err := buildSparkSubmitArgs(app)
+ if err != nil {
+ return fmt.Errorf("failed to build spark-submit arguments: %v", err)
+ }
+
+ // Try submitting the application by running spark-submit.
+ logger.Info("Running spark-submit for SparkApplication", "name", app.Name, "namespace", app.Namespace, "arguments", sparkSubmitArgs)
+ return runSparkSubmit(newSubmission(sparkSubmitArgs, app))
+}
diff --git a/internal/controller/sparkapplication/suite_test.go b/internal/controller/sparkapplication/suite_test.go
index 02ce4c260..a14ecde09 100644
--- a/internal/controller/sparkapplication/suite_test.go
+++ b/internal/controller/sparkapplication/suite_test.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package sparkapplication_test
+package sparkapplication
import (
"fmt"