diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java index 3956935c..eb6f86cb 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java @@ -19,6 +19,7 @@ package org.apache.spark.k8s.operator.reconciler; +import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME; import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue; import static org.apache.spark.k8s.operator.utils.Utils.commonResourceLabelsStr; @@ -45,7 +46,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.spark.k8s.operator.Constants; import org.apache.spark.k8s.operator.SparkAppSubmissionWorker; import org.apache.spark.k8s.operator.SparkApplication; import org.apache.spark.k8s.operator.context.SparkAppContext; @@ -60,8 +60,8 @@ import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppResourceObserveStep; import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppRunningStep; import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppTerminatedStep; +import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppUnknownStateStep; import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppValidateStep; -import org.apache.spark.k8s.operator.reconciler.reconcilesteps.UnknownStateStep; import org.apache.spark.k8s.operator.utils.LoggingUtils; import org.apache.spark.k8s.operator.utils.ReconcilerUtils; import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder; @@ -140,8 +140,7 @@ public Map prepareEventSources( EventSource podEventSource = new InformerEventSource<>( InformerConfiguration.from(Pod.class, context) - .withSecondaryToPrimaryMapper( - Mappers.fromLabel(Constants.LABEL_SPARK_APPLICATION_NAME)) + .withSecondaryToPrimaryMapper(Mappers.fromLabel(LABEL_SPARK_APPLICATION_NAME)) .withLabelSelector(commonResourceLabelsStr()) .build(), context); @@ -153,12 +152,8 @@ protected List getReconcileSteps(final SparkApplication app) { steps.add(new AppValidateStep()); steps.add(new AppTerminatedStep()); switch (app.getStatus().getCurrentState().getCurrentStateSummary()) { - case Submitted: - case ScheduledToRestart: - steps.add(new AppInitStep()); - break; - case DriverRequested: - case DriverStarted: + case Submitted, ScheduledToRestart -> steps.add(new AppInitStep()); + case DriverRequested, DriverStarted -> { steps.add( new AppResourceObserveStep( List.of(new AppDriverStartObserver(), new AppDriverReadyObserver()))); @@ -166,29 +161,26 @@ protected List getReconcileSteps(final SparkApplication app) { new AppResourceObserveStep(Collections.singletonList(new AppDriverRunningObserver()))); steps.add( new AppResourceObserveStep(Collections.singletonList(new AppDriverTimeoutObserver()))); - break; - case DriverReady: - case InitializedBelowThresholdExecutors: - case RunningHealthy: - case RunningWithBelowThresholdExecutors: + } + case DriverReady, + InitializedBelowThresholdExecutors, + RunningHealthy, + RunningWithBelowThresholdExecutors -> { steps.add(new AppRunningStep()); steps.add( new AppResourceObserveStep(Collections.singletonList(new AppDriverRunningObserver()))); steps.add( new AppResourceObserveStep(Collections.singletonList(new AppDriverTimeoutObserver()))); - break; - case DriverReadyTimedOut: - case DriverStartTimedOut: - case ExecutorsStartTimedOut: - case Succeeded: - case DriverEvicted: - case Failed: - case SchedulingFailure: - steps.add(new AppCleanUpStep()); - break; - default: - steps.add(new UnknownStateStep()); - break; + } + case DriverReadyTimedOut, + DriverStartTimedOut, + ExecutorsStartTimedOut, + Succeeded, + DriverEvicted, + Failed, + SchedulingFailure -> + steps.add(new AppCleanUpStep()); + default -> steps.add(new AppUnknownStateStep()); } return steps; } diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java index e102bce1..70d79cc9 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java @@ -45,7 +45,7 @@ import org.apache.spark.k8s.operator.utils.ReconcilerUtils; import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder; -/** Request all driver and driver resources when starting an attempt */ +/** Request all driver and its resources when starting an attempt */ @Slf4j public class AppInitStep extends AppReconcileStep { @Override diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppReconcileStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppReconcileStep.java index 14f6b995..91542ce3 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppReconcileStep.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppReconcileStep.java @@ -26,7 +26,6 @@ import java.time.Duration; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; import io.fabric8.kubernetes.api.model.Pod; @@ -56,7 +55,7 @@ protected ReconcileProgress observeDriver( .map(o -> o.observe(driverPodOptional.get(), app.getSpec(), app.getStatus())) .filter(Optional::isPresent) .map(Optional::get) - .collect(Collectors.toList()); + .toList(); if (stateUpdates.isEmpty()) { return proceed(); } else { diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java index 6d614ece..a47f08ff 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java @@ -19,6 +19,7 @@ package org.apache.spark.k8s.operator.reconciler.reconcilesteps; +import static org.apache.spark.k8s.operator.Constants.*; import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue; import java.util.Collections; @@ -26,7 +27,6 @@ import io.fabric8.kubernetes.api.model.Pod; -import org.apache.spark.k8s.operator.Constants; import org.apache.spark.k8s.operator.context.SparkAppContext; import org.apache.spark.k8s.operator.reconciler.ReconcileProgress; import org.apache.spark.k8s.operator.reconciler.observers.AppDriverRunningObserver; @@ -51,17 +51,17 @@ public ReconcileProgress reconcile( || instanceConfig.getInitExecutors() == 0L || !prevStateSummary.isStarting() && instanceConfig.getMinExecutors() == 0L) { proposedStateSummary = ApplicationStateSummary.RunningHealthy; - stateMessage = Constants.RUNNING_HEALTHY_MESSAGE; + stateMessage = RUNNING_HEALTHY_MESSAGE; } else { Set executors = context.getExecutorsForApplication(); long runningExecutors = executors.stream().filter(PodUtils::isPodReady).count(); if (prevStateSummary.isStarting()) { if (runningExecutors >= instanceConfig.getInitExecutors()) { proposedStateSummary = ApplicationStateSummary.RunningHealthy; - stateMessage = Constants.RUNNING_HEALTHY_MESSAGE; + stateMessage = RUNNING_HEALTHY_MESSAGE; } else if (runningExecutors > 0L) { proposedStateSummary = ApplicationStateSummary.InitializedBelowThresholdExecutors; - stateMessage = Constants.INITIALIZED_WITH_BELOW_THRESHOLD_EXECUTORS_MESSAGE; + stateMessage = INITIALIZED_WITH_BELOW_THRESHOLD_EXECUTORS_MESSAGE; } else { // keep previous state for 0 executor proposedStateSummary = prevStateSummary; @@ -69,10 +69,10 @@ public ReconcileProgress reconcile( } else { if (runningExecutors >= instanceConfig.getMinExecutors()) { proposedStateSummary = ApplicationStateSummary.RunningHealthy; - stateMessage = Constants.RUNNING_HEALTHY_MESSAGE; + stateMessage = RUNNING_HEALTHY_MESSAGE; } else { proposedStateSummary = ApplicationStateSummary.RunningWithBelowThresholdExecutors; - stateMessage = Constants.RUNNING_WITH_BELOW_THRESHOLD_EXECUTORS_MESSAGE; + stateMessage = RUNNING_WITH_BELOW_THRESHOLD_EXECUTORS_MESSAGE; } } } diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/UnknownStateStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppUnknownStateStep.java similarity index 96% rename from spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/UnknownStateStep.java rename to spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppUnknownStateStep.java index 0f6159d7..715c235a 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/UnknownStateStep.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppUnknownStateStep.java @@ -31,7 +31,7 @@ import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder; /** Abnormal state handler */ -public class UnknownStateStep extends AppReconcileStep { +public class AppUnknownStateStep extends AppReconcileStep { @Override public ReconcileProgress reconcile( SparkAppContext context, SparkAppStatusRecorder statusRecorder) { diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppValidateStep.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppValidateStep.java index b8f50123..86b62bb9 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppValidateStep.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppValidateStep.java @@ -21,19 +21,19 @@ import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndImmediateRequeue; import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.proceed; +import static org.apache.spark.k8s.operator.spec.DeploymentMode.ClientMode; import static org.apache.spark.k8s.operator.utils.SparkAppStatusUtils.isValidApplicationStatus; import lombok.extern.slf4j.Slf4j; import org.apache.spark.k8s.operator.context.SparkAppContext; import org.apache.spark.k8s.operator.reconciler.ReconcileProgress; -import org.apache.spark.k8s.operator.spec.DeploymentMode; import org.apache.spark.k8s.operator.status.ApplicationState; import org.apache.spark.k8s.operator.status.ApplicationStateSummary; import org.apache.spark.k8s.operator.status.ApplicationStatus; import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder; -/** Validates the submitted app. This can be re-factored into webhook in future. */ +/** Validates the submitted app. This can be re-factored into webhook in the future. */ @Slf4j public class AppValidateStep extends AppReconcileStep { @Override @@ -43,7 +43,7 @@ public ReconcileProgress reconcile( log.warn("Spark application found with empty status. Resetting to initial state."); statusRecorder.persistStatus(context, new ApplicationStatus()); } - if (DeploymentMode.ClientMode.equals(context.getResource().getSpec())) { + if (ClientMode.equals(context.getResource().getSpec())) { ApplicationState failure = new ApplicationState(ApplicationStateSummary.Failed, "Client mode is not supported yet."); statusRecorder.persistStatus(