@@ -56,6 +56,7 @@ public ITestReadBufferManager() throws Exception {
5656 public void testPurgeBufferManagerForParallelStreams () throws Exception {
5757 describe ("Testing purging of buffers from ReadBufferManager for "
5858 + "parallel input streams" );
59+ final long checkExecutionWaitTime = 1_000L ;
5960 final int numBuffers = 16 ;
6061 final LinkedList <Integer > freeList = new LinkedList <>();
6162 for (int i =0 ; i < numBuffers ; i ++) {
@@ -81,7 +82,6 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
8182 }
8283 }
8384 executionCompletion [iteration ] = true ;
84-
8585 return null ;
8686 });
8787 }
@@ -90,7 +90,7 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
9090 }
9191
9292 while (!checkIfAllExecutionCompleted (executionCompletion )) {
93- Thread .sleep (1_000L );
93+ Thread .sleep (checkExecutionWaitTime );
9494 }
9595
9696 assertCompletedListContainsSubSetOfCertainSet (bufferManager .getCompletedReadListCopy (), inProgressBuffers );
@@ -130,6 +130,8 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception {
130130 final String fileName = methodName .getMethodName ();
131131 byte [] fileContent = getRandomBytesArray (ONE_MB );
132132 Path testFilePath = createFileWithContent (fs , fileName , fileContent );
133+ Set <ReadBuffer > inProgressBufferSet = new HashSet <>();
134+ ReadBufferManager bufferManager = ReadBufferManager .getBufferManager ();
133135
134136 AbfsInputStream iStream1 = null ;
135137 // stream1 will be closed right away.
@@ -139,30 +141,27 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception {
139141 iStream1 .read ();
140142 } finally {
141143 IOUtils .closeStream (iStream1 );
144+ inProgressBufferSet .addAll (bufferManager .getInProgressCopiedList ());
142145 }
143- ReadBufferManager bufferManager = ReadBufferManager .getBufferManager ();
144146 AbfsInputStream iStream2 = null ;
145147 try {
146148 iStream2 = (AbfsInputStream ) fs .open (testFilePath ).getWrappedStream ();
147149 iStream2 .read ();
148150 // After closing stream1, none of the buffers associated with stream1 should be present.
149- assertListDoesnotContainBuffersForIstream (bufferManager .getInProgressCopiedList (), iStream1 );
150- assertListDoesnotContainBuffersForIstream (bufferManager .getCompletedReadListCopy (), iStream1 );
151151 assertListDoesnotContainBuffersForIstream (bufferManager .getReadAheadQueueCopy (), iStream1 );
152152 } finally {
153153 // closing the stream later.
154154 IOUtils .closeStream (iStream2 );
155+ inProgressBufferSet .addAll (bufferManager .getInProgressCopiedList ());
155156 }
156157 // After closing stream2, none of the buffers associated with stream2 should be present.
157- assertListDoesnotContainBuffersForIstream (bufferManager .getInProgressCopiedList (), iStream2 );
158- assertListDoesnotContainBuffersForIstream (bufferManager .getCompletedReadListCopy (), iStream2 );
159158 assertListDoesnotContainBuffersForIstream (bufferManager .getReadAheadQueueCopy (), iStream2 );
160159
161160 // After closing both the streams, all lists should be empty.
162- assertListEmpty ("CompletedList" , bufferManager .getCompletedReadListCopy ());
163- assertListEmpty ("InProgressList" , bufferManager .getInProgressCopiedList ());
164161 assertListEmpty ("ReadAheadQueue" , bufferManager .getReadAheadQueueCopy ());
165162
163+ assertCompletedListContainsSubSetOfCertainSet (bufferManager .getCompletedReadListCopy (), inProgressBufferSet );
164+
166165 }
167166
168167
0 commit comments