Skip to content

Commit

Permalink
HADOOP-19317. S3A: fs.s3a.connection.expect.continue controls 100 CON…
Browse files Browse the repository at this point in the history
…TINUE behavior (#7134)


New option

  fs.s3a.connection.expect.continue

This controls whether or not a PUT request to the S3 store
sets the "Expect: 100-continue" header and awaits a 100 CONTINUE
response before uploading any data.

This allows for throttling and other problems to be detected fast.

The default is "true" - the header is sent.

Contributed by Steve Loughran
  • Loading branch information
steveloughran authored Nov 19, 2024
1 parent 317db31 commit 7543f3a
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,20 @@ private Constants() {
public static final Duration DEFAULT_CONNECTION_IDLE_TIME_DURATION =
Duration.ofSeconds(60);

/**
* Should PUT requests await a 100 CONTINUE response before uploading
* data?
* <p>
* Value: {@value}.
*/
public static final String CONNECTION_EXPECT_CONTINUE =
"fs.s3a.connection.expect.continue";

/**
* Default value for {@link #CONNECTION_EXPECT_CONTINUE}.
*/
public static final boolean CONNECTION_EXPECT_CONTINUE_DEFAULT = true;

// socket send buffer to be used in Amazon client
public static final String SOCKET_SEND_BUFFER = "fs.s3a.socket.send.buffer";
public static final int DEFAULT_SOCKET_SEND_BUFFER = 8 * 1024;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3;
import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_STS;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_KEEPALIVE;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
Expand Down Expand Up @@ -149,6 +151,7 @@ public static ApacheHttpClient.Builder createHttpClientBuilder(Configuration con
.connectionMaxIdleTime(conn.getMaxIdleTime())
.connectionTimeout(conn.getEstablishTimeout())
.connectionTimeToLive(conn.getConnectionTTL())
.expectContinueEnabled(conn.isExpectContinueEnabled())
.maxConnections(conn.getMaxConnections())
.socketTimeout(conn.getSocketTimeout())
.tcpKeepAlive(conn.isKeepAlive())
Expand Down Expand Up @@ -491,14 +494,15 @@ public String toString() {
* All the connection settings, wrapped as a class for use by
* both the sync and async client.
*/
static class ConnectionSettings {
static final class ConnectionSettings {
private final int maxConnections;
private final boolean keepAlive;
private final Duration acquisitionTimeout;
private final Duration connectionTTL;
private final Duration establishTimeout;
private final Duration maxIdleTime;
private final Duration socketTimeout;
private final boolean expectContinueEnabled;

private ConnectionSettings(
final int maxConnections,
Expand All @@ -507,14 +511,16 @@ private ConnectionSettings(
final Duration connectionTTL,
final Duration establishTimeout,
final Duration maxIdleTime,
final Duration socketTimeout) {
final Duration socketTimeout,
final boolean expectContinueEnabled) {
this.maxConnections = maxConnections;
this.keepAlive = keepAlive;
this.acquisitionTimeout = acquisitionTimeout;
this.connectionTTL = connectionTTL;
this.establishTimeout = establishTimeout;
this.maxIdleTime = maxIdleTime;
this.socketTimeout = socketTimeout;
this.expectContinueEnabled = expectContinueEnabled;
}

int getMaxConnections() {
Expand Down Expand Up @@ -545,6 +551,10 @@ Duration getSocketTimeout() {
return socketTimeout;
}

boolean isExpectContinueEnabled() {
return expectContinueEnabled;
}

@Override
public String toString() {
return "ConnectionSettings{" +
Expand All @@ -555,6 +565,7 @@ public String toString() {
", establishTimeout=" + establishTimeout +
", maxIdleTime=" + maxIdleTime +
", socketTimeout=" + socketTimeout +
", expectContinueEnabled=" + expectContinueEnabled +
'}';
}
}
Expand Down Expand Up @@ -615,14 +626,18 @@ static ConnectionSettings createConnectionSettings(Configuration conf) {
DEFAULT_SOCKET_TIMEOUT_DURATION, TimeUnit.MILLISECONDS,
minimumOperationDuration);

final boolean expectContinueEnabled = conf.getBoolean(CONNECTION_EXPECT_CONTINUE,
CONNECTION_EXPECT_CONTINUE_DEFAULT);

return new ConnectionSettings(
maxConnections,
keepAlive,
acquisitionTimeout,
connectionTTL,
establishTimeout,
maxIdleTime,
socketTimeout);
socketTimeout,
expectContinueEnabled);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,19 @@ If you are working with third party stores, please check [third party stores in

See [Timeouts](performance.html#timeouts).

### <a name="networking"></a> Low-level Network Options
### <a name="networking"></a> Low-level Network/Http Options

The S3A connector uses [Apache HttpClient](https://hc.apache.org/index.html) to connect to
S3 Stores.
The client is configured to create a pool of HTTP connections with S3, so that once
the initial set of connections have been made they can be re-used for followup operations.

Core aspects of pool settings are:
* The pool size is set by `fs.s3a.connection.maximum` - if a process asks for more connections than this then
threads will be blocked until they are available.
* The time blocked before an exception is raised is set in `fs.s3a.connection.acquisition.timeout`.
* The time an idle connection will be kept in the pool is set by `fs.s3a.connection.idle.time`.
* The time limit for even a non-idle connection to be kept open is set in `fs.s3a.connection.ttl`.

```xml

Expand All @@ -163,6 +175,69 @@ See [Timeouts](performance.html#timeouts).
</description>
</property>

<property>
<name>fs.s3a.connection.acquisition.timeout</name>
<value>60s</value>
<description>
Time to wait for an HTTP connection from the pool.
Too low: operations fail on a busy process.
When high, it isn't obvious that the connection pool is overloaded,
simply that jobs are slow.
</description>
</property>

<property>
<name>fs.s3a.connection.request.timeout</name>
<value>60s</value>
<description>
Total time for a single request to take from the HTTP verb to the
response from the server.
0 means "no limit"
</description>
</property>

<property>
<name>fs.s3a.connection.part.upload.timeout</name>
<value>15m</value>
<description>
Timeout for uploading all of a small object or a single part
of a larger one.
</description>
</property>

<property>
<name>fs.s3a.connection.ttl</name>
<value>5m</value>
<description>
Expiration time of an HTTP connection from the connection pool.
</description>
</property>

<property>
<name>fs.s3a.connection.idle.time</name>
<value>60s</value>
<description>
Time for an idle HTTP connection to be kept in the HTTP connection
pool before being closed.
Too low: overhead of creating connections.
Too high, risk of stale connections and inability to use the
adaptive load balancing of the S3 front end.
</description>
</property>

<property>
<name>fs.s3a.connection.expect.continue</name>
<value>true</value>
<description>
Should PUT requests await a 100 CONTINUE response before uploading
data?
This should normally be left alone unless a third party store which
does not support it is encountered, or file uploads over
long-distance networks time out.
(See HADOOP-19317 as an example.)
</description>
</property>

<property>
<name>fs.s3a.connection.ssl.enabled</name>
<value>true</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.s3a.S3ATestUtils;

import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;

/**
Expand All @@ -47,8 +49,8 @@ public class ITestS3AContractCreate extends AbstractContractCreateTest {
@Parameterized.Parameters
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{false},
{true}
{false, false},
{true, true}
});
}

Expand All @@ -57,8 +59,15 @@ public static Collection<Object[]> params() {
*/
private final boolean createPerformance;

public ITestS3AContractCreate(final boolean createPerformance) {
/**
* Expect a 100-continue response?
*/
private final boolean expectContinue;

public ITestS3AContractCreate(final boolean createPerformance,
final boolean expectContinue) {
this.createPerformance = createPerformance;
this.expectContinue = expectContinue;
}

@Override
Expand All @@ -71,6 +80,10 @@ protected Configuration createConfiguration() {
final Configuration conf = setPerformanceFlags(
super.createConfiguration(),
createPerformance ? "create" : "");
removeBaseAndBucketOverrides(
conf,
CONNECTION_EXPECT_CONTINUE);
conf.setBoolean(CONNECTION_EXPECT_CONTINUE, expectContinue);
S3ATestUtils.disableFilesystemCaching(conf);
return conf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.fs.s3a.Constants;

import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE;
import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
Expand Down Expand Up @@ -69,18 +70,23 @@ private boolean isMultipartCopyEnabled() {
* Create a configuration without multipart upload,
* and a long request timeout to allow for a very slow
* PUT in close.
* <p>
* 100-continue is disabled so as to verify the behavior
* on a large PUT.
* @return the configuration to create the test FS with.
*/
@Override
protected Configuration createScaleConfiguration() {
Configuration conf = super.createScaleConfiguration();
removeBaseAndBucketOverrides(conf,
CONNECTION_EXPECT_CONTINUE,
IO_CHUNK_BUFFER_SIZE,
MIN_MULTIPART_THRESHOLD,
MULTIPART_UPLOADS_ENABLED,
MULTIPART_SIZE,
PART_UPLOAD_TIMEOUT,
REQUEST_TIMEOUT);
conf.setBoolean(CONNECTION_EXPECT_CONTINUE, false);
conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
Expand Down

0 comments on commit 7543f3a

Please sign in to comment.