Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,73 @@

package org.apache.flink.runtime.concurrent;

import javax.annotation.Nonnull;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Adapter class for a {@link ScheduledExecutorService} which shall be used as a
* Adapter class for a {@link ScheduledExecutorService} or {@link ScheduledExecutor} which shall be used as a
* {@link ComponentMainThreadExecutor}. It enhances the given executor with an assert that the current thread is the
* main thread of the executor.
*/
public class ComponentMainThreadExecutorServiceAdapter
extends ScheduledExecutorServiceAdapter implements ComponentMainThreadExecutor {
public class ComponentMainThreadExecutorServiceAdapter implements ComponentMainThreadExecutor {

private final ScheduledExecutor scheduledExecutor;

/** A runnable that should assert that the current thread is the expected main thread. */
@Nonnull
private final Runnable mainThreadCheck;

public ComponentMainThreadExecutorServiceAdapter(
@Nonnull ScheduledExecutorService scheduledExecutorService,
@Nonnull Runnable mainThreadCheck) {
super(scheduledExecutorService);
this.mainThreadCheck = mainThreadCheck;
final ScheduledExecutorService scheduledExecutorService,
final Runnable mainThreadCheck) {
this(new ScheduledExecutorServiceAdapter(scheduledExecutorService), mainThreadCheck);
}

/**
 * Creates an adapter that delegates all scheduling to the given {@link ScheduledExecutor} and
 * whose main-thread check asserts that the calling thread is {@code mainThread}.
 *
 * <p>The check is an {@code assert} statement, so it is only evaluated when JVM assertions
 * are enabled ({@code -ea}).
 *
 * @param scheduledExecutorService executor that all calls are forwarded to
 * @param mainThread the thread that is expected to call into the component
 */
public ComponentMainThreadExecutorServiceAdapter(
		final ScheduledExecutor scheduledExecutorService,
		final Thread mainThread) {
	this(scheduledExecutorService, () -> {
		// Using the call as a bare lambda expression would silently drop its result, so a
		// wrong thread would never trip the check. Wrap it in an actual assertion instead.
		// NOTE(review): relies on isRunningInExpectedThread returning a boolean verdict — confirm.
		assert MainThreadValidatorUtil.isRunningInExpectedThread(mainThread);
	});
}

/**
 * Common constructor that both public constructors funnel into.
 *
 * @param scheduledExecutor executor that all calls are forwarded to; must not be null
 * @param mainThreadCheck runnable invoked by {@link #assertRunningInMainThread()}; must not be null
 */
private ComponentMainThreadExecutorServiceAdapter(
		final ScheduledExecutor scheduledExecutor,
		final Runnable mainThreadCheck) {
	// Validate in declaration order first, then assign.
	checkNotNull(scheduledExecutor);
	checkNotNull(mainThreadCheck);
	this.scheduledExecutor = scheduledExecutor;
	this.mainThreadCheck = mainThreadCheck;
}

/**
 * Runs the main-thread check that was supplied at construction time.
 */
@Override
public void assertRunningInMainThread() {
	this.mainThreadCheck.run();
}

/**
 * Forwards the one-shot {@link Runnable} to the wrapped executor unchanged.
 *
 * @param command the task to run
 * @param delay how long to wait before running it
 * @param unit time unit of {@code delay}
 * @return the future handed back by the wrapped executor
 */
@Override
public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
	return this.scheduledExecutor.schedule(command, delay, unit);
}

/**
 * Forwards the one-shot {@link Callable} to the wrapped executor unchanged.
 *
 * @param callable the task to run
 * @param delay how long to wait before running it
 * @param unit time unit of {@code delay}
 * @param <V> result type of the callable
 * @return the future handed back by the wrapped executor
 */
@Override
public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
	return this.scheduledExecutor.schedule(callable, delay, unit);
}

/**
 * Forwards the fixed-rate periodic task to the wrapped executor unchanged.
 *
 * @param command the task to run repeatedly
 * @param initialDelay how long to wait before the first run
 * @param period the period passed through to the wrapped executor
 * @param unit time unit of {@code initialDelay} and {@code period}
 * @return the future handed back by the wrapped executor
 */
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
		final Runnable command,
		final long initialDelay,
		final long period,
		final TimeUnit unit) {
	return this.scheduledExecutor.scheduleAtFixedRate(command, initialDelay, period, unit);
}

/**
 * Forwards the fixed-delay periodic task to the wrapped executor unchanged.
 *
 * @param command the task to run repeatedly
 * @param initialDelay how long to wait before the first run
 * @param delay the delay passed through to the wrapped executor
 * @param unit time unit of {@code initialDelay} and {@code delay}
 * @return the future handed back by the wrapped executor
 */
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
		final Runnable command,
		final long initialDelay,
		final long delay,
		final TimeUnit unit) {
	return this.scheduledExecutor.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

/**
 * Hands the command to the wrapped executor for execution.
 *
 * @param command the task to run
 */
@Override
public void execute(final Runnable command) {
	this.scheduledExecutor.execute(command);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
Expand All @@ -65,9 +64,6 @@
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter;
Expand Down Expand Up @@ -108,16 +104,13 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
Expand Down Expand Up @@ -952,11 +945,11 @@ public void scheduleForExecution() throws JobException {
switch (scheduleMode) {

case LAZY_FROM_SOURCES:
newSchedulingFuture = scheduleLazy(slotProvider);
newSchedulingFuture = scheduleLazy();
break;

case EAGER:
newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
newSchedulingFuture = scheduleEager();
break;

default:
Expand Down Expand Up @@ -985,123 +978,16 @@ public void scheduleForExecution() throws JobException {
}
}

private CompletableFuture<Void> scheduleLazy(SlotProvider slotProvider) {

final ArrayList<CompletableFuture<Void>> schedulingFutures = new ArrayList<>(numVerticesTotal);
// simply take the vertices without inputs.
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
if (ejv.getJobVertex().isInputVertex()) {
final CompletableFuture<Void> schedulingJobVertexFuture = ejv.scheduleAll(
slotProvider,
allowQueuedScheduling,
LocationPreferenceConstraint.ALL, // since it is an input vertex, the input based location preferences should be empty
Collections.emptySet());

schedulingFutures.add(schedulingJobVertexFuture);
}
}

return FutureUtils.waitForAll(schedulingFutures);
private CompletableFuture<Void> scheduleLazy() {
return SchedulingUtils.scheduleLazy(getAllExecutionVertices(), this);
}

/**
*
*
* @param slotProvider The resource provider from which the slots are allocated
* @param timeout The maximum time that the deployment may take, before a
* TimeoutException is thrown.
* @return Future which is completed once the {@link ExecutionGraph} has been scheduled.
* The future can also be completed exceptionally if an error happened.
*/
private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
assertRunningInJobMasterMainThread();
checkState(state == JobStatus.RUNNING, "job is not running currently");

// Important: reserve all the space we need up front.
// that way we do not have any operation that can fail between allocating the slots
// and adding them to the list. If we had a failure in between there, that would
// cause the slots to get lost
final boolean queued = allowQueuedScheduling;

// collecting all the slots may resize and fail in that operation without slots getting lost
final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());

final Set<AllocationID> allPreviousAllocationIds =
Collections.unmodifiableSet(computeAllPriorAllocationIdsIfRequiredByScheduling());

// allocate the slots (obtain all their futures
for (ExecutionJobVertex ejv : getVerticesTopologically()) {
// these calls are not blocking, they only return futures
Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
slotProvider,
queued,
LocationPreferenceConstraint.ALL,
allPreviousAllocationIds,
timeout);

allAllocationFutures.addAll(allocationFutures);
}

// this future is complete once all slot futures are complete.
// the future fails once one slot future fails.
final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

return allAllocationsFuture.thenAccept(
(Collection<Execution> executionsToDeploy) -> {
for (Execution execution : executionsToDeploy) {
try {
execution.deploy();
} catch (Throwable t) {
throw new CompletionException(
new FlinkException(
String.format("Could not deploy execution %s.", execution),
t));
}
}
})
// Generate a more specific failure message for the eager scheduling
.exceptionally(
(Throwable throwable) -> {
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
final Throwable resultThrowable;
if (strippedThrowable instanceof TimeoutException) {
int numTotal = allAllocationsFuture.getNumFuturesTotal();
int numComplete = allAllocationsFuture.getNumFuturesCompleted();

String message = "Could not allocate all requires slots within timeout of " +
timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete +
", previous allocation IDs: " + allPreviousAllocationIds;

StringBuilder executionMessageBuilder = new StringBuilder();

for (int i = 0; i < allAllocationFutures.size(); i++) {
CompletableFuture<Execution> executionFuture = allAllocationFutures.get(i);

try {
Execution execution = executionFuture.getNow(null);
if (execution != null) {
executionMessageBuilder.append("completed: " + execution);
} else {
executionMessageBuilder.append("incomplete: " + executionFuture);
}
} catch (CompletionException completionException) {
executionMessageBuilder.append("completed exceptionally: " + completionException + "/" + executionFuture);
}

if (i < allAllocationFutures.size() - 1) {
executionMessageBuilder.append(", ");
}
}

message += ", execution status: " + executionMessageBuilder.toString();

resultThrowable = new NoResourceAvailableException(message);
} else {
resultThrowable = strippedThrowable;
}

throw new CompletionException(resultThrowable);
});
private CompletableFuture<Void> scheduleEager() {
return SchedulingUtils.scheduleEager(getAllExecutionVertices(), this);
}

public void cancel() {
Expand Down Expand Up @@ -1414,7 +1300,7 @@ public FailoverStrategy getFailoverStrategy() {
* and is used to disambiguate concurrent modifications between local and global
* failover actions.
*/
long getGlobalModVersion() {
public long getGlobalModVersion() {
return globalModVersion;
}

Expand Down Expand Up @@ -1808,34 +1694,6 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
}
}

/**
* Computes and returns a set with the prior allocation ids from all execution vertices in the graph.
*/
private Set<AllocationID> computeAllPriorAllocationIds() {
HashSet<AllocationID> allPreviousAllocationIds = new HashSet<>(getNumberOfExecutionJobVertices());
for (ExecutionVertex executionVertex : getAllExecutionVertices()) {
AllocationID latestPriorAllocation = executionVertex.getLatestPriorAllocation();
if (latestPriorAllocation != null) {
allPreviousAllocationIds.add(latestPriorAllocation);
}
}
return allPreviousAllocationIds;
}

/**
* Returns the result of {@link #computeAllPriorAllocationIds()}, but only if the scheduling really requires it.
* Otherwise this method simply returns an empty set.
*/
private Set<AllocationID> computeAllPriorAllocationIdsIfRequiredByScheduling() {
// This is a temporary optimization to avoid computing all previous allocations if not required
// This can go away when we progress with the implementation of the Scheduler.
if (slotProvider instanceof Scheduler && ((Scheduler) slotProvider).requiresPreviousExecutionGraphAllocations()) {
return computeAllPriorAllocationIds();
} else {
return Collections.emptySet();
}
}

// --------------------------------------------------------------------------------------------
// Listeners & Observers
// --------------------------------------------------------------------------------------------
Expand Down
Loading