Skip to content

Commit c67c2b7

Browse files
authored
HADOOP-18546. ABFS. disable purging list of in progress reads in abfs stream close() (#5176)
This addresses HADOOP-18521, "ABFS ReadBufferManager buffer sharing across concurrent HTTP requests", by not trying to cancel in-progress reads. It supersedes HADOOP-18528, which disables the prefetching. If that patch is applied *after* this one, prefetching will be disabled. As well as changing the default value in the code, core-default.xml is updated to set fs.azure.enable.readahead = true. As a result, if Configuration.get("fs.azure.enable.readahead") returns a non-null value, then it can be inferred that it was set either in core-default.xml (the fix is present) or in core-site.xml (someone asked for it). Contributed by Pranav Saxena.
1 parent 2e88096 commit c67c2b7

File tree

6 files changed

+78
-10
lines changed

6 files changed

+78
-10
lines changed

hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2168,9 +2168,8 @@ The switch to turn S3A auditing on or off.
21682168

21692169
<property>
21702170
<name>fs.azure.enable.readahead</name>
2171-
<value>false</value>
2172-
<description>Disable readahead/prefetching in AbfsInputStream.
2173-
See HADOOP-18521</description>
2171+
<value>true</value>
2172+
<description>Enabled readahead/prefetching in AbfsInputStream.</description>
21742173
</property>
21752174

21762175
<property>

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public final class FileSystemConfigurations {
109109
public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
110110
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
111111

112-
public static final boolean DEFAULT_ENABLE_READAHEAD = false;
112+
public static final boolean DEFAULT_ENABLE_READAHEAD = true;
113113
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
114114
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
115115

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
3535

3636
private boolean tolerateOobAppends;
3737

38-
private boolean isReadAheadEnabled = false;
38+
private boolean isReadAheadEnabled = true;
3939

4040
private boolean alwaysReadBufferSize;
4141

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,6 @@ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
544544
LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
545545
readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
546546
purgeList(stream, completedReadList);
547-
purgeList(stream, inProgressList);
548547
}
549548

550549
/**
@@ -642,4 +641,9 @@ void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
642641
freeList.clear();
643642
completedReadList.add(buf);
644643
}
644+
645+
@VisibleForTesting
646+
int getNumBuffers() {
647+
return NUM_BUFFERS;
648+
}
645649
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434

3535
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_AHEAD_RANGE;
3636
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
37-
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD;
3837
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
3938
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
4039
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
@@ -69,7 +68,6 @@ protected Configuration createConfiguration() {
6968
protected AbstractFSContract createContract(final Configuration conf) {
7069
conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE);
7170
conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
72-
conf.setBoolean(FS_AZURE_ENABLE_READAHEAD, true);
7371
return new AbfsFileSystemContract(conf, isSecure);
7472
}
7573

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ public class TestAbfsInputStream extends
8282
REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
8383
private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;
8484

85+
@Override
86+
public void teardown() throws Exception {
87+
super.teardown();
88+
ReadBufferManager.getBufferManager().testResetReadBufferManager();
89+
}
90+
8591
private AbfsRestOperation getMockRestOp() {
8692
AbfsRestOperation op = mock(AbfsRestOperation.class);
8793
AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
@@ -106,7 +112,6 @@ private AbfsClient getMockAbfsClient() {
106112
private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
107113
String fileName) throws IOException {
108114
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
109-
inputStreamContext.isReadAheadEnabled(true);
110115
// Create AbfsInputStream with the client instance
111116
AbfsInputStream inputStream = new AbfsInputStream(
112117
mockAbfsClient,
@@ -132,7 +137,6 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient,
132137
boolean alwaysReadBufferSize,
133138
int readAheadBlockSize) throws IOException {
134139
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
135-
inputStreamContext.isReadAheadEnabled(true);
136140
// Create AbfsInputStream with the client instance
137141
AbfsInputStream inputStream = new AbfsInputStream(
138142
abfsClient,
@@ -495,6 +499,69 @@ public void testSuccessfulReadAhead() throws Exception {
495499
checkEvictedStatus(inputStream, 0, true);
496500
}
497501

502+
/**
503+
* This test expects InProgressList is not purged by the inputStream close.
504+
*/
505+
@Test
506+
public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
507+
AbfsClient client = getMockAbfsClient();
508+
AbfsRestOperation successOp = getMockRestOp();
509+
final Long serverCommunicationMockLatency = 3_000L;
510+
final Long readBufferTransferToInProgressProbableTime = 1_000L;
511+
final Integer readBufferQueuedCount = 3;
512+
513+
Mockito.doAnswer(invocationOnMock -> {
514+
//sleeping thread to mock the network latency from client to backend.
515+
Thread.sleep(serverCommunicationMockLatency);
516+
return successOp;
517+
})
518+
.when(client)
519+
.read(any(String.class), any(Long.class), any(byte[].class),
520+
any(Integer.class), any(Integer.class), any(String.class),
521+
any(String.class), any(TracingContext.class));
522+
523+
final ReadBufferManager readBufferManager
524+
= ReadBufferManager.getBufferManager();
525+
526+
final int readBufferTotal = readBufferManager.getNumBuffers();
527+
final int expectedFreeListBufferCount = readBufferTotal
528+
- readBufferQueuedCount;
529+
530+
try (AbfsInputStream inputStream = getAbfsInputStream(client,
531+
"testSuccessfulReadAhead.txt")) {
532+
// As this is try-with-resources block, the close() method of the created
533+
// abfsInputStream object shall be called on the end of the block.
534+
queueReadAheads(inputStream);
535+
536+
//Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
537+
Thread.sleep(readBufferTransferToInProgressProbableTime);
538+
539+
Assertions.assertThat(readBufferManager.getInProgressCopiedList())
540+
.describedAs(String.format("InProgressList should have %d elements",
541+
readBufferQueuedCount))
542+
.hasSize(readBufferQueuedCount);
543+
Assertions.assertThat(readBufferManager.getFreeListCopy())
544+
.describedAs(String.format("FreeList should have %d elements",
545+
expectedFreeListBufferCount))
546+
.hasSize(expectedFreeListBufferCount);
547+
Assertions.assertThat(readBufferManager.getCompletedReadListCopy())
548+
.describedAs("CompletedList should have 0 elements")
549+
.hasSize(0);
550+
}
551+
552+
Assertions.assertThat(readBufferManager.getInProgressCopiedList())
553+
.describedAs(String.format("InProgressList should have %d elements",
554+
readBufferQueuedCount))
555+
.hasSize(readBufferQueuedCount);
556+
Assertions.assertThat(readBufferManager.getFreeListCopy())
557+
.describedAs(String.format("FreeList should have %d elements",
558+
expectedFreeListBufferCount))
559+
.hasSize(expectedFreeListBufferCount);
560+
Assertions.assertThat(readBufferManager.getCompletedReadListCopy())
561+
.describedAs("CompletedList should have 0 elements")
562+
.hasSize(0);
563+
}
564+
498565
/**
499566
* This test expects ReadAheadManager to throw exception if the read ahead
500567
* thread had failed within the last thresholdAgeMilliseconds.

0 commit comments

Comments
 (0)