Skip to content

Commit 419a4d7

Browse files
saxenapranav authored and steveloughran committed
HADOOP-18546. ABFS. disable purging list of in progress reads in abfs stream close() (#5176)
This addresses HADOOP-18521, "ABFS ReadBufferManager buffer sharing across concurrent HTTP requests", by not trying to cancel in-progress reads. It supersedes HADOOP-18528, which disables the prefetching. If that patch is applied *after* this one, prefetching will be disabled. As well as changing the default value in the code, core-default.xml is updated to set fs.azure.enable.readahead = true. As a result, if Configuration.get("fs.azure.enable.readahead") returns a non-null value, then it can be inferred that it was set either in core-default.xml (the fix is present) or in core-site.xml (someone asked for it). Note: this commit contains the follow-up commit "HADOOP-18546. Followup: ITestReadBufferManager fix (#5198)", which is needed to avoid race conditions in the test. Contributed by Pranav Saxena.
1 parent 79aa584 commit 419a4d7

File tree

7 files changed

+87
-25
lines changed

7 files changed

+87
-25
lines changed

hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2143,9 +2143,8 @@ The switch to turn S3A auditing on or off.
21432143

21442144
<property>
21452145
<name>fs.azure.enable.readahead</name>
2146-
<value>false</value>
2147-
<description>Disable readahead/prefetching in AbfsInputStream.
2148-
See HADOOP-18521</description>
2146+
<value>true</value>
2147+
<description>Enabled readahead/prefetching in AbfsInputStream.</description>
21492148
</property>
21502149

21512150
<property>

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public final class FileSystemConfigurations {
109109
public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
110110
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
111111

112-
public static final boolean DEFAULT_ENABLE_READAHEAD = false;
112+
public static final boolean DEFAULT_ENABLE_READAHEAD = true;
113113
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
114114
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
115115

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
3535

3636
private boolean tolerateOobAppends;
3737

38-
private boolean isReadAheadEnabled = false;
38+
private boolean isReadAheadEnabled = true;
3939

4040
private boolean alwaysReadBufferSize;
4141

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,6 @@ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
544544
LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
545545
readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
546546
purgeList(stream, completedReadList);
547-
purgeList(stream, inProgressList);
548547
}
549548

550549
/**
@@ -642,4 +641,9 @@ void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
642641
freeList.clear();
643642
completedReadList.add(buf);
644643
}
644+
645+
@VisibleForTesting
646+
int getNumBuffers() {
647+
return NUM_BUFFERS;
648+
}
645649
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434

3535
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_AHEAD_RANGE;
3636
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
37-
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD;
3837
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
3938
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
4039
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
@@ -69,7 +68,6 @@ protected Configuration createConfiguration() {
6968
protected AbstractFSContract createContract(final Configuration conf) {
7069
conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE);
7170
conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
72-
conf.setBoolean(FS_AZURE_ENABLE_READAHEAD, true);
7371
return new AbfsFileSystemContract(conf, isSecure);
7472
}
7573

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.Callable;
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.Executors;
28+
import java.util.concurrent.TimeUnit;
2829

2930
import org.apache.hadoop.conf.Configuration;
3031
import org.apache.hadoop.fs.FSDataInputStream;
@@ -74,17 +75,14 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
7475
}
7576
} finally {
7677
executorService.shutdown();
78+
// wait for all tasks to finish
79+
executorService.awaitTermination(1, TimeUnit.MINUTES);
7780
}
7881

7982
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
80-
assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
83+
// verify there is no work in progress or the readahead queue.
8184
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
8285
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
83-
Assertions.assertThat(bufferManager.getFreeListCopy())
84-
.describedAs("After closing all streams free list contents should match with " + freeList)
85-
.hasSize(numBuffers)
86-
.containsExactlyInAnyOrderElementsOf(freeList);
87-
8886
}
8987

9088
private void assertListEmpty(String listName, List<ReadBuffer> list) {
@@ -116,22 +114,18 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception {
116114
try {
117115
iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
118116
iStream2.read();
119-
// After closing stream1, none of the buffers associated with stream1 should be present.
120-
assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1);
121-
assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1);
117+
// After closing stream1, no queued buffers of stream1 should be present
118+
// assertions can't be made about the state of the other lists as it is
119+
// too prone to race conditions.
122120
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
123121
} finally {
124122
// closing the stream later.
125123
IOUtils.closeStream(iStream2);
126124
}
127-
// After closing stream2, none of the buffers associated with stream2 should be present.
128-
assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2);
129-
assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2);
125+
// After closing stream2, no queued buffers of stream2 should be present.
130126
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);
131127

132-
// After closing both the streams, all lists should be empty.
133-
assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
134-
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
128+
// After closing both the streams, read queue should be empty.
135129
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
136130

137131
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ public class TestAbfsInputStream extends
8282
REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
8383
private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;
8484

85+
@Override
86+
public void teardown() throws Exception {
87+
super.teardown();
88+
ReadBufferManager.getBufferManager().testResetReadBufferManager();
89+
}
90+
8591
private AbfsRestOperation getMockRestOp() {
8692
AbfsRestOperation op = mock(AbfsRestOperation.class);
8793
AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
@@ -106,7 +112,6 @@ private AbfsClient getMockAbfsClient() {
106112
private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
107113
String fileName) throws IOException {
108114
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
109-
inputStreamContext.isReadAheadEnabled(true);
110115
// Create AbfsInputStream with the client instance
111116
AbfsInputStream inputStream = new AbfsInputStream(
112117
mockAbfsClient,
@@ -132,7 +137,6 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient,
132137
boolean alwaysReadBufferSize,
133138
int readAheadBlockSize) throws IOException {
134139
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
135-
inputStreamContext.isReadAheadEnabled(true);
136140
// Create AbfsInputStream with the client instance
137141
AbfsInputStream inputStream = new AbfsInputStream(
138142
abfsClient,
@@ -495,6 +499,69 @@ public void testSuccessfulReadAhead() throws Exception {
495499
checkEvictedStatus(inputStream, 0, true);
496500
}
497501

502+
/**
503+
* This test expects InProgressList is not purged by the inputStream close.
504+
*/
505+
@Test
506+
public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
507+
AbfsClient client = getMockAbfsClient();
508+
AbfsRestOperation successOp = getMockRestOp();
509+
final Long serverCommunicationMockLatency = 3_000L;
510+
final Long readBufferTransferToInProgressProbableTime = 1_000L;
511+
final Integer readBufferQueuedCount = 3;
512+
513+
Mockito.doAnswer(invocationOnMock -> {
514+
//sleeping thread to mock the network latency from client to backend.
515+
Thread.sleep(serverCommunicationMockLatency);
516+
return successOp;
517+
})
518+
.when(client)
519+
.read(any(String.class), any(Long.class), any(byte[].class),
520+
any(Integer.class), any(Integer.class), any(String.class),
521+
any(String.class), any(TracingContext.class));
522+
523+
final ReadBufferManager readBufferManager
524+
= ReadBufferManager.getBufferManager();
525+
526+
final int readBufferTotal = readBufferManager.getNumBuffers();
527+
final int expectedFreeListBufferCount = readBufferTotal
528+
- readBufferQueuedCount;
529+
530+
try (AbfsInputStream inputStream = getAbfsInputStream(client,
531+
"testSuccessfulReadAhead.txt")) {
532+
// As this is try-with-resources block, the close() method of the created
533+
// abfsInputStream object shall be called on the end of the block.
534+
queueReadAheads(inputStream);
535+
536+
//Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
537+
Thread.sleep(readBufferTransferToInProgressProbableTime);
538+
539+
Assertions.assertThat(readBufferManager.getInProgressCopiedList())
540+
.describedAs(String.format("InProgressList should have %d elements",
541+
readBufferQueuedCount))
542+
.hasSize(readBufferQueuedCount);
543+
Assertions.assertThat(readBufferManager.getFreeListCopy())
544+
.describedAs(String.format("FreeList should have %d elements",
545+
expectedFreeListBufferCount))
546+
.hasSize(expectedFreeListBufferCount);
547+
Assertions.assertThat(readBufferManager.getCompletedReadListCopy())
548+
.describedAs("CompletedList should have 0 elements")
549+
.hasSize(0);
550+
}
551+
552+
Assertions.assertThat(readBufferManager.getInProgressCopiedList())
553+
.describedAs(String.format("InProgressList should have %d elements",
554+
readBufferQueuedCount))
555+
.hasSize(readBufferQueuedCount);
556+
Assertions.assertThat(readBufferManager.getFreeListCopy())
557+
.describedAs(String.format("FreeList should have %d elements",
558+
expectedFreeListBufferCount))
559+
.hasSize(expectedFreeListBufferCount);
560+
Assertions.assertThat(readBufferManager.getCompletedReadListCopy())
561+
.describedAs("CompletedList should have 0 elements")
562+
.hasSize(0);
563+
}
564+
498565
/**
499566
* This test expects ReadAheadManager to throw exception if the read ahead
500567
* thread had failed within the last thresholdAgeMilliseconds.

0 commit comments

Comments
 (0)