diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java index f77498d5d5b36..e4086bd7f27bf 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java @@ -95,14 +95,6 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con "Timeout for all outbound connections. If you should experience problems with connecting to a" + " TaskManager due to a slow network, you should increase this value."); - /** Timeout for the startup of the actor system. */ - public static final ConfigOption STARTUP_TIMEOUT = - ConfigOptions.key("akka.startup-timeout") - .stringType() - .noDefaultValue() - .withDescription( - "Timeout after which the startup of a remote component is considered being failed."); - /** Override SSL support for the Akka transport. */ public static final ConfigOption SSL_ENABLED = ConfigOptions.key("akka.ssl.enabled") @@ -120,7 +112,7 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con .withDescription( "Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink" + " fails because messages exceed this limit, then you should increase it. The message size requires a" - + " size-unit specifier."); + + " size-unit specifier. Has to be at least 32 KiB"); /** Maximum number of messages until another actor is executed by the same thread. */ public static final ConfigOption DISPATCHER_THROUGHPUT = @@ -178,13 +170,15 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con .defaultValue(true) .withDescription("Exit JVM on fatal Akka errors."); - /** Milliseconds a gate should be closed for after a remote connection was disconnected. */ - public static final ConfigOption RETRY_GATE_CLOSED_FOR = - ConfigOptions.key("akka.retry-gate-closed-for") - .longType() - .defaultValue(50L) + /** Retry outbound connection only after this backoff. */ + public static final ConfigOption OUTBOUND_RESTART_BACKOFF = + ConfigOptions.key("akka.outbound-restart-backoff") + .stringType() + .defaultValue("50 ms") .withDescription( - "Milliseconds a gate should be closed for after a remote connection was disconnected."); + "Retry outbound connection only after this backoff." + + " Replaces the \"akka.retry-gate-closed-for\" key, which is now deprecated." + + " For now, if only the deprecated key is set, its value will be picked up."); // ================================================== // Configurations for fork-join-executor. @@ -223,9 +217,39 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con .build()); // ================================================== - // Configurations for client-socket-work-pool. + // Deprecated options // ================================================== + /** + * Timeout for the startup of the actor system. + * + * @deprecated Don't use this option anymore. It has no effect on Flink. + */ + @Deprecated + public static final ConfigOption STARTUP_TIMEOUT = + ConfigOptions.key("akka.startup-timeout") + .stringType() + .noDefaultValue() + .withDescription( + "Timeout after which the startup of a remote component is considered being failed."); + + /** + * Milliseconds a gate should be closed for after a remote connection was disconnected. + * + * @deprecated Don't use this option anymore. It has no effect on Flink. + */ + @Deprecated + public static final ConfigOption RETRY_GATE_CLOSED_FOR = + ConfigOptions.key("akka.retry-gate-closed-for") + .longType() + .defaultValue(50L) + .withDescription( + "Milliseconds a gate should be closed for after a remote connection was disconnected." + + " Replaced by \"akka.outbound-restart-backoff\"." + + " For now, this is used as a backup if the new option is not set."); + + /** @deprecated Don't use this option anymore. It has no effect on Flink. */ + @Deprecated public static final ConfigOption CLIENT_SOCKET_WORKER_POOL_SIZE_MIN = ConfigOptions.key("akka.client-socket-worker-pool.pool-size-min") .intType() @@ -235,6 +259,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con .text("Min number of threads to cap factor-based number to.") .build()); + /** @deprecated Don't use this option anymore. It has no effect on Flink. */ + @Deprecated public static final ConfigOption CLIENT_SOCKET_WORKER_POOL_SIZE_MAX = ConfigOptions.key("akka.client-socket-worker-pool.pool-size-max") .intType() @@ -244,6 +270,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con .text("Max number of threads to cap factor-based number to.") .build()); + /** @deprecated Don't use this option anymore. It has no effect on Flink. */ + @Deprecated public static final ConfigOption CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR = ConfigOptions.key("akka.client-socket-worker-pool.pool-size-factor") .doubleType() @@ -257,10 +285,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con + " pool-size-max values.") .build()); - // ================================================== - // Configurations for server-socket-work-pool. - // ================================================== - + /** @deprecated Don't use this option anymore. It has no effect on Flink. */ + @Deprecated public static final ConfigOption SERVER_SOCKET_WORKER_POOL_SIZE_MIN = ConfigOptions.key("akka.server-socket-worker-pool.pool-size-min") .intType() @@ -270,6 +296,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con .text("Min number of threads to cap factor-based number to.") .build()); + /** @deprecated Don't use this option anymore. It has no effect on Flink. */ + @Deprecated public static final ConfigOption SERVER_SOCKET_WORKER_POOL_SIZE_MAX = ConfigOptions.key("akka.server-socket-worker-pool.pool-size-max") .intType() @@ -279,6 +307,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con .text("Max number of threads to cap factor-based number to.") .build()); + /** @deprecated Don't use this option anymore. It has no effect on Flink. */ + @Deprecated public static final ConfigOption SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR = ConfigOptions.key("akka.server-socket-worker-pool.pool-size-factor") .doubleType() @@ -292,10 +322,6 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con + " pool-size-max values.") .build()); - // ================================================== - // Deprecated options - // ================================================== - /** * The Akka death watch heartbeat interval. * diff --git a/flink-rpc/flink-rpc-akka/pom.xml b/flink-rpc/flink-rpc-akka/pom.xml index 9de36d885041c..ea976b4ba1153 100644 --- a/flink-rpc/flink-rpc-akka/pom.xml +++ b/flink-rpc/flink-rpc-akka/pom.xml @@ -96,11 +96,6 @@ under the License. akka-slf4j_${scala.binary.version} ${akka.version} - - io.netty - netty - 3.10.6.Final - org.slf4j diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBootstrapTools.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBootstrapTools.java index 696821e6a62d7..63b02bb064381 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBootstrapTools.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBootstrapTools.java @@ -122,8 +122,7 @@ public static ActorSystem startRemoteActorSystem( } catch (Exception e) { // we can continue to try if this contains a netty channel exception Throwable cause = e.getCause(); - if (!(cause instanceof org.jboss.netty.channel.ChannelException - || cause instanceof java.net.BindException)) { + if (!(cause instanceof java.net.BindException)) { throw e; } // else fall through the loop and try the next port } diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java index 41a87dd4ec9d6..372cc14b9391a 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java @@ -46,7 +46,7 @@ private AkkaRpcServiceConfiguration( boolean captureAskCallStack, boolean forceRpcInvocationSerialization) { - checkArgument(maximumFramesize > 0L, "Maximum framesize must be positive."); + checkArgument(maximumFramesize >= 32768L, "Maximum framesize must be at least 32 KiB."); this.configuration = configuration; this.timeout = timeout; this.maximumFramesize = maximumFramesize; diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java index 2e91569d933e9..00d9fc7358efd 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.rpc.AddressResolution; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcSystem; @@ -54,9 +53,6 @@ public class AkkaRpcServiceUtils { private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcServiceUtils.class); - private static final String AKKA_TCP = "akka.tcp"; - private static final String AKKA_SSL_TCP = "akka.ssl.tcp"; - static final String SUPERVISOR_NAME = "rpc"; private static final String SIMPLE_AKKA_CONFIG_TEMPLATE = @@ -129,16 +125,7 @@ public static String getRpcUrl( checkNotNull(config, "config is null"); - final boolean sslEnabled = - config.getBoolean(AkkaOptions.SSL_ENABLED) - && SecurityOptions.isInternalSSLEnabled(config); - - return getRpcUrl( - hostname, - port, - endpointName, - addressResolution, - sslEnabled ? AkkaProtocol.SSL_TCP : AkkaProtocol.TCP); + return getRpcUrl(hostname, port, endpointName, addressResolution); } /** @@ -147,15 +134,10 @@ public static String getRpcUrl( * @param endpointName The name of the RPC endpoint. * @param addressResolution Whether to try address resolution of the given hostname or not. This * allows to fail fast in case that the hostname cannot be resolved. - * @param akkaProtocol True, if security/encryption is enabled, false otherwise. * @return The RPC URL of the specified RPC endpoint. */ public static String getRpcUrl( - String hostname, - int port, - String endpointName, - AddressResolution addressResolution, - AkkaProtocol akkaProtocol) + String hostname, int port, String endpointName, AddressResolution addressResolution) throws UnknownHostException { checkNotNull(hostname, "hostname is null"); @@ -170,8 +152,7 @@ public static String getRpcUrl( final String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port); - return internalRpcUrl( - endpointName, Optional.of(new RemoteAddressInformation(hostPort, akkaProtocol))); + return internalRpcUrl(endpointName, Optional.of(hostPort)); } public static String getLocalRpcUrl(String endpointName) { @@ -182,35 +163,9 @@ public static boolean isRecipientTerminatedException(Throwable exception) { return exception.getMessage().contains("had already been terminated."); } - private static final class RemoteAddressInformation { - private final String hostnameAndPort; - private final AkkaProtocol akkaProtocol; - - private RemoteAddressInformation(String hostnameAndPort, AkkaProtocol akkaProtocol) { - this.hostnameAndPort = hostnameAndPort; - this.akkaProtocol = akkaProtocol; - } - - private String getHostnameAndPort() { - return hostnameAndPort; - } - - private AkkaProtocol getAkkaProtocol() { - return akkaProtocol; - } - } - - private static String internalRpcUrl( - String endpointName, Optional remoteAddressInformation) { - final String protocolPrefix = - remoteAddressInformation - .map(rai -> akkaProtocolToString(rai.getAkkaProtocol())) - .orElse("akka"); - final Optional optionalHostnameAndPort = - remoteAddressInformation.map(RemoteAddressInformation::getHostnameAndPort); - - final StringBuilder url = new StringBuilder(String.format("%s://flink", protocolPrefix)); - optionalHostnameAndPort.ifPresent(hostPort -> url.append("@").append(hostPort)); + private static String internalRpcUrl(String endpointName, Optional optHostPort) { + final StringBuilder url = new StringBuilder("akka://flink"); + optHostPort.ifPresent(hostPort -> url.append("@").append(hostPort)); url.append("/user/").append(SUPERVISOR_NAME).append("/").append(endpointName); @@ -218,16 +173,6 @@ private static String internalRpcUrl( return url.toString(); } - private static String akkaProtocolToString(AkkaProtocol akkaProtocol) { - return akkaProtocol == AkkaProtocol.SSL_TCP ? AKKA_SSL_TCP : AKKA_TCP; - } - - /** Whether to use TCP or encrypted TCP for Akka. */ - public enum AkkaProtocol { - TCP, - SSL_TCP - } - // ------------------------------------------------------------------------ // RPC service configuration // ------------------------------------------------------------------------ diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java index 65bf7a7f41bda..15d98762f0e1a 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java @@ -33,8 +33,6 @@ import akka.actor.AddressFromURIString; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.logging.Slf4JLoggerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +42,6 @@ import java.io.StringWriter; import java.net.InetSocketAddress; import java.net.MalformedURLException; -import java.time.Duration; import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -89,8 +86,8 @@ private static Config getBasicAkkaConfig(Configuration configuration) { .add(" log-dead-letters = " + logLifecycleEvents) .add(" log-dead-letters-during-shutdown = " + logLifecycleEvents) .add(" jvm-exit-on-fatal-error = " + jvmExitOnFatalError) - .add(" serialize-messages = off") .add(" actor {") + .add(" serialize-messages = off") .add(" guardian-supervisor-strategy = " + supervisorStrategy) .add(" warn-about-java-serializer-usage = off") .add(" allow-java-serialization = on") @@ -206,79 +203,36 @@ private static void addBaseRemoteAkkaConfig( Configuration configuration, int port, int externalPort) { - final Duration akkaAskTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION); - - final String startupTimeout = - TimeUtils.getStringInMillis( - TimeUtils.parseDuration( - configuration.getString( - AkkaOptions.STARTUP_TIMEOUT, - TimeUtils.getStringInMillis( - akkaAskTimeout.multipliedBy(10L))))); - final String akkaTCPTimeout = TimeUtils.getStringInMillis( TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT))); final String akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE); - final int clientSocketWorkerPoolPoolSizeMin = - configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN); - final int clientSocketWorkerPoolPoolSizeMax = - configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX); - final double clientSocketWorkerPoolPoolSizeFactor = - configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR); - final int serverSocketWorkerPoolPoolSizeMin = - configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN); - final int serverSocketWorkerPoolPoolSizeMax = - configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX); - final double serverSocketWorkerPoolPoolSizeFactor = - configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR); - - final String logLifecycleEvents = - booleanToOnOrOff(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)); - - final long retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR); + final String outboundRestartBackoff; + if (!configuration.contains(AkkaOptions.OUTBOUND_RESTART_BACKOFF) + && configuration.contains(AkkaOptions.RETRY_GATE_CLOSED_FOR)) { + outboundRestartBackoff = + configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR) + " ms"; + } else { + outboundRestartBackoff = configuration.getString(AkkaOptions.OUTBOUND_RESTART_BACKOFF); + } akkaConfigBuilder .add("akka {") .add(" actor {") - .add(" provider = \"akka.remote.RemoteActorRefProvider\"") + .add(" provider = remote") .add(" }") - .add(" remote.artery.enabled = false") - .add(" remote.startup-timeout = " + startupTimeout) .add(" remote.warn-about-direct-use = off") .add(" remote.use-unsafe-remote-features-outside-cluster = on") - .add(" remote.classic {") - .add(" # disable the transport failure detector by setting very high values") - .add(" transport-failure-detector{") - .add(" acceptable-heartbeat-pause = 6000 s") - .add(" heartbeat-interval = 1000 s") - .add(" threshold = 300") + .add(" remote.artery {") + .add(" canonical.port = " + externalPort) + .add(" bind.port = " + port) + .add(" advanced {") + .add(" maximum-frame-size = " + akkaFramesize) + .add(" outbound-restart-backoff = " + outboundRestartBackoff) + .add(" tcp.connection-timeout = " + akkaTCPTimeout) .add(" }") - .add(" enabled-transports = [\"akka.remote.classic.netty.tcp\"]") - .add(" netty {") - .add(" tcp {") - .add(" transport-class = \"akka.remote.transport.netty.NettyTransport\"") - .add(" port = " + externalPort) - .add(" bind-port = " + port) - .add(" connection-timeout = " + akkaTCPTimeout) - .add(" maximum-frame-size = " + akkaFramesize) - .add(" tcp-nodelay = on") - .add(" client-socket-worker-pool {") - .add(" pool-size-min = " + clientSocketWorkerPoolPoolSizeMin) - .add(" pool-size-max = " + clientSocketWorkerPoolPoolSizeMax) - .add(" pool-size-factor = " + clientSocketWorkerPoolPoolSizeFactor) - .add(" }") - .add(" server-socket-worker-pool {") - .add(" pool-size-min = " + serverSocketWorkerPoolPoolSizeMin) - .add(" pool-size-max = " + serverSocketWorkerPoolPoolSizeMax) - .add(" pool-size-factor = " + serverSocketWorkerPoolPoolSizeFactor) - .add(" }") - .add(" }") - .add(" }") - .add(" log-remote-lifecycle-events = " + logLifecycleEvents) - .add(" retry-gate-closed-for = " + retryGateClosedFor + " ms") .add(" }") .add("}"); } @@ -296,13 +250,9 @@ private static void addHostnameRemoteAkkaConfig( akkaConfigBuilder .add("akka {") - .add(" remote.classic {") - .add(" netty {") - .add(" tcp {") - .add(" hostname = \"" + effectiveHostname + "\"") - .add(" bind-hostname = \"" + bindAddress + "\"") - .add(" }") - .add(" }") + .add(" remote.artery {") + .add(" canonical.hostname = \"" + effectiveHostname + "\"") + .add(" bind.hostname = \"" + bindAddress + "\"") .add(" }") .add("}"); } @@ -314,7 +264,9 @@ private static void addSslRemoteAkkaConfig( configuration.getBoolean(AkkaOptions.SSL_ENABLED) && SecurityOptions.isInternalSSLEnabled(configuration); - final String akkaEnableSSL = booleanToOnOrOff(akkaEnableSSLConfig); + if (!akkaEnableSSLConfig) { + return; + } final String akkaSSLKeyStore = configuration.getString( @@ -362,26 +314,19 @@ private static void addSslRemoteAkkaConfig( akkaConfigBuilder .add("akka {") - .add(" remote.classic {") - .add(" enabled-transports = [\"akka.remote.classic.netty.ssl\"]") - .add(" netty {") - .add(" ssl = ${akka.remote.classic.netty.tcp}") - .add(" ssl {") - .add(" enable-ssl = " + akkaEnableSSL) - .add(" ssl-engine-provider = " + sslEngineProviderName) - .add(" security {") - .add(" key-store = \"" + akkaSSLKeyStore + "\"") - .add(" key-store-password = \"" + akkaSSLKeyStorePassword + "\"") - .add(" key-password = \"" + akkaSSLKeyPassword + "\"") - .add(" trust-store = \"" + akkaSSLTrustStore + "\"") - .add(" trust-store-password = \"" + akkaSSLTrustStorePassword + "\"") - .add(" protocol = " + akkaSSLProtocol + "") - .add(" enabled-algorithms = " + akkaSSLAlgorithms + "") - .add(" random-number-generator = \"\"") - .add(" require-mutual-authentication = on") - .add(" cert-fingerprints = " + akkaSSLCertFingerprints + "") - .add(" }") - .add(" }") + .add(" remote.artery {") + .add(" transport = tls-tcp") + .add(" ssl.ssl-engine-provider = " + sslEngineProviderName) + .add(" ssl.config-ssl-engine {") + .add(" key-store = \"" + akkaSSLKeyStore + "\"") + .add(" key-store-password = \"" + akkaSSLKeyStorePassword + "\"") + .add(" key-password = \"" + akkaSSLKeyPassword + "\"") + .add(" trust-store = \"" + akkaSSLTrustStore + "\"") + .add(" trust-store-password = \"" + akkaSSLTrustStorePassword + "\"") + .add(" protocol = " + akkaSSLProtocol) + .add(" enabled-algorithms = " + akkaSSLAlgorithms) + .add(" cert-fingerprints = " + akkaSSLCertFingerprints) + .add(" require-mutual-authentication = on") .add(" }") .add(" }") .add("}"); @@ -416,8 +361,6 @@ private static ActorSystem createActorSystem(Config akkaConfig) { * @return created actor system */ public static ActorSystem createActorSystem(String actorSystemName, Config akkaConfig) { - // Initialize slf4j as logger of Akka's Netty instead of java.util.logging (FLINK-1650) - InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()); return RobustActorSystem.create(actorSystemName, akkaConfig); } @@ -439,7 +382,7 @@ public static ActorSystem createDefaultActorSystem() { * @return Flink's Akka default config */ private static Config getDefaultAkkaConfig() { - return getAkkaConfig(new Configuration(), new HostAndPort("", 0)); + return getAkkaConfig(new Configuration(), new HostAndPort("localhost", 0)); } /** @@ -484,8 +427,9 @@ public static Config getAkkaConfig( AkkaUtils.getBasicAkkaConfig(configuration).withFallback(executorConfig); if (externalAddress != null) { + final Config remoteConfig; if (bindAddress != null) { - final Config remoteConfig = + remoteConfig = AkkaUtils.getRemoteAkkaConfig( configuration, bindAddress.getHost(), @@ -493,18 +437,16 @@ public static Config getAkkaConfig( externalAddress.getHost(), externalAddress.getPort()); - return remoteConfig.withFallback(defaultConfig); } else { - final Config remoteConfig = + remoteConfig = AkkaUtils.getRemoteAkkaConfig( configuration, NetUtils.getWildcardIPAddress(), externalAddress.getPort(), externalAddress.getHost(), externalAddress.getPort()); - - return remoteConfig.withFallback(defaultConfig); } + return remoteConfig.withFallback(defaultConfig); } return defaultConfig; diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/CustomSSLEngineProvider.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/CustomSSLEngineProvider.java index 7f40c1554aa66..9c3c006c60aa3 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/CustomSSLEngineProvider.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/CustomSSLEngineProvider.java @@ -21,7 +21,7 @@ import akka.actor.ActorSystem; import akka.remote.RemoteTransportException; -import akka.remote.transport.netty.ConfigSSLEngineProvider; +import akka.remote.artery.tcp.ConfigSSLEngineProvider; import com.typesafe.config.Config; import javax.net.ssl.TrustManager; @@ -33,7 +33,6 @@ /** * Extension of the {@link ConfigSSLEngineProvider} to use a {@link FingerprintTrustManagerFactory}. */ -@SuppressWarnings("deprecation") public class CustomSSLEngineProvider extends ConfigSSLEngineProvider { private final String sslTrustStore; private final String sslTrustStorePassword; @@ -42,7 +41,7 @@ public class CustomSSLEngineProvider extends ConfigSSLEngineProvider { public CustomSSLEngineProvider(ActorSystem system) { super(system); final Config securityConfig = - system.settings().config().getConfig("akka.remote.classic.netty.ssl.security"); + system.settings().config().getConfig("akka.remote.artery.ssl.config-ssl-engine"); sslTrustStore = securityConfig.getString("trust-store"); sslTrustStorePassword = securityConfig.getString("trust-store-password"); sslCertFingerprints = securityConfig.getStringList("cert-fingerprints"); diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java index f66f6ca688d96..9ad7ca6b94991 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java @@ -44,7 +44,7 @@ /** Tests for the over sized response message handling of the {@link AkkaRpcActor}. */ class AkkaRpcActorOversizedResponseMessageTest { - private static final int FRAMESIZE = 32000; + private static final int FRAMESIZE = 32768; private static final String OVERSIZED_PAYLOAD = new String(new byte[FRAMESIZE]); diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaUtilsTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaUtilsTest.java index eff94b36d84d4..e1f7f5fdacd3a 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaUtilsTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaUtilsTest.java @@ -17,7 +17,6 @@ package org.apache.flink.runtime.rpc.akka; -import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.rpc.AddressResolution; @@ -30,7 +29,6 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.time.Duration; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -47,11 +45,7 @@ void getHostFromAkkaURLForRemoteAkkaURL() throws Exception { final String remoteAkkaUrl = AkkaRpcServiceUtils.getRpcUrl( - host, - port, - "actor", - AddressResolution.NO_ADDRESS_RESOLUTION, - AkkaRpcServiceUtils.AkkaProtocol.TCP); + host, port, "actor", AddressResolution.NO_ADDRESS_RESOLUTION); final InetSocketAddress result = AkkaUtils.getInetSocketAddressFromAkkaURL(remoteAkkaUrl); @@ -59,7 +53,7 @@ void getHostFromAkkaURLForRemoteAkkaURL() throws Exception { } @Test - void getHostFromAkkaURLThrowsExceptionIfAddressCannotBeRetrieved() throws Exception { + void getHostFromAkkaURLThrowsExceptionIfAddressCannotBeRetrieved() { final String localAkkaURL = "akka://flink/user/actor"; assertThatThrownBy(() -> AkkaUtils.getInetSocketAddressFromAkkaURL(localAkkaURL)) @@ -154,7 +148,7 @@ void getAkkaConfigNormalizesHostName() { final Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new HostAndPort(hostname, port)); - assertThat(akkaConfig.getString("akka.remote.classic.netty.tcp.hostname")) + assertThat(akkaConfig.getString("akka.remote.artery.canonical.hostname")) .isEqualTo(NetUtils.unresolvedHostToNormalizedString(hostname)); } @@ -163,7 +157,7 @@ void getAkkaConfigDefaultsToLocalHost() throws UnknownHostException { final Config akkaConfig = AkkaUtils.getAkkaConfig(new Configuration(), new HostAndPort("", 0)); - final String hostname = akkaConfig.getString("akka.remote.classic.netty.tcp.hostname"); + final String hostname = akkaConfig.getString("akka.remote.artery.canonical.hostname"); assertThat(InetAddress.getByName(hostname).isLoopbackAddress()).isTrue(); } @@ -212,21 +206,10 @@ void getAkkaConfigHandlesIPv6Address() { AkkaUtils.getAkkaConfig( new Configuration(), new HostAndPort(ipv6AddressString, 1234)); - assertThat(akkaConfig.getString("akka.remote.classic.netty.tcp.hostname")) + assertThat(akkaConfig.getString("akka.remote.artery.canonical.hostname")) .isEqualTo(NetUtils.unresolvedHostToNormalizedString(ipv6AddressString)); } - @Test - void getAkkaConfigDefaultsStartupTimeoutTo10TimesOfAskTimeout() { - final Configuration configuration = new Configuration(); - configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMillis(100)); - - final Config akkaConfig = - AkkaUtils.getAkkaConfig(configuration, new HostAndPort("localhost", 31337)); - - assertThat(akkaConfig.getString("akka.remote.startup-timeout")).isEqualTo("1000ms"); - } - @Test void getAkkaConfigSslEngineProviderWithoutCertFingerprint() { final Configuration configuration = new Configuration(); @@ -234,11 +217,11 @@ void getAkkaConfigSslEngineProviderWithoutCertFingerprint() { final Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new HostAndPort("localhost", 31337)); - final Config sslConfig = akkaConfig.getConfig("akka.remote.classic.netty.ssl"); + final Config sslConfig = akkaConfig.getConfig("akka.remote.artery.ssl"); assertThat(sslConfig.getString("ssl-engine-provider")) .isEqualTo("org.apache.flink.runtime.rpc.akka.CustomSSLEngineProvider"); - assertThat(sslConfig.getStringList("security.cert-fingerprints")).isEmpty(); + assertThat(sslConfig.getStringList("config-ssl-engine.cert-fingerprints")).isEmpty(); } @Test @@ -251,10 +234,11 @@ void getAkkaConfigSslEngineProviderWithCertFingerprint() { final Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new HostAndPort("localhost", 31337)); - final Config sslConfig = akkaConfig.getConfig("akka.remote.classic.netty.ssl"); + final Config sslConfig = akkaConfig.getConfig("akka.remote.artery.ssl"); assertThat(sslConfig.getString("ssl-engine-provider")) .isEqualTo("org.apache.flink.runtime.rpc.akka.CustomSSLEngineProvider"); - assertThat(sslConfig.getStringList("security.cert-fingerprints")).contains(fingerprint); + assertThat(sslConfig.getStringList("config-ssl-engine.cert-fingerprints")) + .contains(fingerprint); } } diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java index cfeab0387fb08..1810f79e1914a 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java @@ -46,7 +46,7 @@ class MessageSerializationTest { private static RpcService akkaRpcService1; private static RpcService akkaRpcService2; - private static final int maxFrameSize = 32000; + private static final int maxFrameSize = 32768; @BeforeAll static void setup() throws Exception { diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/RemoteAkkaRpcActorTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/RemoteAkkaRpcActorTest.java index 255ebf126d21c..7f00883575570 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/RemoteAkkaRpcActorTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/RemoteAkkaRpcActorTest.java @@ -26,6 +26,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.Optional; @@ -145,6 +146,7 @@ void failsRpcResultImmediatelyIfEndpointIsStopped() throws Exception { } @Test + @Disabled("Not meaningful for Artery") void failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable() throws Exception { final AkkaRpcService toBeClosedRpcService = AkkaRpcServiceUtils.createRemoteRpcService( diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java index acaccedff97b0..a06f56b02c4d4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java @@ -22,8 +22,6 @@ import org.apache.flink.api.common.io.GenericInputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.configuration.AkkaOptions; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.runtime.testutils.MiniClusterResource; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; @@ -40,15 +38,12 @@ import static org.junit.Assert.assertEquals; /** Integration tests for {@link org.apache.flink.api.java.RemoteEnvironment}. */ -@SuppressWarnings("serial") public class RemoteEnvironmentITCase extends TestLogger { private static final int TM_SLOTS = 4; private static final int USER_DOP = 2; - private static final String VALID_STARTUP_TIMEOUT = "100 s"; - @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( @@ -59,15 +54,12 @@ public class RemoteEnvironmentITCase extends TestLogger { /** Ensure that the program parallelism can be set even if the configuration is supplied. */ @Test public void testUserSpecificParallelism() throws Exception { - Configuration config = new Configuration(); - config.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT); - final URI restAddress = MINI_CLUSTER_RESOURCE.getRestAddress(); final String hostname = restAddress.getHost(); final int port = restAddress.getPort(); final ExecutionEnvironment env = - ExecutionEnvironment.createRemoteEnvironment(hostname, port, config); + ExecutionEnvironment.createRemoteEnvironment(hostname, port); env.setParallelism(USER_DOP); DataSet result =