Skip to content

Commit 7f9ca10

Browse files
HADOOP-18517. ABFS: Add fs.azure.enable.readahead option to disable readahead (#5103)
* HADOOP-18517. ABFS: Add fs.azure.enable.readahead option to disable readahead. Adds a new config option to turn off readahead * also allows it to be passed in through openFile(), * extends ITestAbfsReadWriteAndSeek to use the option, including one replicated test...that shows that turning it off is slower. Important: this does not address the critical data corruption issue HADOOP-18521. ABFS ReadBufferManager buffer sharing across concurrent HTTP requests. What it does do is provide a way to completely bypass the ReadBufferManager. To mitigate the problem, either fs.azure.enable.readahead needs to be set to false, or set "fs.azure.readaheadqueue.depth" to 0 - this still goes near the (broken) ReadBufferManager code, but doesn't trigger the bug. For safe reading of files through the ABFS connector, readahead MUST be disabled or the followup fix to HADOOP-18521 applied. Contributed by Steve Loughran
1 parent 845cf8b commit 7f9ca10

File tree

8 files changed

+66
-12
lines changed

8 files changed

+66
-12
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,11 @@ public class AbfsConfiguration{
302302
DefaultValue = DEFAULT_ABFS_LATENCY_TRACK)
303303
private boolean trackLatency;
304304

305+
@BooleanConfigurationValidatorAnnotation(
306+
ConfigurationKey = FS_AZURE_ENABLE_READAHEAD,
307+
DefaultValue = DEFAULT_ENABLE_READAHEAD)
308+
private boolean enabledReadAhead;
309+
305310
@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
306311
MinValue = 0,
307312
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
@@ -915,6 +920,15 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio
915920
}
916921
}
917922

923+
public boolean isReadAheadEnabled() {
924+
return this.enabledReadAhead;
925+
}
926+
927+
@VisibleForTesting
928+
void setReadAheadEnabled(final boolean enabledReadAhead) {
929+
this.enabledReadAhead = enabledReadAhead;
930+
}
931+
918932
public int getReadAheadRange() {
919933
return this.readAheadRange;
920934
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -808,6 +808,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
808808
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
809809
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
810810
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
811+
.isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled())
811812
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
812813
.withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
813814
.withReadAheadRange(abfsConfiguration.getReadAheadRange())

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,13 @@ public final class ConfigurationKeys {
186186
public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement";
187187
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider";
188188
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
189+
190+
/**
191+
* Enable or disable readahead buffer in AbfsInputStream.
192+
* Value: {@value}.
193+
*/
194+
public static final String FS_AZURE_ENABLE_READAHEAD = "fs.azure.enable.readahead";
195+
189196
/** Setting this true will make the driver use its own RemoteIterator implementation */
190197
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
191198
/** Server side encryption key */

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public final class FileSystemConfigurations {
106106
public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
107107
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
108108

109+
public static final boolean DEFAULT_ENABLE_READAHEAD = true;
109110
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
110111
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
111112

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public AbfsInputStream(
137137
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
138138
this.eTag = eTag;
139139
this.readAheadRange = abfsInputStreamContext.getReadAheadRange();
140-
this.readAheadEnabled = true;
140+
this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled();
141141
this.alwaysReadBufferSize
142142
= abfsInputStreamContext.shouldReadBufferSizeAlways();
143143
this.bufferedPreadDisabled = abfsInputStreamContext
@@ -745,6 +745,11 @@ byte[] getBuffer() {
745745
return buffer;
746746
}
747747

748+
@VisibleForTesting
749+
public boolean isReadAheadEnabled() {
750+
return readAheadEnabled;
751+
}
752+
748753
@VisibleForTesting
749754
public int getReadAheadRange() {
750755
return readAheadRange;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
3535

3636
private boolean tolerateOobAppends;
3737

38+
private boolean isReadAheadEnabled = true;
39+
3840
private boolean alwaysReadBufferSize;
3941

4042
private int readAheadBlockSize;
@@ -72,6 +74,12 @@ public AbfsInputStreamContext withTolerateOobAppends(
7274
return this;
7375
}
7476

77+
public AbfsInputStreamContext isReadAheadEnabled(
78+
final boolean isReadAheadEnabled) {
79+
this.isReadAheadEnabled = isReadAheadEnabled;
80+
return this;
81+
}
82+
7583
public AbfsInputStreamContext withReadAheadRange(
7684
final int readAheadRange) {
7785
this.readAheadRange = readAheadRange;
@@ -141,6 +149,10 @@ public boolean isTolerateOobAppends() {
141149
return tolerateOobAppends;
142150
}
143151

152+
public boolean isReadAheadEnabled() {
153+
return isReadAheadEnabled;
154+
}
155+
144156
public int getReadAheadRange() {
145157
return readAheadRange;
146158
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,14 @@
3232
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
3333
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
3434
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
35-
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
3635
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
3736

3837
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
3938
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
4039
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
4140
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
4241
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
42+
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
4343

4444
/**
4545
* Test read, write and seek.
@@ -50,18 +50,27 @@
5050
public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
5151
private static final String TEST_PATH = "/testfile";
5252

53-
@Parameterized.Parameters(name = "Size={0}")
53+
/**
54+
* Parameterize on read buffer size and readahead.
55+
* For test performance, a full x*y test matrix is not used.
56+
* @return the test parameters
57+
*/
58+
@Parameterized.Parameters(name = "Size={0}-readahead={1}")
5459
public static Iterable<Object[]> sizes() {
55-
return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE},
56-
{DEFAULT_READ_BUFFER_SIZE},
57-
{APPENDBLOB_MAX_WRITE_BUFFER_SIZE},
58-
{MAX_BUFFER_SIZE}});
60+
return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE, true},
61+
{DEFAULT_READ_BUFFER_SIZE, false},
62+
{DEFAULT_READ_BUFFER_SIZE, true},
63+
{APPENDBLOB_MAX_WRITE_BUFFER_SIZE, false},
64+
{MAX_BUFFER_SIZE, true}});
5965
}
6066

6167
private final int size;
68+
private final boolean readaheadEnabled;
6269

63-
public ITestAbfsReadWriteAndSeek(final int size) throws Exception {
70+
public ITestAbfsReadWriteAndSeek(final int size,
71+
final boolean readaheadEnabled) throws Exception {
6472
this.size = size;
73+
this.readaheadEnabled = readaheadEnabled;
6574
}
6675

6776
@Test
@@ -74,6 +83,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
7483
final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
7584
abfsConfiguration.setWriteBufferSize(bufferSize);
7685
abfsConfiguration.setReadBufferSize(bufferSize);
86+
abfsConfiguration.setReadAheadEnabled(readaheadEnabled);
7787

7888
final byte[] b = new byte[2 * bufferSize];
7989
new Random().nextBytes(b);
@@ -85,7 +95,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
8595
} finally{
8696
stream.close();
8797
}
88-
IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
98+
logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
8999

90100
final byte[] readBuffer = new byte[2 * bufferSize];
91101
int result;
@@ -109,7 +119,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
109119
inputStream.seek(0);
110120
result = inputStream.read(readBuffer, 0, bufferSize);
111121
}
112-
IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
122+
logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
113123

114124
assertNotEquals("data read in final read()", -1, result);
115125
assertArrayEquals(readBuffer, b);
@@ -121,6 +131,7 @@ public void testReadAheadRequestID() throws java.io.IOException {
121131
final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
122132
int bufferSize = MIN_BUFFER_SIZE;
123133
abfsConfiguration.setReadBufferSize(bufferSize);
134+
abfsConfiguration.setReadAheadEnabled(readaheadEnabled);
124135

125136
final byte[] b = new byte[bufferSize * 10];
126137
new Random().nextBytes(b);
@@ -132,8 +143,10 @@ public void testReadAheadRequestID() throws java.io.IOException {
132143
((AbfsOutputStream) stream.getWrappedStream())
133144
.getStreamID()));
134145
stream.write(b);
146+
logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
135147
}
136148

149+
137150
final byte[] readBuffer = new byte[4 * bufferSize];
138151
int result;
139152
fs.registerListener(
@@ -146,6 +159,7 @@ public void testReadAheadRequestID() throws java.io.IOException {
146159
((AbfsInputStream) inputStream.getWrappedStream())
147160
.getStreamID()));
148161
result = inputStream.read(readBuffer, 0, bufferSize*4);
162+
logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, inputStream);
149163
}
150164
fs.registerListener(null);
151165
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,10 @@ public void runCorrelationTestForAllMethods() throws Exception {
130130

131131
testClasses.put(new ITestAzureBlobFileSystemListStatus(), //liststatus
132132
ITestAzureBlobFileSystemListStatus.class.getMethod("testListPath"));
133-
testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE), //open,
133+
testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE, true), //open,
134134
// read, write
135135
ITestAbfsReadWriteAndSeek.class.getMethod("testReadAheadRequestID"));
136-
testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE), //read (bypassreadahead)
136+
testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE, false), //read (bypassreadahead)
137137
ITestAbfsReadWriteAndSeek.class
138138
.getMethod("testReadAndWriteWithDifferentBufferSizesAndSeek"));
139139
testClasses.put(new ITestAzureBlobFileSystemAppend(), //append

0 commit comments

Comments
 (0)