Closed
@@ -245,12 +245,13 @@ TransportClient createClient(InetSocketAddress address)
      logger.debug("Creating new connection to {}", address);

      Bootstrap bootstrap = new Bootstrap();
+     int connCreateTimeout = conf.connectionCreationTimeoutMs();
Contributor

I would simply have thrown an exception when connCreateTimeout <= 0, which lets us avoid the rest of the PR.
Would that not work?

Member Author

Without this PR, we simply throw an exception here, and that doesn't work.

Contributor

@mridulm, Jul 2, 2023

Yes, it will throw the exception, and the user will fix the invalid config the first time they run: exactly the same behavior as with checkValue.

Member Author

Not the same: checkValue fails the app in the driver up front and tells users what is invalid. Here we fail during executor allocation. Users may never see the exception, or the exception and the offending config may be suppressed.

Contributor

@mridulm, Jul 5, 2023

This failure is in createClient (once introduced) and will fail all invocations for that module, which will in turn fail the task/stage/job. This is similar in behavior to a configuration referenced in an executor that fails in checkValue.
The failure reason will inform the user about the invalid configuration.

Do you have a scenario where an exception thrown in this codepath does not result in failure?

Member Author

Note that createClient failures during executor startup will not result in a task failure. The executor fails after the maximum number of IO retries, and the app attempt then fails in the appMaster once the maximum number of executor failures is reached; even then, there are still many app attempts left to retry. It could be hours before you notice that the app has failed, and the driver log never tells you what mistake was made.

Contributor

@mridulm, Jul 5, 2023

@yaooqinn we have discussed this quite a bit already and are now going in circles :-)
Essentially my point is this: instead of rejecting invalid configurations, we are trying to work around invalid user configs. Once released, this will be a pattern we then have to keep supporting, and I am not seeing a good reason to introduce this new behavior.

I would recommend simply rejecting the invalid config with an IllegalArgumentException (we do this in a bunch of other places as well for invalid user config), instead of introducing behavior which tries to work around it [1]: as long as the user gets to see the exception, they can fix the issue and mitigate it.

Thoughts?

+CC @dongjoon-hyun as well, since you reviewed the PR.

[1] If there is utility in introducing this beyond working around user config, that would be interesting to know more about; it is not coming through in the discussion here or in the JIRA.
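
For reference, the fail-fast alternative argued for in this thread could look roughly like the sketch below. `TimeoutValidation` and `requirePositiveTimeoutMs` are hypothetical names chosen for illustration and are not part of Spark's `TransportConf`; only the config key is taken from the test added in this PR.

```java
// Hypothetical helper, not Spark API: rejects a non-positive timeout up front
// instead of reinterpreting it as "wait forever".
final class TimeoutValidation {
  private TimeoutValidation() {}

  static int requirePositiveTimeoutMs(String key, long timeoutMs) {
    if (timeoutMs <= 0 || timeoutMs > Integer.MAX_VALUE) {
      throw new IllegalArgumentException(
          String.format("Invalid value for %s: %d ms (must be in [1, %d])",
              key, timeoutMs, Integer.MAX_VALUE));
    }
    return (int) timeoutMs;
  }

  public static void main(String[] args) {
    String key = "spark.shuffle.io.connectionCreationTimeout";
    System.out.println(requirePositiveTimeoutMs(key, 30_000L));
    try {
      requirePositiveTimeoutMs(key, -1_000L);
    } catch (IllegalArgumentException e) {
      // The message names the offending key so the user can fix the config.
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Where such a check runs (driver startup versus the first `createClient` call on an executor) is exactly the point of disagreement above; the sketch takes no position on that.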

      bootstrap.group(workerGroup)
        .channel(socketChannelClass)
        // Disable Nagle's Algorithm since we don't want packets to wait
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, true)
-       .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionCreationTimeoutMs())
+       .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connCreateTimeout)
        .option(ChannelOption.ALLOCATOR, pooledAllocator);

      if (conf.receiveBuf() > 0) {
@@ -276,10 +277,19 @@ public void initChannel(SocketChannel ch) {
      // Connect to the remote server
      long preConnect = System.nanoTime();
      ChannelFuture cf = bootstrap.connect(address);
-     if (!cf.await(conf.connectionCreationTimeoutMs())) {
+
+     if (connCreateTimeout <= 0) {
Member

Although negative cases are prevented already via connectionCreationTimeoutMs changes, this condition looks safer.

+       cf.awaitUninterruptibly();
Contributor

I think this should be a plain await() instead of awaitUninterruptibly(), since the latter might lead to uninterruptible canceled tasks. See #16866 for a past fix for a similar issue.

Member

Thank you. Could you make a PR to fix it?

Member

It seems that we need a new JIRA to fix that instead of a followup.

Contributor

Member Author

Thank you @JoshRosen. The fix looks good to me.

While considering the usage of await and awaitUninterruptibly, I consulted the Netty documentation (https://netty.io/4.0/api/io/netty/channel/ChannelFuture.html) which recommended the use of awaitUninterruptibly in all its GOOD examples. So...
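
To make the difference under discussion concrete, here is a minimal JDK-only sketch of an interruptible wait versus an uninterruptible one. It uses `CountDownLatch` as a stand-in for a connect future and does not call Netty; the class and method names are hypothetical, and the behavior shown is only an analogy to `await()` and `awaitUninterruptibly()`.

```java
import java.util.concurrent.CountDownLatch;

// Analogy only: a CountDownLatch stands in for a pending connect future.
final class InterruptibleWaitSketch {
  private InterruptibleWaitSketch() {}

  // Interruptible wait: gives up as soon as the waiting thread is interrupted.
  static boolean awaitInterruptibly(CountDownLatch done) {
    try {
      done.await();
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the flag for callers
      return false;
    }
  }

  // Uninterruptible wait: remembers interrupts but keeps waiting until done.
  static void awaitUninterruptibly(CountDownLatch done) {
    boolean interrupted = false;
    while (true) {
      try {
        done.await();
        break;
      } catch (InterruptedException e) {
        interrupted = true; // swallow and retry
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  // Starts a waiter on a latch that is never released, interrupts it, and
  // reports whether the waiter thread exited within 500 ms.
  static boolean stopsOnInterrupt(boolean interruptible) {
    CountDownLatch never = new CountDownLatch(1);
    Thread t = new Thread(() -> {
      if (interruptible) {
        awaitInterruptibly(never);
      } else {
        awaitUninterruptibly(never);
      }
    });
    t.setDaemon(true);
    t.start();
    t.interrupt();
    try {
      t.join(500);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    boolean stopped = !t.isAlive();
    never.countDown(); // release the uninterruptible waiter so it can finish
    return stopped;
  }

  public static void main(String[] args) {
    System.out.println("interruptible waiter stopped: " + stopsOnInterrupt(true));
    System.out.println("uninterruptible waiter stopped: " + stopsOnInterrupt(false));
  }
}
```

In this analogy, a cancelled task that interrupts its thread only gets the thread back in the interruptible case, which is the concern raised about combining an unlimited timeout with an uninterruptible wait.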

+       assert cf.isDone();
+       if (cf.isCancelled()) {
+         throw new IOException(String.format("Connecting to %s cancelled", address));
+       } else if (!cf.isSuccess()) {
+         throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
+       }
+     } else if (!cf.await(connCreateTimeout)) {
        throw new IOException(
          String.format("Connecting to %s timed out (%s ms)",
-           address, conf.connectionCreationTimeoutMs()));
+           address, connCreateTimeout));
      } else if (cf.cause() != null) {
        throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
      }
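
The branching in this hunk (no deadline when the timeout is non-positive, a bounded wait otherwise) can be sketched without Netty using `CompletableFuture`. This is an illustrative analogy, not the PR's code: `ConnectWaitSketch` and `awaitConnect` are hypothetical names, and the method returns a status string instead of throwing `IOException` so the outcomes are easy to inspect. Unlike the hunk above, the unbounded branch here uses an interruptible `get()`.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the connect-and-wait step in createClient.
final class ConnectWaitSketch {
  private ConnectWaitSketch() {}

  // timeoutMs <= 0 is treated as "no deadline"; otherwise wait at most timeoutMs.
  static String awaitConnect(CompletableFuture<Void> connect, int timeoutMs) {
    try {
      if (timeoutMs <= 0) {
        connect.get(); // unbounded, but still interruptible
      } else {
        connect.get(timeoutMs, TimeUnit.MILLISECONDS);
      }
      return "connected";
    } catch (TimeoutException e) {
      return "timed out (" + timeoutMs + " ms)";
    } catch (ExecutionException e) {
      return "failed: " + e.getCause().getMessage();
    } catch (CancellationException e) {
      return "cancelled";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    CompletableFuture<Void> pending = new CompletableFuture<>();
    System.out.println(awaitConnect(pending, 50));
    System.out.println(awaitConnect(CompletableFuture.completedFuture(null), 0));
  }
}
```

Even with no deadline, a failed or cancelled connect still surfaces as an error; only the timed-out outcome disappears.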
@@ -103,15 +103,15 @@ public int connectionTimeoutMs() {
        conf.get("spark.network.timeout", "120s"));
      long defaultTimeoutMs = JavaUtils.timeStringAsSec(
        conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
-     return (int) defaultTimeoutMs;
+     return defaultTimeoutMs < 0 ? 0 : (int) defaultTimeoutMs;
    }

    /** Connect creation timeout in milliseconds. Default 30 secs. */
    public int connectionCreationTimeoutMs() {
      long connectionTimeoutS = TimeUnit.MILLISECONDS.toSeconds(connectionTimeoutMs());
      long defaultTimeoutMs = JavaUtils.timeStringAsSec(
        conf.get(SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY, connectionTimeoutS + "s")) * 1000;
-     return (int) defaultTimeoutMs;
+     return defaultTimeoutMs < 0 ? 0 : (int) defaultTimeoutMs;
    }

    /** Number of concurrent connections between two nodes for fetching data. */
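
The clamping introduced in this hunk maps any negative configured value to 0, which the `createClient` change then treats as "no timeout". A small standalone sketch of that mapping follows; `secondsFrom` is a deliberately simplified, hypothetical stand-in for the real time-string parser and only accepts plain seconds such as `-1` or `120s`.

```java
// Illustrative only: mirrors the "negative becomes 0" rule from the hunk above.
final class TimeoutClampSketch {
  private TimeoutClampSketch() {}

  // Simplified stand-in for the real parser: accepts "<n>" or "<n>s" (seconds).
  static long secondsFrom(String value) {
    String v = value.trim();
    if (v.endsWith("s")) {
      v = v.substring(0, v.length() - 1);
    }
    return Long.parseLong(v);
  }

  // Converts to milliseconds and clamps negatives to 0.
  static int timeoutMs(String value) {
    long ms = secondsFrom(value) * 1000;
    return ms < 0 ? 0 : (int) ms;
  }

  public static void main(String[] args) {
    System.out.println(timeoutMs("-1"));
    System.out.println(timeoutMs("120s"));
  }
}
```

Under this rule a typo such as `-1` is accepted silently rather than rejected, which is the behavior the review thread above questions.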
@@ -31,10 +31,6 @@
  import org.junit.Before;
  import org.junit.Test;

- import static org.junit.Assert.assertFalse;
- import static org.junit.Assert.assertNotSame;
- import static org.junit.Assert.assertTrue;
-
  import org.apache.spark.network.TestUtils;
  import org.apache.spark.network.TransportContext;
  import org.apache.spark.network.server.NoOpRpcHandler;
@@ -45,6 +41,8 @@
  import org.apache.spark.network.util.JavaUtils;
  import org.apache.spark.network.util.TransportConf;

+ import static org.junit.Assert.*;
+
  public class TransportClientFactorySuite {
    private TransportConf conf;
    private TransportContext context;
@@ -237,4 +235,31 @@ public void fastFailConnectionInTimeWindow() {
      Assert.assertThrows("fail this connection directly", IOException.class,
        () -> factory.createClient(TestUtils.getLocalHost(), unreachablePort, true));
    }
+
+   @Test
+   public void unlimitedConnectionAndCreationTimeouts() throws IOException, InterruptedException {
Member

Thank you for adding this.

+     Map<String, String> configMap = new HashMap<>();
+     configMap.put("spark.shuffle.io.connectionTimeout", "-1");
+     configMap.put("spark.shuffle.io.connectionCreationTimeout", "-1");
+     TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
+     RpcHandler rpcHandler = new NoOpRpcHandler();
+     try (TransportContext ctx = new TransportContext(conf, rpcHandler, true);
+          TransportClientFactory factory = ctx.createClientFactory()){
+       TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
+       assertTrue(c1.isActive());
+       long expiredTime = System.currentTimeMillis() + 5000;
+       while (c1.isActive() && System.currentTimeMillis() < expiredTime) {
+         Thread.sleep(10);
+       }
+       assertTrue(c1.isActive());
+       // When connectionCreationTimeout is unlimited, the connection shall be able to
+       // fail when the server is not reachable.
+       TransportServer server = ctx.createServer();
+       int unreachablePort = server.getPort();
+       JavaUtils.closeQuietly(server);
+       IOException exception = Assert.assertThrows(IOException.class,
+         () -> factory.createClient(TestUtils.getLocalHost(), unreachablePort, true));
+       assertNotEquals(exception.getCause(), null);
+     }
+   }
  }