Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,6 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
"Timeout for all outbound connections. If you should experience problems with connecting to a"
+ " TaskManager due to a slow network, you should increase this value.");

/** Timeout for the startup of the actor system. */
public static final ConfigOption<String> STARTUP_TIMEOUT =
ConfigOptions.key("akka.startup-timeout")
.stringType()
.noDefaultValue()
.withDescription(
"Timeout after which the startup of a remote component is considered being failed.");

/** Override SSL support for the Akka transport. */
public static final ConfigOption<Boolean> SSL_ENABLED =
ConfigOptions.key("akka.ssl.enabled")
Expand All @@ -120,7 +112,7 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
.withDescription(
"Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink"
+ " fails because messages exceed this limit, then you should increase it. The message size requires a"
+ " size-unit specifier.");
+ " size-unit specifier. Has to be at least 32 KiB");

/** Maximum number of messages until another actor is executed by the same thread. */
public static final ConfigOption<Integer> DISPATCHER_THROUGHPUT =
Expand Down Expand Up @@ -178,13 +170,15 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
.defaultValue(true)
.withDescription("Exit JVM on fatal Akka errors.");

/** Milliseconds a gate should be closed for after a remote connection was disconnected. */
public static final ConfigOption<Long> RETRY_GATE_CLOSED_FOR =
ConfigOptions.key("akka.retry-gate-closed-for")
.longType()
.defaultValue(50L)
/** Retry outbound connection only after this backoff. */
public static final ConfigOption<String> OUTBOUND_RESTART_BACKOFF =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a de-facto replacement for RETRY_GATE_CLOSED_FOR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this change based on the configuration comments.

classic doc:

      # After failed to establish an outbound connection, the remoting will mark the
      # address as failed. This configuration option controls how much time should
      # be elapsed before reattempting a new connection. While the address is
      # gated, all messages sent to the address are delivered to dead-letters.
      # Since this setting limits the rate of reconnects setting it to a
      # very short interval (i.e. less than a second) may result in a storm of
      # reconnect attempts.
      retry-gate-closed-for = 5 s

artery doc:

        # Retry outbound connection after this backoff.
        # Only used when transport is tcp or tls-tcp.
        outbound-restart-backoff = 1 second

The outbound-restart-backoff comments are a lot less specific, but they point in the same direction.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they point to the same direction

That was also my conclusion from the docs. Shall we add retry-gate-closed-for as a deprecated key to the new option?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that makes sense. I also thought about using the value of retry-gate-closed-for if that is set explicitly and outbound-restart-backoff is missing.

ConfigOptions.key("akka.outbound-restart-backoff")
.stringType()
.defaultValue("50 ms")
.withDescription(
"Milliseconds a gate should be closed for after a remote connection was disconnected.");
"Retry outbound connection only after this backoff."
+ " Replaces the \"akka.retry-gate-closed-for\" key, which is now deprecated."
+ " For now, if only the deprecated key is set, its value will be picked up.");

// ==================================================
// Configurations for fork-join-executor.
Expand Down Expand Up @@ -223,9 +217,39 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
.build());

// ==================================================
// Configurations for client-socket-work-pool.
// Deprecated options
// ==================================================

/**
* Timeout for the startup of the actor system.
*
* @deprecated Don't use this option anymore. It has no effect on Flink.
*/
@Deprecated
public static final ConfigOption<String> STARTUP_TIMEOUT =
ConfigOptions.key("akka.startup-timeout")
.stringType()
.noDefaultValue()
.withDescription(
"Timeout after which the startup of a remote component is considered being failed.");

/**
* Milliseconds a gate should be closed for after a remote connection was disconnected.
*
* @deprecated Use {@link #OUTBOUND_RESTART_BACKOFF} instead. Per the option descriptions, this
*     key is, for now, still used as a fallback if the new option is not set.
*/
@Deprecated
public static final ConfigOption<Long> RETRY_GATE_CLOSED_FOR =
ConfigOptions.key("akka.retry-gate-closed-for")
.longType()
.defaultValue(50L)
.withDescription(
"Milliseconds a gate should be closed for after a remote connection was disconnected."
+ " Replaced by \"akka.outbound-restart-backoff\"."
+ " For now, this is used as a backup if the new option is not set.");

/** @deprecated Don't use this option anymore. It has no effect on Flink. */
@Deprecated
public static final ConfigOption<Integer> CLIENT_SOCKET_WORKER_POOL_SIZE_MIN =
ConfigOptions.key("akka.client-socket-worker-pool.pool-size-min")
.intType()
Expand All @@ -235,6 +259,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
.text("Min number of threads to cap factor-based number to.")
.build());

/** @deprecated Don't use this option anymore. It has no effect on Flink. */
@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs regeneration of the docs

public static final ConfigOption<Integer> CLIENT_SOCKET_WORKER_POOL_SIZE_MAX =
ConfigOptions.key("akka.client-socket-worker-pool.pool-size-max")
.intType()
Expand All @@ -244,6 +270,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
.text("Max number of threads to cap factor-based number to.")
.build());

/** @deprecated Don't use this option anymore. It has no effect on Flink. */
@Deprecated
public static final ConfigOption<Double> CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR =
ConfigOptions.key("akka.client-socket-worker-pool.pool-size-factor")
.doubleType()
Expand All @@ -257,10 +285,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
+ " pool-size-max values.")
.build());

// ==================================================
// Configurations for server-socket-work-pool.
// ==================================================

/** @deprecated Don't use this option anymore. It has no effect on Flink. */
@Deprecated
public static final ConfigOption<Integer> SERVER_SOCKET_WORKER_POOL_SIZE_MIN =
ConfigOptions.key("akka.server-socket-worker-pool.pool-size-min")
.intType()
Expand All @@ -270,6 +296,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
.text("Min number of threads to cap factor-based number to.")
.build());

/** @deprecated Don't use this option anymore. It has no effect on Flink. */
@Deprecated
public static final ConfigOption<Integer> SERVER_SOCKET_WORKER_POOL_SIZE_MAX =
ConfigOptions.key("akka.server-socket-worker-pool.pool-size-max")
.intType()
Expand All @@ -279,6 +307,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
.text("Max number of threads to cap factor-based number to.")
.build());

/** @deprecated Don't use this option anymore. It has no effect on Flink. */
@Deprecated
public static final ConfigOption<Double> SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR =
ConfigOptions.key("akka.server-socket-worker-pool.pool-size-factor")
.doubleType()
Expand All @@ -292,10 +322,6 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
+ " pool-size-max values.")
.build());

// ==================================================
// Deprecated options
// ==================================================

/**
* The Akka death watch heartbeat interval.
*
Expand Down
5 changes: 0 additions & 5 deletions flink-rpc/flink-rpc-akka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,6 @@ under the License.
<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.10.6.Final</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs a NOTICE update

</dependency>

<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ public static ActorSystem startRemoteActorSystem(
} catch (Exception e) {
// we can continue to try if this contains a netty channel exception
Throwable cause = e.getCause();
if (!(cause instanceof org.jboss.netty.channel.ChannelException
|| cause instanceof java.net.BindException)) {
if (!(cause instanceof java.net.BindException)) {
throw e;
} // else fall through the loop and try the next port
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private AkkaRpcServiceConfiguration(
boolean captureAskCallStack,
boolean forceRpcInvocationSerialization) {

checkArgument(maximumFramesize > 0L, "Maximum framesize must be positive.");
checkArgument(maximumFramesize >= 32768L, "Maximum framesize must be at least 32 KiB.");
this.configuration = configuration;
this.timeout = timeout;
this.maximumFramesize = maximumFramesize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
Expand Down Expand Up @@ -54,9 +53,6 @@ public class AkkaRpcServiceUtils {

private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcServiceUtils.class);

private static final String AKKA_TCP = "akka.tcp";
private static final String AKKA_SSL_TCP = "akka.ssl.tcp";

static final String SUPERVISOR_NAME = "rpc";

private static final String SIMPLE_AKKA_CONFIG_TEMPLATE =
Expand Down Expand Up @@ -129,16 +125,7 @@ public static String getRpcUrl(

checkNotNull(config, "config is null");

final boolean sslEnabled =
config.getBoolean(AkkaOptions.SSL_ENABLED)
&& SecurityOptions.isInternalSSLEnabled(config);

return getRpcUrl(
hostname,
port,
endpointName,
addressResolution,
sslEnabled ? AkkaProtocol.SSL_TCP : AkkaProtocol.TCP);
return getRpcUrl(hostname, port, endpointName, addressResolution);
}

/**
Expand All @@ -147,15 +134,10 @@ public static String getRpcUrl(
* @param endpointName The name of the RPC endpoint.
* @param addressResolution Whether to try address resolution of the given hostname or not. This
* allows to fail fast in case that the hostname cannot be resolved.
* @param akkaProtocol True, if security/encryption is enabled, false otherwise.
* @return The RPC URL of the specified RPC endpoint.
*/
public static String getRpcUrl(
String hostname,
int port,
String endpointName,
AddressResolution addressResolution,
AkkaProtocol akkaProtocol)
String hostname, int port, String endpointName, AddressResolution addressResolution)
throws UnknownHostException {

checkNotNull(hostname, "hostname is null");
Expand All @@ -170,8 +152,7 @@ public static String getRpcUrl(

final String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port);

return internalRpcUrl(
endpointName, Optional.of(new RemoteAddressInformation(hostPort, akkaProtocol)));
return internalRpcUrl(endpointName, Optional.of(hostPort));
}

public static String getLocalRpcUrl(String endpointName) {
Expand All @@ -182,52 +163,16 @@ public static boolean isRecipientTerminatedException(Throwable exception) {
return exception.getMessage().contains("had already been terminated.");
}

private static final class RemoteAddressInformation {
private final String hostnameAndPort;
private final AkkaProtocol akkaProtocol;

private RemoteAddressInformation(String hostnameAndPort, AkkaProtocol akkaProtocol) {
this.hostnameAndPort = hostnameAndPort;
this.akkaProtocol = akkaProtocol;
}

private String getHostnameAndPort() {
return hostnameAndPort;
}

private AkkaProtocol getAkkaProtocol() {
return akkaProtocol;
}
}

private static String internalRpcUrl(
String endpointName, Optional<RemoteAddressInformation> remoteAddressInformation) {
final String protocolPrefix =
remoteAddressInformation
.map(rai -> akkaProtocolToString(rai.getAkkaProtocol()))
.orElse("akka");
final Optional<String> optionalHostnameAndPort =
remoteAddressInformation.map(RemoteAddressInformation::getHostnameAndPort);

final StringBuilder url = new StringBuilder(String.format("%s://flink", protocolPrefix));
optionalHostnameAndPort.ifPresent(hostPort -> url.append("@").append(hostPort));
private static String internalRpcUrl(String endpointName, Optional<String> optHostPort) {
final StringBuilder url = new StringBuilder("akka://flink");
optHostPort.ifPresent(hostPort -> url.append("@").append(hostPort));

url.append("/user/").append(SUPERVISOR_NAME).append("/").append(endpointName);

// protocolPrefix://flink[@hostname:port]/user/rpc/endpointName
return url.toString();
}

private static String akkaProtocolToString(AkkaProtocol akkaProtocol) {
return akkaProtocol == AkkaProtocol.SSL_TCP ? AKKA_SSL_TCP : AKKA_TCP;
}

/** Whether to use TCP or encrypted TCP for Akka. */
public enum AkkaProtocol {
TCP,
SSL_TCP
}

// ------------------------------------------------------------------------
// RPC service configuration
// ------------------------------------------------------------------------
Expand Down
Loading