@@ -19,6 +19,7 @@

package org.apache.spark.k8s.operator.reconciler;

import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME;
import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue;
import static org.apache.spark.k8s.operator.utils.Utils.commonResourceLabelsStr;

@@ -45,7 +46,6 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.spark.k8s.operator.Constants;
import org.apache.spark.k8s.operator.SparkAppSubmissionWorker;
import org.apache.spark.k8s.operator.SparkApplication;
import org.apache.spark.k8s.operator.context.SparkAppContext;
@@ -60,8 +60,8 @@
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppResourceObserveStep;
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppRunningStep;
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppTerminatedStep;
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppUnknownStateStep;
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.AppValidateStep;
import org.apache.spark.k8s.operator.reconciler.reconcilesteps.UnknownStateStep;
import org.apache.spark.k8s.operator.utils.LoggingUtils;
import org.apache.spark.k8s.operator.utils.ReconcilerUtils;
import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
@@ -140,8 +140,7 @@ public Map<String, EventSource> prepareEventSources(
EventSource podEventSource =
new InformerEventSource<>(
InformerConfiguration.from(Pod.class, context)
.withSecondaryToPrimaryMapper(
Mappers.fromLabel(Constants.LABEL_SPARK_APPLICATION_NAME))
.withSecondaryToPrimaryMapper(Mappers.fromLabel(LABEL_SPARK_APPLICATION_NAME))
.withLabelSelector(commonResourceLabelsStr())
.build(),
context);
@@ -153,42 +152,35 @@ protected List<AppReconcileStep> getReconcileSteps(final SparkApplication app) {
steps.add(new AppValidateStep());
steps.add(new AppTerminatedStep());
switch (app.getStatus().getCurrentState().getCurrentStateSummary()) {
case Submitted:
case ScheduledToRestart:
steps.add(new AppInitStep());
break;
case DriverRequested:
case DriverStarted:
case Submitted, ScheduledToRestart -> steps.add(new AppInitStep());
case DriverRequested, DriverStarted -> {
steps.add(
new AppResourceObserveStep(
List.of(new AppDriverStartObserver(), new AppDriverReadyObserver())));
steps.add(
new AppResourceObserveStep(Collections.singletonList(new AppDriverRunningObserver())));
steps.add(
new AppResourceObserveStep(Collections.singletonList(new AppDriverTimeoutObserver())));
break;
case DriverReady:
case InitializedBelowThresholdExecutors:
case RunningHealthy:
case RunningWithBelowThresholdExecutors:
}
case DriverReady,
InitializedBelowThresholdExecutors,
RunningHealthy,
RunningWithBelowThresholdExecutors -> {
steps.add(new AppRunningStep());
steps.add(
new AppResourceObserveStep(Collections.singletonList(new AppDriverRunningObserver())));
steps.add(
new AppResourceObserveStep(Collections.singletonList(new AppDriverTimeoutObserver())));
break;
case DriverReadyTimedOut:
case DriverStartTimedOut:
case ExecutorsStartTimedOut:
case Succeeded:
case DriverEvicted:
case Failed:
case SchedulingFailure:
steps.add(new AppCleanUpStep());
break;
default:
steps.add(new UnknownStateStep());
break;
}
case DriverReadyTimedOut,
DriverStartTimedOut,
ExecutorsStartTimedOut,
Succeeded,
DriverEvicted,
Failed,
SchedulingFailure ->
steps.add(new AppCleanUpStep());
default -> steps.add(new AppUnknownStateStep());
}
return steps;
}
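
The hunk above folds the fall-through case/break groups in getReconcileSteps into the arrow-style, comma-separated case labels standardized with Java 14 switch expressions. A minimal standalone sketch of the same refactoring pattern, using a placeholder enum and step names rather than the operator's real types:

import java.util.ArrayList;
import java.util.List;

class ArrowSwitchSketch {
  enum Phase { Submitted, DriverRunning, Failed, Unknown }

  // Classic statement switch: grouped labels fall through and need an explicit break.
  static List<String> stepsClassic(Phase phase) {
    List<String> steps = new ArrayList<>();
    switch (phase) {
      case Submitted:
        steps.add("init");
        break;
      case DriverRunning:
        steps.add("observe");
        steps.add("timeout-check");
        break;
      case Failed:
        steps.add("cleanup");
        break;
      default:
        steps.add("unknown");
        break;
    }
    return steps;
  }

  // Arrow form: comma-separated labels, no fall-through, braces only for multi-statement bodies.
  static List<String> stepsArrow(Phase phase) {
    List<String> steps = new ArrayList<>();
    switch (phase) {
      case Submitted -> steps.add("init");
      case DriverRunning -> {
        steps.add("observe");
        steps.add("timeout-check");
      }
      case Failed -> steps.add("cleanup");
      default -> steps.add("unknown");
    }
    return steps;
  }
}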
@@ -45,7 +45,7 @@
import org.apache.spark.k8s.operator.utils.ReconcilerUtils;
import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;

/** Request all driver and driver resources when starting an attempt */
/** Request all driver and its resources when starting an attempt */
@Slf4j
public class AppInitStep extends AppReconcileStep {
@Override
@@ -26,7 +26,6 @@
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import io.fabric8.kubernetes.api.model.Pod;

@@ -56,7 +55,7 @@ protected ReconcileProgress observeDriver(
.map(o -> o.observe(driverPodOptional.get(), app.getSpec(), app.getStatus()))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
.toList();
if (stateUpdates.isEmpty()) {
return proceed();
} else {
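
The change from collect(Collectors.toList()) to toList() uses Stream.toList(), available since Java 16. It returns an unmodifiable list, so it is a drop-in replacement only as long as the resulting state-update list is never mutated downstream (which the surrounding code suggests, though the full file is not shown here). A small self-contained sketch of the two forms:

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamToListSketch {
  public static void main(String[] args) {
    // Pre-Java-16 form: collects into a mutable ArrayList.
    List<String> collected =
        Stream.of(Optional.of("DriverReady"), Optional.<String>empty(), Optional.of("RunningHealthy"))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toList());

    // Java 16+ form used in the diff: Stream.toList() returns an unmodifiable List.
    List<String> unmodifiable =
        Stream.of(Optional.of("DriverReady"), Optional.<String>empty(), Optional.of("RunningHealthy"))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .toList();

    // Same elements, different mutability guarantees.
    System.out.println(collected.equals(unmodifiable)); // prints: true
  }
}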
@@ -19,14 +19,14 @@

package org.apache.spark.k8s.operator.reconciler.reconcilesteps;

import static org.apache.spark.k8s.operator.Constants.*;
import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue;

import java.util.Collections;
import java.util.Set;

import io.fabric8.kubernetes.api.model.Pod;

import org.apache.spark.k8s.operator.Constants;
import org.apache.spark.k8s.operator.context.SparkAppContext;
import org.apache.spark.k8s.operator.reconciler.ReconcileProgress;
import org.apache.spark.k8s.operator.reconciler.observers.AppDriverRunningObserver;
@@ -51,28 +51,28 @@ public ReconcileProgress reconcile(
|| instanceConfig.getInitExecutors() == 0L
|| !prevStateSummary.isStarting() && instanceConfig.getMinExecutors() == 0L) {
proposedStateSummary = ApplicationStateSummary.RunningHealthy;
stateMessage = Constants.RUNNING_HEALTHY_MESSAGE;
stateMessage = RUNNING_HEALTHY_MESSAGE;
} else {
Set<Pod> executors = context.getExecutorsForApplication();
long runningExecutors = executors.stream().filter(PodUtils::isPodReady).count();
if (prevStateSummary.isStarting()) {
if (runningExecutors >= instanceConfig.getInitExecutors()) {
proposedStateSummary = ApplicationStateSummary.RunningHealthy;
stateMessage = Constants.RUNNING_HEALTHY_MESSAGE;
stateMessage = RUNNING_HEALTHY_MESSAGE;
} else if (runningExecutors > 0L) {
proposedStateSummary = ApplicationStateSummary.InitializedBelowThresholdExecutors;
stateMessage = Constants.INITIALIZED_WITH_BELOW_THRESHOLD_EXECUTORS_MESSAGE;
stateMessage = INITIALIZED_WITH_BELOW_THRESHOLD_EXECUTORS_MESSAGE;
} else {
// keep previous state for 0 executor
proposedStateSummary = prevStateSummary;
}
} else {
if (runningExecutors >= instanceConfig.getMinExecutors()) {
proposedStateSummary = ApplicationStateSummary.RunningHealthy;
stateMessage = Constants.RUNNING_HEALTHY_MESSAGE;
stateMessage = RUNNING_HEALTHY_MESSAGE;
} else {
proposedStateSummary = ApplicationStateSummary.RunningWithBelowThresholdExecutors;
stateMessage = Constants.RUNNING_WITH_BELOW_THRESHOLD_EXECUTORS_MESSAGE;
stateMessage = RUNNING_WITH_BELOW_THRESHOLD_EXECUTORS_MESSAGE;
}
}
}
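
The branching shown above classifies a running application by comparing the count of ready executor pods against the configured initExecutors (while the app is still starting) or minExecutors (once startup is over), with a zero threshold always treated as healthy. A condensed sketch of that decision, using a local enum in place of the operator's ApplicationStateSummary and plain parameters in place of its config and status objects; the first condition in the real code has an additional operand not visible in this hunk:

class ExecutorThresholdSketch {
  enum State { RunningHealthy, InitializedBelowThresholdExecutors, RunningWithBelowThresholdExecutors }

  // Simplified restatement of the hunk's if/else ladder; "previous" stands in for prevStateSummary.
  static State classify(
      boolean starting, long readyExecutors, long initExecutors, long minExecutors, State previous) {
    if (initExecutors == 0L || (!starting && minExecutors == 0L)) {
      return State.RunningHealthy; // no executor threshold to satisfy
    }
    if (starting) {
      if (readyExecutors >= initExecutors) {
        return State.RunningHealthy;
      }
      // keep the previous state while no executor is ready yet
      return readyExecutors > 0L ? State.InitializedBelowThresholdExecutors : previous;
    }
    return readyExecutors >= minExecutors
        ? State.RunningHealthy
        : State.RunningWithBelowThresholdExecutors;
  }
}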
@@ -31,7 +31,7 @@
import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;

/** Abnormal state handler */
public class UnknownStateStep extends AppReconcileStep {
public class AppUnknownStateStep extends AppReconcileStep {
@Override
public ReconcileProgress reconcile(
SparkAppContext context, SparkAppStatusRecorder statusRecorder) {
@@ -21,19 +21,19 @@

import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndImmediateRequeue;
import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.proceed;
import static org.apache.spark.k8s.operator.spec.DeploymentMode.ClientMode;
import static org.apache.spark.k8s.operator.utils.SparkAppStatusUtils.isValidApplicationStatus;

import lombok.extern.slf4j.Slf4j;

import org.apache.spark.k8s.operator.context.SparkAppContext;
import org.apache.spark.k8s.operator.reconciler.ReconcileProgress;
import org.apache.spark.k8s.operator.spec.DeploymentMode;
import org.apache.spark.k8s.operator.status.ApplicationState;
import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
import org.apache.spark.k8s.operator.status.ApplicationStatus;
import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;

/** Validates the submitted app. This can be re-factored into webhook in future. */
/** Validates the submitted app. This can be re-factored into webhook in the future. */
@Slf4j
public class AppValidateStep extends AppReconcileStep {
@Override
@@ -43,7 +43,7 @@ public ReconcileProgress reconcile(
log.warn("Spark application found with empty status. Resetting to initial state.");
statusRecorder.persistStatus(context, new ApplicationStatus());
}
if (DeploymentMode.ClientMode.equals(context.getResource().getSpec())) {
if (ClientMode.equals(context.getResource().getSpec())) {
ApplicationState failure =
new ApplicationState(ApplicationStateSummary.Failed, "Client mode is not supported yet.");
statusRecorder.persistStatus(