@@ -65,6 +65,7 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
6565 ExecutorService executorService = Executors .newFixedThreadPool (4 );
6666 AzureBlobFileSystem fs = getABFSWithReadAheadConfig ();
6767 final Set <ReadBuffer > inProgressBuffers = new HashSet <>();
68+ final Set <AbfsInputStream > streamsInTest = new HashSet <>();
6869 final ReadBufferManager bufferManager = ReadBufferManager .getBufferManager ();
6970 final Boolean [] executionCompletion = new Boolean [4 ];
7071 Arrays .fill (executionCompletion , false );
@@ -76,10 +77,9 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
7677 byte [] fileContent = getRandomBytesArray (ONE_MB );
7778 Path testFilePath = createFileWithContent (fs , fileName , fileContent );
7879 try (FSDataInputStream iStream = fs .open (testFilePath )) {
80+ streamsInTest .add ((AbfsInputStream ) iStream .getWrappedStream ());
7981 iStream .read ();
80- for (ReadBuffer buffer : bufferManager .getInProgressCopiedList ()) {
81- inProgressBuffers .add (buffer );
82- }
82+ inProgressBuffers .addAll (bufferManager .getInProgressCopiedList ());
8383 }
8484 executionCompletion [iteration ] = true ;
8585 return null ;
@@ -93,13 +93,18 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
9393 Thread .sleep (checkExecutionWaitTime );
9494 }
9595
96- assertCompletedListContainsSubSetOfCertainSet (bufferManager .getCompletedReadListCopy (), inProgressBuffers );
97- assertListEmpty ("ReadAheadQueue" , bufferManager .getReadAheadQueueCopy ());
96+ assertCompletedListContainsSubSetOfCertainSet (bufferManager .getCompletedReadListCopy (), inProgressBuffers , streamsInTest );
97+ for (AbfsInputStream stream : streamsInTest ) {
98+ assertListDoesnotContainBuffersForIstream (bufferManager .getReadAheadQueueCopy (), stream );
99+ }
98100 }
99101
100102 private void assertCompletedListContainsSubSetOfCertainSet (final List <ReadBuffer > completedList ,
101- Set <ReadBuffer > bufferSet ) {
103+ Set <ReadBuffer > bufferSet , final Set < AbfsInputStream > streamsInTest ) {
102104 for (ReadBuffer buffer : completedList ) {
105+ if (!streamsInTest .contains (buffer .getStream ())) {
106+ return ;
107+ }
103108 Assertions .assertThat (bufferSet )
104109 .describedAs (
105110 "CompletedList contains a buffer which is not part of bufferSet." )
@@ -132,11 +137,13 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception {
132137 Path testFilePath = createFileWithContent (fs , fileName , fileContent );
133138 Set <ReadBuffer > inProgressBufferSet = new HashSet <>();
134139 ReadBufferManager bufferManager = ReadBufferManager .getBufferManager ();
140+ final Set <AbfsInputStream > streamsInTest = new HashSet <>();
135141
136142 AbfsInputStream iStream1 = null ;
137143 // stream1 will be closed right away.
138144 try {
139145 iStream1 = (AbfsInputStream ) fs .open (testFilePath ).getWrappedStream ();
146+ streamsInTest .add (iStream1 );
140147 // Just reading one byte will trigger all read ahead calls.
141148 iStream1 .read ();
142149 } finally {
@@ -146,6 +153,7 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception {
146153 AbfsInputStream iStream2 = null ;
147154 try {
148155 iStream2 = (AbfsInputStream ) fs .open (testFilePath ).getWrappedStream ();
156+ streamsInTest .add (iStream2 );
149157 iStream2 .read ();
150158 // After closing stream1, none of the buffers associated with stream1 should be present.
151159 assertListDoesnotContainBuffersForIstream (bufferManager .getReadAheadQueueCopy (), iStream1 );
@@ -157,10 +165,8 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception {
157165 // After closing stream2, none of the buffers associated with stream2 should be present.
158166 assertListDoesnotContainBuffersForIstream (bufferManager .getReadAheadQueueCopy (), iStream2 );
159167
160- // After closing both the streams, all lists should be empty.
161- assertListEmpty ("ReadAheadQueue" , bufferManager .getReadAheadQueueCopy ());
162-
163- assertCompletedListContainsSubSetOfCertainSet (bufferManager .getCompletedReadListCopy (), inProgressBufferSet );
168+ assertCompletedListContainsSubSetOfCertainSet (bufferManager .getCompletedReadListCopy (), inProgressBufferSet ,
169+ streamsInTest );
164170
165171 }
166172
0 commit comments