2727import java .util .concurrent .ArrayBlockingQueue ;
2828import java .util .concurrent .CompletableFuture ;
2929import java .util .concurrent .TimeUnit ;
30- import javax .activation .UnsupportedDataTypeException ;
3130
3231import org .slf4j .Logger ;
3332import org .slf4j .LoggerFactory ;
@@ -48,7 +47,7 @@ public class AbfsListStatusRemoteIterator
4847
4948 private final FileStatus fileStatus ;
5049 private final ListingSupport listingSupport ;
51- private final ArrayBlockingQueue <Object > iteratorsQueue ;
50+ private final ArrayBlockingQueue <AbfsListResult > listResultQueue ;
5251 private final TracingContext tracingContext ;
5352
5453 private volatile boolean isAsyncInProgress = false ;
@@ -61,7 +60,7 @@ public AbfsListStatusRemoteIterator(final FileStatus fileStatus,
6160 this .fileStatus = fileStatus ;
6261 this .listingSupport = listingSupport ;
6362 this .tracingContext = tracingContext ;
64- iteratorsQueue = new ArrayBlockingQueue <>(MAX_QUEUE_SIZE );
63+ listResultQueue = new ArrayBlockingQueue <>(MAX_QUEUE_SIZE );
6564 currIterator = Collections .emptyIterator ();
6665 fetchBatchesAsync ();
6766 }
@@ -86,19 +85,17 @@ public FileStatus next() throws IOException {
8685 private Iterator <FileStatus > getNextIterator () throws IOException {
8786 fetchBatchesAsync ();
8887 try {
89- Object obj = null ;
90- while (obj == null
91- && (!isIterationComplete || !iteratorsQueue .isEmpty ())) {
92- obj = iteratorsQueue .poll (POLL_WAIT_TIME_IN_MS , TimeUnit .MILLISECONDS );
88+ AbfsListResult listResult = null ;
89+ while (listResult == null
90+ && (!isIterationComplete || !listResultQueue .isEmpty ())) {
91+ listResult = listResultQueue .poll (POLL_WAIT_TIME_IN_MS , TimeUnit .MILLISECONDS );
9392 }
94- if (obj == null ) {
93+ if (listResult == null ) {
9594 return Collections .emptyIterator ();
96- } else if (obj instanceof Iterator ) {
97- return (Iterator <FileStatus >) obj ;
98- } else if (obj instanceof IOException ) {
99- throw (IOException ) obj ;
95+ } else if (listResult .isFailedListing ()) {
96+ throw listResult .getListingException ();
10097 } else {
101- throw new UnsupportedDataTypeException ();
98+ return listResult . getFileStatusIterator ();
10299 }
103100 } catch (InterruptedException e ) {
104101 Thread .currentThread ().interrupt ();
@@ -122,13 +119,13 @@ private void fetchBatchesAsync() {
122119
123120 private void asyncOp () {
124121 try {
125- while (!isIterationComplete && iteratorsQueue .size () <= MAX_QUEUE_SIZE ) {
122+ while (!isIterationComplete && listResultQueue .size () <= MAX_QUEUE_SIZE ) {
126123 addNextBatchIteratorToQueue ();
127124 }
128125 } catch (IOException ioe ) {
129126 LOG .error ("Fetching filestatuses failed" , ioe );
130127 try {
131- iteratorsQueue .put (ioe );
128+ listResultQueue .put (new AbfsListResult ( ioe ) );
132129 } catch (InterruptedException interruptedException ) {
133130 Thread .currentThread ().interrupt ();
134131 LOG .error ("Thread got interrupted: {}" , interruptedException );
@@ -143,19 +140,17 @@ private void asyncOp() {
143140 }
144141 }
145142
146- private void addNextBatchIteratorToQueue ()
143+ private synchronized void addNextBatchIteratorToQueue ()
147144 throws IOException , InterruptedException {
148145 List <FileStatus > fileStatuses = new ArrayList <>();
149146 continuation = listingSupport
150147 .listStatus (fileStatus .getPath (), null , fileStatuses , FETCH_ALL_FALSE ,
151148 continuation , tracingContext );
152149 if (!fileStatuses .isEmpty ()) {
153- iteratorsQueue .put (fileStatuses .iterator ());
150+ listResultQueue .put (new AbfsListResult ( fileStatuses .iterator () ));
154151 }
155- synchronized (this ) {
156- if (continuation == null || continuation .isEmpty ()) {
157- isIterationComplete = true ;
158- }
152+ if (continuation == null || continuation .isEmpty ()) {
153+ isIterationComplete = true ;
159154 }
160155 }
161156
0 commit comments