diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedByItemIterable.java b/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedByItemIterable.java index 5f27a86fd93e0..049e937698663 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedByItemIterable.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedByItemIterable.java @@ -8,6 +8,7 @@ import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Predicate; /** * Internal class that is a blocking iterable for {@link ContinuablePagedIterable}. @@ -24,17 +25,21 @@ final class ContinuablePagedByItemIterable> implements Iterable { private final PageRetriever pageRetriever; private final C continuationToken; + private final Predicate continuationPredicate; private final Integer preferredPageSize; - ContinuablePagedByItemIterable(PageRetriever pageRetriever, C continuationToken, Integer preferredPageSize) { + ContinuablePagedByItemIterable(PageRetriever pageRetriever, C continuationToken, + Predicate continuationPredicate, Integer preferredPageSize) { this.pageRetriever = pageRetriever; this.continuationToken = continuationToken; + this.continuationPredicate = continuationPredicate; this.preferredPageSize = preferredPageSize; } @Override public Iterator iterator() { - return new ContinuablePagedByItemIterator<>(pageRetriever, continuationToken, preferredPageSize); + return new ContinuablePagedByItemIterator<>(pageRetriever, continuationToken, continuationPredicate, + preferredPageSize); } private static final class ContinuablePagedByItemIterator> @@ -43,8 +48,8 @@ private static final class ContinuablePagedByItemIterator currentPage; ContinuablePagedByItemIterator(PageRetriever pageRetriever, C continuationToken, - Integer preferredPageSize) { - super(pageRetriever, new ContinuationState<>(continuationToken), preferredPageSize, + Predicate continuationPredicate, Integer preferredPageSize) { + super(pageRetriever, new ContinuationState<>(continuationToken, continuationPredicate), preferredPageSize, new ClientLogger(ContinuablePagedByItemIterator.class)); requestPage(); diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedByPageIterable.java b/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedByPageIterable.java index 78ace4e5b6593..b1ab8321f3f59 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedByPageIterable.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedByPageIterable.java @@ -8,6 +8,7 @@ import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Predicate; /** * Internal class that is a blocking iterable for {@link ContinuablePagedIterable}. @@ -24,17 +25,21 @@ final class ContinuablePagedByPageIterable> implements Iterable

{ private final PageRetriever pageRetriever; private final C continuationToken; + private final Predicate continuationPredicate; private final Integer preferredPageSize; - ContinuablePagedByPageIterable(PageRetriever pageRetriever, C continuationToken, Integer preferredPageSize) { + ContinuablePagedByPageIterable(PageRetriever pageRetriever, C continuationToken, + Predicate continuationPredicate, Integer preferredPageSize) { this.pageRetriever = pageRetriever; this.continuationToken = continuationToken; + this.continuationPredicate = continuationPredicate; this.preferredPageSize = preferredPageSize; } @Override public Iterator

iterator() { - return new ContinuablePagedByPageIterator<>(pageRetriever, continuationToken, preferredPageSize); + return new ContinuablePagedByPageIterator<>(pageRetriever, continuationToken, continuationPredicate, + preferredPageSize); } private static final class ContinuablePagedByPageIterator> @@ -42,8 +47,8 @@ private static final class ContinuablePagedByPageIterator pages = new ConcurrentLinkedQueue<>(); ContinuablePagedByPageIterator(PageRetriever pageRetriever, C continuationToken, - Integer preferredPageSize) { - super(pageRetriever, new ContinuationState<>(continuationToken), preferredPageSize, + Predicate continuationPredicate, Integer preferredPageSize) { + super(pageRetriever, new ContinuationState<>(continuationToken, continuationPredicate), preferredPageSize, new ClientLogger(ContinuablePagedByPageIterator.class)); requestPage(); diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedFluxCore.java b/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedFluxCore.java index e18d0dcaf8ab2..f7b69b94712a4 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedFluxCore.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedFluxCore.java @@ -255,7 +255,7 @@ public void subscribe(CoreSubscriber coreSubscriber) { private Flux

byPage(Supplier> provider, C continuationToken, Integer pageSize) { return Flux.defer(() -> { final PageRetriever pageRetriever = provider.get(); - final ContinuationState state = new ContinuationState<>(continuationToken); + final ContinuationState state = new ContinuationState<>(continuationToken, getContinuationPredicate()); return retrievePages(state, pageRetriever, pageSize); }); } @@ -278,7 +278,7 @@ private Flux

retrievePages(ContinuationState state, PageRetriever pa */ return retrievePage(state, pageRetriever, pageSize) .expand(page -> { - state.setLastContinuationToken(page.getContinuationToken(), t -> !getContinuationPredicate().test(t)); + state.setLastContinuationToken(page.getContinuationToken()); return Flux.defer(() -> retrievePage(state, pageRetriever, pageSize)); }, 4); } diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedIterable.java b/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedIterable.java index 76024d9083931..08a18e3151d60 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedIterable.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedIterable.java @@ -20,7 +20,7 @@ * @see IterableStream * @see ContinuablePagedFlux */ -public abstract class ContinuablePagedIterable> extends IterableStream { +public class ContinuablePagedIterable> extends IterableStream { private final ContinuablePagedFlux pagedFlux; private final int batchSize; @@ -172,7 +172,7 @@ private Iterable

iterableByPageInternal(C continuationToken, Integer preferre if (pagedFlux instanceof ContinuablePagedFluxCore) { ContinuablePagedFluxCore pagedFluxCore = (ContinuablePagedFluxCore) pagedFlux; return new ContinuablePagedByPageIterable<>(pagedFluxCore.pageRetrieverProvider.get(), continuationToken, - preferredPageSize); + pagedFluxCore.getContinuationPredicate(), preferredPageSize); } else { return nonPagedFluxCoreIterableSupplier.get(); } @@ -181,7 +181,8 @@ private Iterable

iterableByPageInternal(C continuationToken, Integer preferre private Iterable iterableByItemInternal() { if (pagedFlux instanceof ContinuablePagedFluxCore) { ContinuablePagedFluxCore pagedFluxCore = (ContinuablePagedFluxCore) pagedFlux; - return new ContinuablePagedByItemIterable<>(pagedFluxCore.pageRetrieverProvider.get(), null, null); + return new ContinuablePagedByItemIterable<>(pagedFluxCore.pageRetrieverProvider.get(), null, + pagedFluxCore.getContinuationPredicate(), null); } else { return this.pagedFlux.toIterable(this.batchSize); } diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuationState.java b/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuationState.java index 25f0777d19693..45fcbe6f66226 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuationState.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuationState.java @@ -11,6 +11,8 @@ * @param The type of the continuation token. */ class ContinuationState { + private final Predicate continuationPredicate; + // The last seen continuation token private C lastContinuationToken; // Indicate whether to call the PageRetrieval Function @@ -20,31 +22,22 @@ class ContinuationState { * Creates ContinuationState. * * @param token An optional continuation token for the beginning state. + * @param continuationPredicate The predicate that tests if paging should continue. */ - ContinuationState(C token) { + ContinuationState(C token, Predicate continuationPredicate) { this.lastContinuationToken = token; + this.continuationPredicate = continuationPredicate; } /** * Store the last seen continuation token. *

- * Determination for continuation being done is checking if the continuation token is null. + * Determining if paging should continue is done by checking the token against the continuation predicate. * * @param token The continuation token. */ void setLastContinuationToken(C token) { - this.isDone = (token == null); - this.lastContinuationToken = token; - } - - /** - * Store the last seen continuation token and apply the predicate to determine if continuation is done. - * - * @param token The continuation token. - * @param isDonePredicate The predicate that tests if continuation is done. - */ - void setLastContinuationToken(C token, Predicate isDonePredicate) { - this.isDone = isDonePredicate.test(token); + this.isDone = !continuationPredicate.test(token); this.lastContinuationToken = token; } diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/http/rest/PagedFluxTest.java b/sdk/core/azure-core/src/test/java/com/azure/core/http/rest/PagedFluxTest.java index 887a30543a3fc..0367729d0dba1 100644 --- a/sdk/core/azure-core/src/test/java/com/azure/core/http/rest/PagedFluxTest.java +++ b/sdk/core/azure-core/src/test/java/com/azure/core/http/rest/PagedFluxTest.java @@ -379,7 +379,7 @@ public > void pagingTerminatesOn(Continuab } @SuppressWarnings("deprecation") - private static Stream pagingTerminatesOnSupplier() { + public static Stream pagingTerminatesOnSupplier() { PageRetriever> pfEndsWithNullPageRetriever = new GetPagesUntil(null); PagedFlux pfEndsWithNull = PagedFlux.create(() -> pfEndsWithNullPageRetriever); diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/http/rest/PagedIterableTest.java b/sdk/core/azure-core/src/test/java/com/azure/core/http/rest/PagedIterableTest.java index c6edd76b0de71..38d77f7881cdf 100644 --- a/sdk/core/azure-core/src/test/java/com/azure/core/http/rest/PagedIterableTest.java +++ b/sdk/core/azure-core/src/test/java/com/azure/core/http/rest/PagedIterableTest.java @@ -6,8 +6,12 @@ import com.azure.core.http.HttpHeaders; import com.azure.core.http.HttpMethod; import com.azure.core.http.HttpRequest; +import com.azure.core.util.paging.ContinuablePage; +import com.azure.core.util.paging.ContinuablePagedFlux; +import com.azure.core.util.paging.ContinuablePagedIterable; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -417,6 +421,55 @@ public void iterateByPageNextOnlyRetrievesOnePage() { assertEquals(1, pageRetriever.getGetCount() - DEFAULT_PAGE_COUNT); } + @ParameterizedTest + @MethodSource("com.azure.core.http.rest.PagedFluxTest#pagingTerminatesOnSupplier") + public > void streamingTerminatesOn(ContinuablePagedFlux pagedFlux, + List expectedItems) { + List actualItems = new ContinuablePagedIterable<>(pagedFlux).stream().collect(Collectors.toList()); + assertEquals(expectedItems.size(), actualItems.size()); + for (int i = 0; i < expectedItems.size(); i++) { + assertEquals(expectedItems.get(i), actualItems.get(i)); + } + } + + @ParameterizedTest + @MethodSource("com.azure.core.http.rest.PagedFluxTest#pagingTerminatesOnSupplier") + public > void iteratingTerminatesOn(ContinuablePagedFlux pagedFlux, + List expectedItems) { + List actualItems = new ArrayList<>(); + new ContinuablePagedIterable<>(pagedFlux).iterator().forEachRemaining(actualItems::add); + assertEquals(expectedItems.size(), actualItems.size()); + for (int i = 0; i < expectedItems.size(); i++) { + assertEquals(expectedItems.get(i), actualItems.get(i)); + } + } + + @ParameterizedTest + @MethodSource("com.azure.core.http.rest.PagedFluxTest#pagingTerminatesOnSupplier") + public > void streamingByPageTerminatesOn( + ContinuablePagedFlux pagedFlux, List expectedItems) { + List actualItems = new ArrayList<>(); + new ContinuablePagedIterable<>(pagedFlux).streamByPage().map(page -> page.getElements()) + .forEach(iterableStream -> iterableStream.forEach(actualItems::add)); + assertEquals(expectedItems.size(), actualItems.size()); + for (int i = 0; i < expectedItems.size(); i++) { + assertEquals(expectedItems.get(i), actualItems.get(i)); + } + } + + @ParameterizedTest + @MethodSource("com.azure.core.http.rest.PagedFluxTest#pagingTerminatesOnSupplier") + public > void iteratingByPageTerminatesOn( + ContinuablePagedFlux pagedFlux, List expectedItems) { + List actualItems = new ArrayList<>(); + new ContinuablePagedIterable<>(pagedFlux).iterableByPage().iterator() + .forEachRemaining(page -> page.getElements().forEach(actualItems::add)); + assertEquals(expectedItems.size(), actualItems.size()); + for (int i = 0; i < expectedItems.size(); i++) { + assertEquals(expectedItems.get(i), actualItems.get(i)); + } + } + private static void sleep() { try { Thread.sleep(500);