From 85c2ce89368e9b8935ba2d52e9b6e5ce6610eaf0 Mon Sep 17 00:00:00 2001 From: Andrey Zagrebin Date: Thu, 30 May 2019 16:03:31 +0200 Subject: [PATCH 1/3] [hotfix][network] Add NetworkEnviroment.fromConfiguration factory method Move -> NetworkEnvironmentConfiguration parsing from TaskManagerServicesConfiguration to NetworkEnviroment.fromConfiguration(flinkConfiguration) factory method and use it to create NetworkEnviroment in TaskManagerServices. This way TaskManagerServices does not depend on network internals. Memory page size is additionally parsed for TaskManagerServicesConfiguration to decouple it from NetworkEnvironmentConfiguration because the page size is used also for memory manager outside of NetworkEnviroment. Theoretically shuffle implementations can have their own page size in future, different from TaskManagerOptions.MEMORY_SEGMENT_SIZE. --- .../io/network/NetworkEnvironment.java | 18 ++++++ .../taskexecutor/TaskManagerRunner.java | 8 +-- .../taskexecutor/TaskManagerServices.java | 19 ++++-- .../TaskManagerServicesConfiguration.java | 37 ++++------- .../NetworkEnvironmentConfiguration.java | 43 ++++--------- .../util/ConfigurationParserUtils.java | 25 ++++++++ ...skExecutorLocalStateStoresManagerTest.java | 12 ++-- .../NetworkEnvironmentConfigurationTest.java | 33 ++++++++++ .../TaskManagerServicesConfigurationTest.java | 64 ------------------- .../TaskSubmissionTestEnvironment.java | 3 +- .../StreamNetworkBenchmarkEnvironment.java | 3 +- 11 files changed, 127 insertions(+), 138 deletions(-) delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index c633a5fd83008..01c62ba0f628d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -57,6 +58,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.InetAddress; import java.util.Collection; import java.util.Map; import java.util.Optional; @@ -399,4 +401,20 @@ public boolean isShutdown() { return isShutdown; } } + + public static NetworkEnvironment fromConfiguration( + Configuration configuration, + TaskEventPublisher taskEventPublisher, + MetricGroup metricGroup, + IOManager ioManager, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + final NetworkEnvironmentConfiguration networkConfig = NetworkEnvironmentConfiguration.fromConfiguration( + configuration, + maxJvmHeapMemory, + localTaskManagerCommunication, + taskManagerAddress); + return NetworkEnvironment.create(networkConfig, taskEventPublisher, metricGroup, ioManager); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 4288573cbd872..57a7f2b8a906b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -356,9 +356,7 @@ public static TaskExecutor startTaskManager( TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration( configuration, - EnvironmentInformation.getMaxJvmHeapMemory(), - remoteAddress, - localCommunicationOnly); + remoteAddress); Tuple2 taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup( metricRegistry, @@ -367,12 +365,14 @@ public static TaskExecutor startTaskManager( taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval()); TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( + configuration, taskManagerServicesConfiguration, taskManagerMetricGroup.f1, resourceID, rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io. EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(), - EnvironmentInformation.getMaxJvmHeapMemory()); + EnvironmentInformation.getMaxJvmHeapMemory(), + localCommunicationOnly); TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index a9689ce9f4e00..266a38c86b44c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -221,22 +221,27 @@ public void shutDown() throws FlinkException { /** * Creates and returns the task manager services. * + * @param configuration Flink configuration. * @param taskManagerServicesConfiguration task manager configuration * @param taskManagerMetricGroup metric group of the task manager * @param resourceID resource ID of the task manager * @param taskIOExecutor executor for async IO operations * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory * @param maxJvmHeapMemory the maximum JVM heap size + * @param localCommunicationOnly True, to skip initializing the network stack. + * Use only in cases where only one task manager runs. * @return task manager components * @throws Exception */ public static TaskManagerServices fromConfiguration( + Configuration configuration, TaskManagerServicesConfiguration taskManagerServicesConfiguration, MetricGroup taskManagerMetricGroup, ResourceID resourceID, Executor taskIOExecutor, long freeHeapMemoryWithDefrag, - long maxJvmHeapMemory) throws Exception { + long maxJvmHeapMemory, + boolean localCommunicationOnly) throws Exception { // pre-start checks checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths()); @@ -246,12 +251,14 @@ public static TaskManagerServices fromConfiguration( // start the I/O manager, it will create some temp directories. final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths()); - final NetworkEnvironment network = NetworkEnvironment.create( - resourceID, - taskManagerServicesConfiguration.getNetworkConfig(), + final NetworkEnvironment network = NetworkEnvironment.fromConfiguration( + configuration, taskEventDispatcher, taskManagerMetricGroup, - ioManager); + ioManager, + maxJvmHeapMemory, + localCommunicationOnly, + taskManagerServicesConfiguration.getTaskManagerAddress()); int dataPort = network.start(); final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration); @@ -382,7 +389,7 @@ private static MemoryManager createMemoryManager( memoryManager = new MemoryManager( memorySize, taskManagerServicesConfiguration.getNumberOfSlots(), - taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(), + taskManagerServicesConfiguration.getPageSize(), memType, preAllocateMemory); } catch (OutOfMemoryError e) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index 198fd10b57170..f88444e879715 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -26,7 +26,6 @@ import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration; -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.util.ConfigurationParserUtils; import javax.annotation.Nullable; @@ -38,7 +37,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Configuration for the task manager services such as the network environment, the memory manager, + * Configuration for the task manager services such as the memory manager, * the io manager and the metric registry. */ public class TaskManagerServicesConfiguration { @@ -51,8 +50,6 @@ public class TaskManagerServicesConfiguration { private final int numberOfSlots; - private final NetworkEnvironmentConfiguration networkConfig; - @Nullable private final QueryableStateConfiguration queryableStateConfig; @@ -69,6 +66,8 @@ public class TaskManagerServicesConfiguration { private final float memoryFraction; + private final int pageSize; + private final long timerServiceShutdownTimeout; private final boolean localRecoveryEnabled; @@ -82,13 +81,13 @@ public TaskManagerServicesConfiguration( String[] tmpDirPaths, String[] localRecoveryStateRootDirectories, boolean localRecoveryEnabled, - NetworkEnvironmentConfiguration networkConfig, @Nullable QueryableStateConfiguration queryableStateConfig, int numberOfSlots, long configuredMemory, MemoryType memoryType, boolean preAllocateMemory, float memoryFraction, + int pageSize, long timerServiceShutdownTimeout, RetryingRegistrationConfiguration retryingRegistrationConfiguration, Optional