Skip to content

Commit 8511926

Browse files
authored
HADOOP-17166. ABFS: configure output stream thread pool (#2179)
Adds the options to control the size of the per-output-stream threadpool when writing data through the abfs connector * fs.azure.write.max.concurrent.requests * fs.azure.write.max.requests.to.queue Contributed by Bilahari T H
1 parent 773ac79 commit 8511926

File tree

8 files changed

+163
-3
lines changed

8 files changed

+163
-3
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,14 @@ public class AbfsConfiguration{
8686
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED)
8787
private String isNamespaceEnabledAccount;
8888

89+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
90+
DefaultValue = -1)
91+
private int writeMaxConcurrentRequestCount;
92+
93+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_REQUESTS_TO_QUEUE,
94+
DefaultValue = -1)
95+
private int maxWriteRequestsToQueue;
96+
8997
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE,
9098
MinValue = MIN_BUFFER_SIZE,
9199
MaxValue = MAX_BUFFER_SIZE,
@@ -822,6 +830,20 @@ public ExponentialRetryPolicy getOauthTokenFetchRetryPolicy() {
822830
oauthTokenFetchRetryDeltaBackoff);
823831
}
824832

833+
public int getWriteMaxConcurrentRequestCount() {
834+
if (this.writeMaxConcurrentRequestCount < 1) {
835+
return 4 * Runtime.getRuntime().availableProcessors();
836+
}
837+
return this.writeMaxConcurrentRequestCount;
838+
}
839+
840+
public int getMaxWriteRequestsToQueue() {
841+
if (this.maxWriteRequestsToQueue < 1) {
842+
return 2 * getWriteMaxConcurrentRequestCount();
843+
}
844+
return this.maxWriteRequestsToQueue;
845+
}
846+
825847
@VisibleForTesting
826848
void setReadBufferSize(int bufferSize) {
827849
this.readBufferSize = bufferSize;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,8 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppend
490490
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
491491
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
492492
.withAppendBlob(isAppendBlob)
493+
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
494+
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
493495
.build();
494496
}
495497

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public final class ConfigurationKeys {
5252
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = "fs.azure.oauth.token.fetch.retry.delta.backoff";
5353

5454
// Read and write buffer sizes defined by the user
55+
public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
56+
public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
5557
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
5658
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
5759
public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
7070
private byte[] buffer;
7171
private int bufferIndex;
7272
private final int maxConcurrentRequestCount;
73+
private final int maxRequestsThatCanBeQueued;
7374

7475
private ConcurrentLinkedDeque<WriteOperation> writeOperations;
7576
private final ThreadPoolExecutor threadExecutor;
@@ -119,8 +120,11 @@ public AbfsOutputStream(
119120
if (this.isAppendBlob) {
120121
this.maxConcurrentRequestCount = 1;
121122
} else {
122-
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
123+
this.maxConcurrentRequestCount = abfsOutputStreamContext
124+
.getWriteMaxConcurrentRequestCount();
123125
}
126+
this.maxRequestsThatCanBeQueued = abfsOutputStreamContext
127+
.getMaxWriteRequestsToQueue();
124128
this.threadExecutor
125129
= new ThreadPoolExecutor(maxConcurrentRequestCount,
126130
maxConcurrentRequestCount,
@@ -371,7 +375,7 @@ private synchronized void writeCurrentBufferToService() throws IOException {
371375
final long offset = position;
372376
position += bytesLength;
373377

374-
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
378+
if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
375379
long start = System.currentTimeMillis();
376380
waitForTaskToComplete();
377381
outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
@@ -543,6 +547,16 @@ public int getWriteOperationsSize() {
543547
return writeOperations.size();
544548
}
545549

550+
@VisibleForTesting
551+
int getMaxConcurrentRequestCount() {
552+
return this.maxConcurrentRequestCount;
553+
}
554+
555+
@VisibleForTesting
556+
int getMaxRequestsThatCanBeQueued() {
557+
return maxRequestsThatCanBeQueued;
558+
}
559+
546560
/**
547561
* Appending AbfsOutputStream statistics to base toString().
548562
*

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
3333

3434
private boolean isAppendBlob;
3535

36+
private int writeMaxConcurrentRequestCount;
37+
38+
private int maxWriteRequestsToQueue;
39+
3640
public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
3741
super(sasTokenRenewPeriodForStreamsInSeconds);
3842
}
@@ -71,6 +75,18 @@ public AbfsOutputStreamContext build() {
7175
return this;
7276
}
7377

78+
public AbfsOutputStreamContext withWriteMaxConcurrentRequestCount(
79+
final int writeMaxConcurrentRequestCount) {
80+
this.writeMaxConcurrentRequestCount = writeMaxConcurrentRequestCount;
81+
return this;
82+
}
83+
84+
public AbfsOutputStreamContext withMaxWriteRequestsToQueue(
85+
final int maxWriteRequestsToQueue) {
86+
this.maxWriteRequestsToQueue = maxWriteRequestsToQueue;
87+
return this;
88+
}
89+
7490
public int getWriteBufferSize() {
7591
return writeBufferSize;
7692
}
@@ -90,4 +106,12 @@ public AbfsOutputStreamStatistics getStreamStatistics() {
90106
public boolean isAppendBlob() {
91107
return isAppendBlob;
92108
}
109+
110+
public int getWriteMaxConcurrentRequestCount() {
111+
return this.writeMaxConcurrentRequestCount;
112+
}
113+
114+
public int getMaxWriteRequestsToQueue() {
115+
return this.maxWriteRequestsToQueue;
116+
}
93117
}

hadoop-tools/hadoop-azure/src/site/markdown/abfs.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,19 @@ will be -1. To disable readaheads, set this value to 0. If your workload is
796796
doing only random reads (non-sequential) or you are seeing throttling, you
797797
may try setting this value to 0.
798798

799+
To run under limited memory situations configure the following. Especially
800+
when there are too many writes from the same process.
801+
802+
`fs.azure.write.max.concurrent.requests`: To set the maximum concurrent
803+
write requests from an AbfsOutputStream instance to server at any point of
804+
time. Effectively this will be the threadpool size within the
805+
AbfsOutputStream instance. Set the value in between 1 to 8 both inclusive.
806+
807+
`fs.azure.write.max.requests.to.queue`: To set the maximum write requests
808+
that can be queued. Memory consumption of AbfsOutputStream instance can be
809+
tuned with this config considering each queued request holds a buffer. Set
810+
the value 3 or 4 times the value set for s.azure.write.max.concurrent.requests.
811+
799812
### <a name="securityconfigoptions"></a> Security Options
800813
`fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag
801814
is made true. Irrespective of the flag, AbfsClient will use HTTPS if the secure
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.services;
20+
21+
import org.assertj.core.api.Assertions;
22+
import org.junit.Test;
23+
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.fs.FSDataOutputStream;
26+
import org.apache.hadoop.fs.Path;
27+
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
28+
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
29+
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
30+
31+
/**
32+
* Test create operation.
33+
*/
34+
public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
35+
private static final Path TEST_FILE_PATH = new Path("testfile");
36+
37+
public ITestAbfsOutputStream() throws Exception {
38+
super();
39+
}
40+
41+
@Test
42+
public void testMaxRequestsAndQueueCapacityDefaults() throws Exception {
43+
Configuration conf = getRawConfiguration();
44+
final AzureBlobFileSystem fs = getFileSystem(conf);
45+
try (FSDataOutputStream out = fs.create(TEST_FILE_PATH)) {
46+
AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
47+
Assertions.assertThat(stream.getMaxConcurrentRequestCount()).describedAs(
48+
"maxConcurrentRequests should be " + getConfiguration()
49+
.getWriteMaxConcurrentRequestCount())
50+
.isEqualTo(getConfiguration().getWriteMaxConcurrentRequestCount());
51+
Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()).describedAs(
52+
"maxRequestsToQueue should be " + getConfiguration()
53+
.getMaxWriteRequestsToQueue())
54+
.isEqualTo(getConfiguration().getMaxWriteRequestsToQueue());
55+
}
56+
}
57+
58+
@Test
59+
public void testMaxRequestsAndQueueCapacity() throws Exception {
60+
Configuration conf = getRawConfiguration();
61+
int maxConcurrentRequests = 6;
62+
int maxRequestsToQueue = 10;
63+
conf.set(ConfigurationKeys.AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
64+
"" + maxConcurrentRequests);
65+
conf.set(ConfigurationKeys.AZURE_WRITE_MAX_REQUESTS_TO_QUEUE,
66+
"" + maxRequestsToQueue);
67+
final AzureBlobFileSystem fs = getFileSystem(conf);
68+
FSDataOutputStream out = fs.create(TEST_FILE_PATH);
69+
AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
70+
Assertions.assertThat(stream.getMaxConcurrentRequestCount())
71+
.describedAs("maxConcurrentRequests should be " + maxConcurrentRequests)
72+
.isEqualTo(maxConcurrentRequests);
73+
Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued())
74+
.describedAs("maxRequestsToQueue should be " + maxRequestsToQueue)
75+
.isEqualTo(maxRequestsToQueue);
76+
}
77+
78+
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.fs.azurebfs.services;
2020

21+
import java.io.IOException;
2122
import java.util.Arrays;
2223
import java.util.HashSet;
2324
import java.util.Random;
@@ -54,13 +55,17 @@ public final class TestAbfsOutputStream {
5455
private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferSize,
5556
boolean isFlushEnabled,
5657
boolean disableOutputStreamFlush,
57-
boolean isAppendBlob) {
58+
boolean isAppendBlob) throws IOException, IllegalAccessException {
59+
AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(),
60+
accountName1);
5861
return new AbfsOutputStreamContext(2)
5962
.withWriteBufferSize(writeBufferSize)
6063
.enableFlush(isFlushEnabled)
6164
.disableOutputStreamFlush(disableOutputStreamFlush)
6265
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
6366
.withAppendBlob(isAppendBlob)
67+
.withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount())
68+
.withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue())
6469
.build();
6570
}
6671

0 commit comments

Comments
 (0)