Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.java.BatchTableEnvironment;
Expand Down Expand Up @@ -362,7 +362,7 @@ private abstract static class LimitNetworkBuffersTestEnvironment extends Executi
public static void setAsContext() {
Configuration config = new Configuration();
// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
final LocalEnvironment le = new LocalEnvironment(config);

initializeContextEnvironment(new ExecutionEnvironmentFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,15 @@ public final class ConfigConstants {
public static final String TASK_MANAGER_IPC_PORT_KEY = "taskmanager.rpc.port";

/**
* @deprecated use {@link NetworkEnvironmentOptions#DATA_PORT} instead
* @deprecated use {@link NettyShuffleEnvironmentOptions#DATA_PORT} instead
*/
@Deprecated
public static final String TASK_MANAGER_DATA_PORT_KEY = "taskmanager.data.port";

/**
* Config parameter to override SSL support for taskmanager's data transport.
*
* @deprecated use {@link NetworkEnvironmentOptions#DATA_SSL_ENABLED} instead
* @deprecated use {@link NettyShuffleEnvironmentOptions#DATA_SSL_ENABLED} instead
*/
@Deprecated
public static final String TASK_MANAGER_DATA_SSL_ENABLED = "taskmanager.data.ssl.enabled";
Expand Down Expand Up @@ -270,7 +270,7 @@ public final class ConfigConstants {
* The config parameter defining the number of buffers used in the network stack. This defines the
* number of possible tasks and shuffles.
*
* @deprecated Use {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS} instead
* @deprecated Use {@link NettyShuffleEnvironmentOptions#NETWORK_NUM_BUFFERS} instead
*/
@Deprecated
public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";
Expand Down Expand Up @@ -1392,15 +1392,15 @@ public final class ConfigConstants {
* The default network port the task manager expects to receive transfer envelopes on. The {@code 0} means that
* the TaskManager searches for a free port.
*
* @deprecated use {@link NetworkEnvironmentOptions#DATA_PORT} instead
* @deprecated use {@link NettyShuffleEnvironmentOptions#DATA_PORT} instead
*/
@Deprecated
public static final int DEFAULT_TASK_MANAGER_DATA_PORT = 0;

/**
* The default value to override ssl support for task manager's data transport.
*
* @deprecated use {@link NetworkEnvironmentOptions#DATA_SSL_ENABLED} instead
* @deprecated use {@link NettyShuffleEnvironmentOptions#DATA_SSL_ENABLED} instead
*/
@Deprecated
public static final boolean DEFAULT_TASK_MANAGER_DATA_SSL_ENABLED = true;
Expand All @@ -1424,7 +1424,7 @@ public final class ConfigConstants {
/**
* Config key has been deprecated. Therefore, no default value required.
*
* @deprecated {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS} provides the default value now
* @deprecated {@link NettyShuffleEnvironmentOptions#NETWORK_NUM_BUFFERS} provides the default value now
*/
@Deprecated
public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
@PublicEvolving
@ConfigGroups(groups = @ConfigGroup(name = "NetworkNetty", keyPrefix = "taskmanager.network.netty"))
public class NetworkEnvironmentOptions {
public class NettyShuffleEnvironmentOptions {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this renaming strictly necessary? If yes, then we need to add a release not that this class has been renamed since it's PublicEvolving. What we could also do is to add class NetworkEnvironmentOptions extends NettyShuffleEnvironmentOptions and deprecate this class. That way it would not break backwards compatibility.

Copy link
Contributor Author

@azagrebin azagrebin Jun 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class has been recently introduced. The options belong to TaskManagerOptions before 1.9 and have been deprecated to become part of netty shuffle configuration group.


// ------------------------------------------------------------------------
// Network General Options
Expand Down Expand Up @@ -212,5 +212,5 @@ public class NetworkEnvironmentOptions {
// ------------------------------------------------------------------------

/** Not intended to be instantiated. */
private NetworkEnvironmentOptions() {}
private NettyShuffleEnvironmentOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;

Expand Down Expand Up @@ -71,7 +71,7 @@ public void checkOperatingSystem() {
}

/**
* Tests that {@link NetworkEnvironmentConfiguration#calculateNetworkBufferMemory(long, Configuration)}
* Tests that {@link NettyShuffleEnvironmentConfiguration#calculateNetworkBufferMemory(long, Configuration)}
* has the same result as the shell script.
*/
@Test
Expand Down Expand Up @@ -159,9 +159,9 @@ private static Configuration getConfig(
config.setLong(KEY_TASKM_MEM_SIZE, javaMemMB);
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, useOffHeap);

config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(netBufMemMin));
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(netBufMemMax));
config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(netBufMemMin));
config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(netBufMemMax));

if (managedMemSizeMB == 0) {
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
Expand Down Expand Up @@ -202,7 +202,7 @@ private static Configuration getRandomConfig(final Random ran) {
Configuration config = getConfig(javaMemMB, useOffHeap, frac, min, max, managedMemSize, managedMemFrac);
long totalJavaMemorySize = ((long) javaMemMB) << 20; // megabytes to bytes
final int networkBufMB =
(int) (NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20);
(int) (NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20);
// max (exclusive): total - netbuf
managedMemSize = Math.min(javaMemMB - networkBufMB - 1, ran.nextInt(Integer.MAX_VALUE));
} else {
Expand All @@ -228,14 +228,14 @@ private void compareNetworkBufJavaVsScript(final Configuration config, final flo

final long totalJavaMemorySizeMB = config.getLong(KEY_TASKM_MEM_SIZE, 0L);

long javaNetworkBufMem = NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(
long javaNetworkBufMem = NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory(
totalJavaMemorySizeMB << 20, config);

String[] command = {"src/test/bin/calcTMNetBufMem.sh",
totalJavaMemorySizeMB + "m",
String.valueOf(config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN),
config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX)};
String.valueOf(config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN),
config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX)};

String scriptOutput = executeScript(command);

Expand Down Expand Up @@ -272,9 +272,9 @@ private void compareHeapSizeJavaVsScript(final Configuration config, float toler
String[] command = {"src/test/bin/calcTMHeapSizeMB.sh",
totalJavaMemorySizeMB + "m",
String.valueOf(config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)),
String.valueOf(config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN),
config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX),
String.valueOf(config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN),
config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX),
config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE),
String.valueOf(config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION))};
String scriptOutput = executeScript(command);
Expand Down
Loading