Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
Expand All @@ -55,11 +54,9 @@
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
Expand Down Expand Up @@ -92,6 +89,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -119,6 +117,11 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {

private final Time rpcTimeout;

@GuardedBy("lock")
private final List<TaskExecutor> taskManagers;

private final TerminatingFatalErrorHandlerFactory taskManagerTerminatingFatalErrorHandlerFactory = new TerminatingFatalErrorHandlerFactory();

private CompletableFuture<Void> terminationFuture;

@GuardedBy("lock")
Expand All @@ -145,8 +148,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
@GuardedBy("lock")
private BlobCacheService blobCacheService;

private volatile TaskExecutor[] taskManagers;

@GuardedBy("lock")
private LeaderRetrievalService resourceManagerLeaderRetriever;

Expand All @@ -168,6 +169,9 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
@GuardedBy("lock")
private LeaderRetriever webMonitorLeaderRetriever;

@GuardedBy("lock")
private RpcServiceFactory taskManagerRpcServiceFactory;

/** Flag marking the mini cluster as started/running. */
private volatile boolean running;

Expand All @@ -186,6 +190,8 @@ public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
this.rpcTimeout = Time.seconds(10L);
this.terminationFuture = CompletableFuture.completedFuture(null);
running = false;

this.taskManagers = new ArrayList<>(miniClusterConfiguration.getNumTaskManagers());
}

public CompletableFuture<URI> getRestAddress() {
Expand All @@ -209,6 +215,14 @@ public HighAvailabilityServices getHighAvailabilityServices() {
}
}

/**
 * Returns a read-only view of the dispatcher/resource-manager components of this mini cluster.
 *
 * <p>The backing field is read while holding {@code lock}. The result is an unmodifiable
 * wrapper around the underlying collection, not a copy of it.
 *
 * @return unmodifiable view of the dispatcher resource manager components
 */
@VisibleForTesting
@Nonnull
protected Collection<DispatcherResourceManagerComponent<?>> getDispatcherResourceManagerComponents() {
    final Collection<DispatcherResourceManagerComponent<?>> readOnlyView;

    synchronized (lock) {
        readOnlyView = Collections.unmodifiableCollection(dispatcherResourceManagerComponents);
    }

    return readOnlyView;
}

// ------------------------------------------------------------------------
// life cycle
// ------------------------------------------------------------------------
Expand All @@ -234,7 +248,6 @@ public void start() throws Exception {
LOG.debug("Using configuration {}", miniClusterConfiguration);

final Configuration configuration = miniClusterConfiguration.getConfiguration();
final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

try {
Expand All @@ -248,7 +261,6 @@ public void start() throws Exception {

AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);

final RpcServiceFactory taskManagerRpcServiceFactory;
final RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory;

if (useSingleRpcService) {
Expand Down Expand Up @@ -287,16 +299,7 @@ public void start() throws Exception {
configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
);

// bring up the TaskManager(s) for the mini cluster
LOG.info("Starting {} TaskManger(s)", numTaskManagers);
taskManagers = startTaskManagers(
configuration,
haServices,
heartbeatServices,
metricRegistry,
blobCacheService,
numTaskManagers,
taskManagerRpcServiceFactory);
startTaskManagers();

MetricQueryServiceRetriever metricQueryServiceRetriever = new AkkaQueryServiceRetriever(
metricQueryServiceActorSystem,
Expand Down Expand Up @@ -410,14 +413,7 @@ public CompletableFuture<Void> closeAsync() {
final int numComponents = 2 + miniClusterConfiguration.getNumTaskManagers();
final Collection<CompletableFuture<Void>> componentTerminationFutures = new ArrayList<>(numComponents);

if (taskManagers != null) {
for (TaskExecutor tm : taskManagers) {
if (tm != null) {
componentTerminationFutures.add(tm.closeAsync());
}
}
taskManagers = null;
}
componentTerminationFutures.addAll(terminateTaskExecutors());

componentTerminationFutures.add(shutDownResourceManagerComponents());

Expand Down Expand Up @@ -470,6 +466,62 @@ private CompletableFuture<Void> closeMetricSystem() {
}
}

/**
 * Starts the configured number of task executors.
 *
 * <p>The count is taken from {@code miniClusterConfiguration.getNumTaskManagers()} and
 * {@link #startTaskExecutor()} is invoked once per task executor.
 *
 * @throws Exception if starting one of the task executors fails
 */
@GuardedBy("lock")
private void startTaskManagers() throws Exception {
    final int taskManagerCount = miniClusterConfiguration.getNumTaskManagers();

    LOG.info("Starting {} TaskManger(s)", taskManagerCount);

    int started = 0;
    while (started < taskManagerCount) {
        startTaskExecutor();
        started++;
    }
}

/**
 * Creates, starts and registers one additional task executor.
 *
 * <p>Everything happens while holding {@code lock}. The new task executor gets a fresh
 * {@link ResourceID} derived from a random UUID and its own RPC service obtained from
 * {@code taskManagerRpcServiceFactory}. Its fatal error handler is created for the index
 * {@code taskManagers.size()}, which is the position the executor occupies once it has been
 * appended to {@code taskManagers}.
 *
 * @throws Exception if the task executor cannot be created or started
 */
@VisibleForTesting
void startTaskExecutor() throws Exception {
    synchronized (lock) {
        final Configuration configuration = miniClusterConfiguration.getConfiguration();

        // Evaluate the inputs in the same order in which they are passed on below.
        final ResourceID resourceId = new ResourceID(UUID.randomUUID().toString());
        final RpcService rpcService = taskManagerRpcServiceFactory.createRpcService();
        final boolean localCommunication = useLocalCommunication();
        final TerminatingFatalErrorHandler fatalErrorHandler =
            taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size());

        final TaskExecutor newTaskExecutor = TaskManagerRunner.startTaskManager(
            configuration,
            resourceId,
            rpcService,
            haServices,
            heartbeatServices,
            metricRegistry,
            blobCacheService,
            localCommunication,
            fatalErrorHandler);

        newTaskExecutor.start();
        taskManagers.add(newTaskExecutor);
    }
}

/**
 * Decides whether task executors are started with local communication.
 *
 * @return {@code true} if exactly one task manager is configured, {@code false} otherwise
 */
@VisibleForTesting
protected boolean useLocalCommunication() {
    final int configuredTaskManagers = miniClusterConfiguration.getNumTaskManagers();
    return configuredTaskManagers == 1;
}

/**
 * Triggers the termination of every task executor currently held in {@code taskManagers}.
 *
 * <p>Each task executor is closed via {@link #terminateTaskExecutor(int)}, in index order.
 *
 * @return the termination futures, one per task executor
 */
@GuardedBy("lock")
private Collection<? extends CompletableFuture<Void>> terminateTaskExecutors() {
    final Collection<CompletableFuture<Void>> pendingTerminations = new ArrayList<>(taskManagers.size());

    int index = 0;
    // The size is re-read on every iteration, exactly like an index-based for loop would do.
    while (index < taskManagers.size()) {
        pendingTerminations.add(terminateTaskExecutor(index));
        index++;
    }

    return pendingTerminations;
}

/**
 * Asynchronously closes the task executor stored at the given position.
 *
 * <p>The lookup and the {@code closeAsync()} call happen while holding {@code lock}. The task
 * executor is not removed from {@code taskManagers} by this method.
 *
 * @param index position of the task executor in {@code taskManagers}
 * @return future returned by the task executor's {@code closeAsync()}
 */
@VisibleForTesting
@Nonnull
protected CompletableFuture<Void> terminateTaskExecutor(int index) {
    synchronized (lock) {
        return taskManagers.get(index).closeAsync();
    }
}

// ------------------------------------------------------------------------
// Accessing jobs
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -670,69 +722,6 @@ protected RpcService createRpcService(
return new AkkaRpcService(actorSystem, akkaRpcServiceConfig);
}

/**
 * Creates a {@link ResourceManagerRunner} and starts it.
 *
 * <p>The runner is given a freshly generated {@link ResourceID} and an endpoint name built
 * from {@code FlinkResourceManager.RESOURCE_MANAGER_NAME}, an underscore and a random UUID.
 *
 * @param configuration configuration handed to the runner
 * @param haServices high availability services handed to the runner
 * @param heartbeatServices heartbeat services handed to the runner
 * @param metricRegistry metric registry handed to the runner
 * @param resourceManagerRpcService RPC service handed to the runner
 * @param clusterInformation cluster information handed to the runner
 * @param jobManagerMetricGroup metric group handed to the runner
 * @return the started resource manager runner
 * @throws Exception if the runner cannot be created or started
 */
protected ResourceManagerRunner startResourceManager(
        Configuration configuration,
        HighAvailabilityServices haServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        RpcService resourceManagerRpcService,
        ClusterInformation clusterInformation,
        JobManagerMetricGroup jobManagerMetricGroup) throws Exception {

    final ResourceID resourceManagerResourceId = ResourceID.generate();
    final String resourceManagerEndpointId =
        FlinkResourceManager.RESOURCE_MANAGER_NAME + '_' + UUID.randomUUID();

    final ResourceManagerRunner runner = new ResourceManagerRunner(
        resourceManagerResourceId,
        resourceManagerEndpointId,
        configuration,
        resourceManagerRpcService,
        haServices,
        heartbeatServices,
        metricRegistry,
        clusterInformation,
        jobManagerMetricGroup);

    runner.start();
    return runner;
}

/**
 * Creates and starts the requested number of task executors.
 *
 * <p>Every task executor gets a {@link ResourceID} derived from a random UUID, its own RPC
 * service from {@code rpcServiceFactory} and a {@code TerminatingFatalErrorHandler} created
 * with the executor's array index. Local communication is requested only when exactly one
 * task manager is started.
 *
 * @param configuration configuration handed to every task executor
 * @param haServices high availability services handed to every task executor
 * @param heartbeatServices heartbeat services handed to every task executor
 * @param metricRegistry metric registry handed to every task executor
 * @param blobCacheService blob cache service handed to every task executor
 * @param numTaskManagers number of task executors to start
 * @param rpcServiceFactory factory creating one RPC service per task executor
 * @return the started task executors, in start order
 * @throws Exception if a task executor cannot be created or started
 */
protected TaskExecutor[] startTaskManagers(
        Configuration configuration,
        HighAvailabilityServices haServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        BlobCacheService blobCacheService,
        int numTaskManagers,
        RpcServiceFactory rpcServiceFactory) throws Exception {

    final boolean localCommunicationOnly = numTaskManagers == 1;
    final TaskExecutor[] startedTaskExecutors = new TaskExecutor[numTaskManagers];

    for (int index = 0; index < startedTaskExecutors.length; index++) {
        final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
            configuration,
            new ResourceID(UUID.randomUUID().toString()),
            rpcServiceFactory.createRpcService(),
            haServices,
            heartbeatServices,
            metricRegistry,
            blobCacheService,
            localCommunicationOnly,
            new TerminatingFatalErrorHandler(index));

        startedTaskExecutors[index] = taskExecutor;
        taskExecutor.start();
    }

    return startedTaskExecutors;
}

/**
 * Returns a read-only view of the dispatcher/resource-manager components of this mini cluster.
 *
 * <p>The backing field is read while holding {@code lock}. The result is an unmodifiable
 * wrapper around the underlying collection, not a copy of it.
 *
 * @return unmodifiable view of the dispatcher resource manager components
 */
@VisibleForTesting
@Nonnull
protected Collection<DispatcherResourceManagerComponent<?>> getDispatcherResourceManagerComponents() {
    final Collection<DispatcherResourceManagerComponent<?>> readOnlyView;

    synchronized (lock) {
        readOnlyView = Collections.unmodifiableCollection(dispatcherResourceManagerComponents);
    }

    return readOnlyView;
}

// ------------------------------------------------------------------------
// Internal methods
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -924,13 +913,8 @@ public void onFatalError(Throwable exception) {
if (running) {
LOG.error("TaskManager #{} failed.", index, exception);

// let's check if there are still TaskManagers because there could be a concurrent
// shut down operation taking place
TaskExecutor[] currentTaskManagers = taskManagers;

if (currentTaskManagers != null) {
// the shutDown is asynchronous
currentTaskManagers[index].closeAsync();
synchronized (lock) {
taskManagers.get(index).closeAsync();
}
}
}
Expand All @@ -944,4 +928,19 @@ public void onFatalError(Throwable exception) {
closeAsync();
}
}

/**
 * Factory producing the {@link TerminatingFatalErrorHandler} instances that are attached to
 * the task executors of the enclosing mini cluster.
 */
private class TerminatingFatalErrorHandlerFactory {

    /**
     * Builds the {@link TerminatingFatalErrorHandler} belonging to the {@link TaskExecutor}
     * stored at the given position.
     *
     * @param index into the {@link #taskManagers} collection to identify the correct {@link TaskExecutor}.
     * @return {@link TerminatingFatalErrorHandler} for the given index
     */
    @GuardedBy("lock")
    private TerminatingFatalErrorHandler create(int index) {
        final TerminatingFatalErrorHandler handlerForIndex = new TerminatingFatalErrorHandler(index);
        return handlerForIndex;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testConstraintsAfterRestart() throws Exception {
eg.scheduleForExecution();
});

Predicate<Execution> isDeploying = ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING);
Predicate<AccessExecution> isDeploying = ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING);
ExecutionGraphTestUtils.waitForAllExecutionsPredicate(
eg,
isDeploying,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,23 +197,14 @@ public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex
*/
public static void waitForAllExecutionsPredicate(
ExecutionGraph executionGraph,
Predicate<Execution> executionPredicate,
Predicate<AccessExecution> executionPredicate,
long maxWaitMillis) throws TimeoutException {
final Iterable<ExecutionVertex> allExecutionVertices = executionGraph.getAllExecutionVertices();

final Predicate<AccessExecutionGraph> allExecutionsPredicate = allExecutionsPredicate(executionPredicate);
final Deadline deadline = Deadline.fromNow(Duration.ofMillis(maxWaitMillis));
boolean predicateResult;

do {
predicateResult = true;
for (ExecutionVertex executionVertex : allExecutionVertices) {
final Execution currentExecution = executionVertex.getCurrentExecutionAttempt();

if (currentExecution == null || !executionPredicate.test(currentExecution)) {
predicateResult = false;
break;
}
}
predicateResult = allExecutionsPredicate.test(executionGraph);

if (!predicateResult) {
try {
Expand All @@ -229,13 +220,29 @@ public static void waitForAllExecutionsPredicate(
}
}

/**
 * Lifts a predicate on single executions to a predicate on a whole execution graph.
 *
 * <p>The returned predicate walks over all execution vertices of the graph and looks at the
 * current execution attempt of each one. It yields {@code false} as soon as a vertex has no
 * current attempt or its attempt fails {@code executionPredicate}; otherwise, including for a
 * graph without vertices, it yields {@code true}.
 *
 * @param executionPredicate condition every current execution attempt has to fulfill
 * @return predicate which is true iff all current execution attempts fulfill the condition
 */
public static Predicate<AccessExecutionGraph> allExecutionsPredicate(final Predicate<AccessExecution> executionPredicate) {
    return graph -> {
        for (AccessExecutionVertex vertex : graph.getAllExecutionVertices()) {
            final AccessExecution attempt = vertex.getCurrentExecutionAttempt();
            final boolean satisfied = attempt != null && executionPredicate.test(attempt);

            if (!satisfied) {
                return false;
            }
        }

        return true;
    };
}

/**
* Predicate which is true if the given {@link Execution} has a resource assigned.
*/
static final Predicate<Execution> hasResourceAssigned = (Execution execution) -> execution.getAssignedResource() != null;

static Predicate<Execution> isInExecutionState(ExecutionState executionState) {
return (Execution execution) -> execution.getState() == executionState;
public static Predicate<AccessExecution> isInExecutionState(ExecutionState executionState) {
return (AccessExecution execution) -> execution.getState() == executionState;
}

public static void waitUntilFailoverRegionState(FailoverRegion region, JobStatus status, long maxWaitMillis)
Expand Down
Loading