Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/_includes/generated/shuffle_service_configuration.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 65%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>shuffle-service-factory.class</h5></td>
<td style="word-wrap: break-word;">"org.apache.flink.runtime.io.network.NettyShuffleServiceFactory"</td>
<td>The full class name of the shuffle service factory implementation to be used by the cluster. The default implementation uses Netty for network communication and local memory as well disk space to store results on a TaskExecutor.</td>
</tr>
</tbody>
</table>
4 changes: 4 additions & 0 deletions docs/ops/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ The configuration keys in this section are independent of the used resource mana

{% include generated/resource_manager_configuration.html %}

### Shuffle Service

{% include generated/shuffle_service_configuration.html %}

### YARN

{% include generated/yarn_config_configuration.html %}
Expand Down
4 changes: 4 additions & 0 deletions docs/ops/config.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ The configuration keys in this section are independent of the used resource mana

{% include generated/resource_manager_configuration.html %}

### Shuffle Service

{% include generated/shuffle_service_configuration.html %}

### YARN

{% include generated/yarn_config_configuration.html %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,23 @@ private static ObjectStreamClass getEquivalentSerializer(String classDescriptorN
* @param classLoader to use for loading the class
* @param <T> type of the instantiated class
* @return Instance of the given class name
* @throws ClassNotFoundException if the class could not be found
* @throws FlinkException if the class could not be found
*/
public static <T> T instantiate(final String className, final Class<T> targetType, final ClassLoader classLoader) throws ClassNotFoundException {
final Class<? extends T> clazz = Class.forName(
className,
false,
classLoader).asSubclass(targetType);
public static <T> T instantiate(final String className, final Class<T> targetType, final ClassLoader classLoader) throws FlinkException {
final Class<? extends T> clazz;
try {
clazz = Class.forName(
className,
false,
classLoader).asSubclass(targetType);
} catch (ClassNotFoundException e) {
throw new FlinkException(
String.format(
"Could not instantiate class '%s' of type '%s'. Please make sure that this class is on your class path.",
className,
targetType.getName()),
e);
}

return instantiate(clazz);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class ConfigOptionsDocGenerator {

static final OptionsClassLocation[] LOCATIONS = new OptionsClassLocation[]{
new OptionsClassLocation("flink-core", "org.apache.flink.configuration"),
new OptionsClassLocation("flink-runtime", "org.apache.flink.runtime.shuffle"),
new OptionsClassLocation("flink-yarn", "org.apache.flink.yarn.configuration"),
new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;

/**
* Singleton default factory for {@link JobManagerRunner}.
Expand All @@ -58,6 +60,7 @@ public JobManagerRunner createJobManagerRunner(
final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(configuration);
final SchedulerFactory schedulerFactory = DefaultSchedulerFactory.fromConfiguration(configuration);
final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration, jobManagerServices.getRestartStrategyFactory());
final ShuffleMaster<?> shuffleMaster = ShuffleServiceLoader.loadShuffleServiceFactory(configuration).createShuffleMaster(configuration);

final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
jobMasterConfiguration,
Expand All @@ -69,7 +72,8 @@ public JobManagerRunner createJobManagerRunner(
heartbeatServices,
jobManagerJobMetricGroupFactory,
fatalErrorHandler,
schedulerNGFactory);
schedulerNGFactory,
shuffleMaster);

return new JobManagerRunner(
jobGraph,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
private String jsonPlan;

/** Shuffle master to register partitions for task deployment. */
private final ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
private final ShuffleMaster<?> shuffleMaster;

// --------------------------------------------------------------------------------------------
// Constructors
Expand Down Expand Up @@ -405,7 +405,8 @@ public ExecutionGraph(
slotProvider,
userClassLoader,
blobWriter,
allocationTimeout);
allocationTimeout,
NettyShuffleMaster.INSTANCE);
}

public ExecutionGraph(
Expand All @@ -419,7 +420,8 @@ public ExecutionGraph(
SlotProvider slotProvider,
ClassLoader userClassLoader,
BlobWriter blobWriter,
Time allocationTimeout) throws IOException {
Time allocationTimeout,
ShuffleMaster<?> shuffleMaster) throws IOException {

checkNotNull(futureExecutor);

Expand Down Expand Up @@ -467,6 +469,8 @@ public ExecutionGraph(
"ExecutionGraph is not initialized with proper main thread executor. " +
"Call to ExecutionGraph.start(...) required.");

this.shuffleMaster = checkNotNull(shuffleMaster);

LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.util.DynamicCodeLoadingException;
Expand Down Expand Up @@ -92,8 +93,8 @@ public static ExecutionGraph buildGraph(
MetricGroup metrics,
BlobWriter blobWriter,
Time allocationTimeout,
Logger log)
throws JobExecutionException, JobException {
Logger log,
ShuffleMaster<?> shuffleMaster) throws JobExecutionException, JobException {

checkNotNull(jobGraph, "job graph cannot be null");

Expand Down Expand Up @@ -129,7 +130,8 @@ public static ExecutionGraph buildGraph(
slotProvider,
classLoader,
blobWriter,
allocationTimeout);
allocationTimeout,
shuffleMaster);
} catch (IOException e) {
throw new JobException("Could not create the ExecutionGraph.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,20 +164,10 @@ private static HighAvailabilityServices createCustomHAServices(Configuration con
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
final String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE);

final HighAvailabilityServicesFactory highAvailabilityServicesFactory;

try {
highAvailabilityServicesFactory = InstantiationUtil.instantiate(
haServicesClassName,
HighAvailabilityServicesFactory.class,
classLoader);
} catch (Exception e) {
throw new FlinkException(
String.format(
"Could not instantiate the HighAvailabilityServicesFactory '%s'. Please make sure that this class is on your class path.",
haServicesClassName),
e);
}
final HighAvailabilityServicesFactory highAvailabilityServicesFactory = InstantiationUtil.instantiate(
haServicesClassName,
HighAvailabilityServicesFactory.class,
classLoader);

try {
return highAvailabilityServicesFactory.createHAServices(config, executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
package org.apache.flink.runtime.io.network;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.metrics.InputBufferPoolUsageGauge;
import org.apache.flink.runtime.io.network.metrics.InputBuffersGauge;
Expand All @@ -35,8 +33,6 @@
import org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge;
import org.apache.flink.runtime.io.network.metrics.OutputBuffersGauge;
import org.apache.flink.runtime.io.network.metrics.ResultPartitionMetrics;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionFactory;
Expand All @@ -50,7 +46,6 @@
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.util.Preconditions;

Expand All @@ -65,7 +60,6 @@
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* The implementation of {@link ShuffleEnvironment} based on netty network communication, local memory and disk files.
Expand All @@ -76,18 +70,14 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti

private static final Logger LOG = LoggerFactory.getLogger(NettyShuffleEnvironment.class);

private static final String METRIC_GROUP_NETWORK = "Network";
private static final String METRIC_TOTAL_MEMORY_SEGMENT = "TotalMemorySegments";
private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = "AvailableMemorySegments";

private static final String METRIC_OUTPUT_QUEUE_LENGTH = "outputQueueLength";
private static final String METRIC_OUTPUT_POOL_USAGE = "outPoolUsage";
private static final String METRIC_INPUT_QUEUE_LENGTH = "inputQueueLength";
private static final String METRIC_INPUT_POOL_USAGE = "inPoolUsage";

private final Object lock = new Object();

private final ResourceID taskExecutorLocation;
private final ResourceID taskExecutorResourceId;

private final NettyShuffleEnvironmentConfiguration config;

Expand All @@ -105,87 +95,25 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti

private boolean isClosed;

private NettyShuffleEnvironment(
ResourceID taskExecutorLocation,
NettyShuffleEnvironment(
ResourceID taskExecutorResourceId,
NettyShuffleEnvironmentConfiguration config,
NetworkBufferPool networkBufferPool,
ConnectionManager connectionManager,
ResultPartitionManager resultPartitionManager,
ResultPartitionFactory resultPartitionFactory,
SingleInputGateFactory singleInputGateFactory) {
this.taskExecutorLocation = taskExecutorLocation;
this.taskExecutorResourceId = taskExecutorResourceId;
this.config = config;
this.networkBufferPool = networkBufferPool;
this.connectionManager = connectionManager;
this.resultPartitionManager = resultPartitionManager;
this.inputGatesById = new ConcurrentHashMap<>();
this.inputGatesById = new ConcurrentHashMap<>(10);
this.resultPartitionFactory = resultPartitionFactory;
this.singleInputGateFactory = singleInputGateFactory;
this.isClosed = false;
}

public static NettyShuffleEnvironment create(
NettyShuffleEnvironmentConfiguration config,
ResourceID taskExecutorLocation,
TaskEventPublisher taskEventPublisher,
MetricGroup metricGroup,
IOManager ioManager) {
checkNotNull(taskExecutorLocation);
checkNotNull(ioManager);
checkNotNull(taskEventPublisher);
checkNotNull(config);

NettyConfig nettyConfig = config.nettyConfig();

ResultPartitionManager resultPartitionManager = new ResultPartitionManager();

ConnectionManager connectionManager = nettyConfig != null ?
new NettyConnectionManager(resultPartitionManager, taskEventPublisher, nettyConfig, config.isCreditBased()) :
new LocalConnectionManager();

NetworkBufferPool networkBufferPool = new NetworkBufferPool(
config.numNetworkBuffers(),
config.networkBufferSize(),
config.networkBuffersPerChannel());

registerNetworkMetrics(metricGroup, networkBufferPool);

ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
resultPartitionManager,
ioManager,
networkBufferPool,
config.networkBuffersPerChannel(),
config.floatingNetworkBuffersPerGate(),
config.isForcePartitionReleaseOnConsumption());

SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory(
taskExecutorLocation,
config,
connectionManager,
resultPartitionManager,
taskEventPublisher,
networkBufferPool);

return new NettyShuffleEnvironment(
taskExecutorLocation,
config,
networkBufferPool,
connectionManager,
resultPartitionManager,
resultPartitionFactory,
singleInputGateFactory);
}

private static void registerNetworkMetrics(MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
checkNotNull(metricGroup);

MetricGroup networkGroup = metricGroup.addGroup(METRIC_GROUP_NETWORK);
networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
networkBufferPool::getTotalNumberOfMemorySegments);
networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
networkBufferPool::getNumberOfAvailableMemorySegments);
}

// --------------------------------------------------------------------------------------------
// Properties
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -320,7 +248,7 @@ public boolean updatePartitionInfo(
checkArgument(shuffleDescriptor instanceof NettyShuffleDescriptor,
"Tried to update unknown channel with unknown ShuffleDescriptor %s.",
shuffleDescriptor.getClass().getName());
inputGate.updateInputChannel(taskExecutorLocation, (NettyShuffleDescriptor) shuffleDescriptor);
inputGate.updateInputChannel(taskExecutorResourceId, (NettyShuffleDescriptor) shuffleDescriptor);
return true;
}

Expand Down Expand Up @@ -358,6 +286,7 @@ public void close() {
LOG.info("Shutting down the network environment and its components.");

// terminate all network connections
//noinspection OverlyBroadCatchBlock
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's revert this.

try {
LOG.debug("Shutting down network connection manager");
connectionManager.shutdown();
Expand Down Expand Up @@ -395,18 +324,4 @@ public boolean isClosed() {
return isClosed;
}
}

public static NettyShuffleEnvironment fromShuffleContext(ShuffleEnvironmentContext shuffleEnvironmentContext) {
NettyShuffleEnvironmentConfiguration networkConfig = NettyShuffleEnvironmentConfiguration.fromConfiguration(
shuffleEnvironmentContext.getConfiguration(),
shuffleEnvironmentContext.getMaxJvmHeapMemory(),
shuffleEnvironmentContext.isLocalCommunicationOnly(),
shuffleEnvironmentContext.getHostAddress());
return create(
networkConfig,
shuffleEnvironmentContext.getLocation(),
shuffleEnvironmentContext.getEventPublisher(),
shuffleEnvironmentContext.getParentMetricGroup(),
shuffleEnvironmentContext.getIOManager());
}
}
Loading