Skip to content

Commit

Permalink
Share Paging Termination Between PagedFlux and PagedIterable (#26139)
Browse files Browse the repository at this point in the history
Update PagedIterable to Use Continuation Predicate in PagedFlux
  • Loading branch information
alzimmermsft authored Dec 21, 2021
1 parent 176a985 commit 6b0c426
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;

/**
* Internal class that is a blocking iterable for {@link ContinuablePagedIterable}.
Expand All @@ -24,17 +25,21 @@
final class ContinuablePagedByItemIterable<C, T, P extends ContinuablePage<C, T>> implements Iterable<T> {
private final PageRetriever<C, P> pageRetriever;
private final C continuationToken;
private final Predicate<C> continuationPredicate;
private final Integer preferredPageSize;

ContinuablePagedByItemIterable(PageRetriever<C, P> pageRetriever, C continuationToken, Integer preferredPageSize) {
ContinuablePagedByItemIterable(PageRetriever<C, P> pageRetriever, C continuationToken,
Predicate<C> continuationPredicate, Integer preferredPageSize) {
this.pageRetriever = pageRetriever;
this.continuationToken = continuationToken;
this.continuationPredicate = continuationPredicate;
this.preferredPageSize = preferredPageSize;
}

@Override
public Iterator<T> iterator() {
return new ContinuablePagedByItemIterator<>(pageRetriever, continuationToken, preferredPageSize);
return new ContinuablePagedByItemIterator<>(pageRetriever, continuationToken, continuationPredicate,
preferredPageSize);
}

private static final class ContinuablePagedByItemIterator<C, T, P extends ContinuablePage<C, T>>
Expand All @@ -43,8 +48,8 @@ private static final class ContinuablePagedByItemIterator<C, T, P extends Contin
private volatile Iterator<T> currentPage;

ContinuablePagedByItemIterator(PageRetriever<C, P> pageRetriever, C continuationToken,
Integer preferredPageSize) {
super(pageRetriever, new ContinuationState<>(continuationToken), preferredPageSize,
Predicate<C> continuationPredicate, Integer preferredPageSize) {
super(pageRetriever, new ContinuationState<>(continuationToken, continuationPredicate), preferredPageSize,
new ClientLogger(ContinuablePagedByItemIterator.class));

requestPage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;

/**
* Internal class that is a blocking iterable for {@link ContinuablePagedIterable}.
Expand All @@ -24,26 +25,30 @@
final class ContinuablePagedByPageIterable<C, T, P extends ContinuablePage<C, T>> implements Iterable<P> {
private final PageRetriever<C, P> pageRetriever;
private final C continuationToken;
private final Predicate<C> continuationPredicate;
private final Integer preferredPageSize;

ContinuablePagedByPageIterable(PageRetriever<C, P> pageRetriever, C continuationToken, Integer preferredPageSize) {
ContinuablePagedByPageIterable(PageRetriever<C, P> pageRetriever, C continuationToken,
Predicate<C> continuationPredicate, Integer preferredPageSize) {
this.pageRetriever = pageRetriever;
this.continuationToken = continuationToken;
this.continuationPredicate = continuationPredicate;
this.preferredPageSize = preferredPageSize;
}

@Override
public Iterator<P> iterator() {
return new ContinuablePagedByPageIterator<>(pageRetriever, continuationToken, preferredPageSize);
return new ContinuablePagedByPageIterator<>(pageRetriever, continuationToken, continuationPredicate,
preferredPageSize);
}

private static final class ContinuablePagedByPageIterator<C, T, P extends ContinuablePage<C, T>>
extends ContinuablePagedByIteratorBase<C, T, P, P> {
private volatile Queue<P> pages = new ConcurrentLinkedQueue<>();

ContinuablePagedByPageIterator(PageRetriever<C, P> pageRetriever, C continuationToken,
Integer preferredPageSize) {
super(pageRetriever, new ContinuationState<>(continuationToken), preferredPageSize,
Predicate<C> continuationPredicate, Integer preferredPageSize) {
super(pageRetriever, new ContinuationState<>(continuationToken, continuationPredicate), preferredPageSize,
new ClientLogger(ContinuablePagedByPageIterator.class));

requestPage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
private Flux<P> byPage(Supplier<PageRetriever<C, P>> provider, C continuationToken, Integer pageSize) {
return Flux.defer(() -> {
final PageRetriever<C, P> pageRetriever = provider.get();
final ContinuationState<C> state = new ContinuationState<>(continuationToken);
final ContinuationState<C> state = new ContinuationState<>(continuationToken, getContinuationPredicate());
return retrievePages(state, pageRetriever, pageSize);
});
}
Expand All @@ -278,7 +278,7 @@ private Flux<P> retrievePages(ContinuationState<C> state, PageRetriever<C, P> pa
*/
return retrievePage(state, pageRetriever, pageSize)
.expand(page -> {
state.setLastContinuationToken(page.getContinuationToken(), t -> !getContinuationPredicate().test(t));
state.setLastContinuationToken(page.getContinuationToken());
return Flux.defer(() -> retrievePage(state, pageRetriever, pageSize));
}, 4);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
* @see IterableStream
* @see ContinuablePagedFlux
*/
public abstract class ContinuablePagedIterable<C, T, P extends ContinuablePage<C, T>> extends IterableStream<T> {
public class ContinuablePagedIterable<C, T, P extends ContinuablePage<C, T>> extends IterableStream<T> {
private final ContinuablePagedFlux<C, T, P> pagedFlux;
private final int batchSize;

Expand Down Expand Up @@ -172,7 +172,7 @@ private Iterable<P> iterableByPageInternal(C continuationToken, Integer preferre
if (pagedFlux instanceof ContinuablePagedFluxCore) {
ContinuablePagedFluxCore<C, T, P> pagedFluxCore = (ContinuablePagedFluxCore<C, T, P>) pagedFlux;
return new ContinuablePagedByPageIterable<>(pagedFluxCore.pageRetrieverProvider.get(), continuationToken,
preferredPageSize);
pagedFluxCore.getContinuationPredicate(), preferredPageSize);
} else {
return nonPagedFluxCoreIterableSupplier.get();
}
Expand All @@ -181,7 +181,8 @@ private Iterable<P> iterableByPageInternal(C continuationToken, Integer preferre
private Iterable<T> iterableByItemInternal() {
if (pagedFlux instanceof ContinuablePagedFluxCore) {
ContinuablePagedFluxCore<C, T, P> pagedFluxCore = (ContinuablePagedFluxCore<C, T, P>) pagedFlux;
return new ContinuablePagedByItemIterable<>(pagedFluxCore.pageRetrieverProvider.get(), null, null);
return new ContinuablePagedByItemIterable<>(pagedFluxCore.pageRetrieverProvider.get(), null,
pagedFluxCore.getContinuationPredicate(), null);
} else {
return this.pagedFlux.toIterable(this.batchSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
* @param <C> The type of the continuation token.
*/
class ContinuationState<C> {
private final Predicate<C> continuationPredicate;

// The last seen continuation token
private C lastContinuationToken;
// Indicate whether to call the PageRetrieval Function
Expand All @@ -20,31 +22,22 @@ class ContinuationState<C> {
* Creates ContinuationState.
*
* @param token An optional continuation token for the beginning state.
* @param continuationPredicate The predicate that tests if paging should continue.
*/
ContinuationState(C token) {
ContinuationState(C token, Predicate<C> continuationPredicate) {
this.lastContinuationToken = token;
this.continuationPredicate = continuationPredicate;
}

/**
* Store the last seen continuation token.
* <p>
* Determination for continuation being done is checking if the continuation token is null.
* Determining if paging should continue is done by checking the token against the continuation predicate.
*
* @param token The continuation token.
*/
void setLastContinuationToken(C token) {
this.isDone = (token == null);
this.lastContinuationToken = token;
}

/**
* Store the last seen continuation token and apply the predicate to determine if continuation is done.
*
* @param token The continuation token.
* @param isDonePredicate The predicate that tests if continuation is done.
*/
void setLastContinuationToken(C token, Predicate<C> isDonePredicate) {
this.isDone = isDonePredicate.test(token);
this.isDone = !continuationPredicate.test(token);
this.lastContinuationToken = token;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ public <C, T, P extends ContinuablePage<C, T>> void pagingTerminatesOn(Continuab
}

@SuppressWarnings("deprecation")
private static Stream<Arguments> pagingTerminatesOnSupplier() {
public static Stream<Arguments> pagingTerminatesOnSupplier() {
PageRetriever<String, PagedResponse<String>> pfEndsWithNullPageRetriever = new GetPagesUntil(null);
PagedFlux<String> pfEndsWithNull = PagedFlux.create(() -> pfEndsWithNullPageRetriever);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpRequest;
import com.azure.core.util.paging.ContinuablePage;
import com.azure.core.util.paging.ContinuablePagedFlux;
import com.azure.core.util.paging.ContinuablePagedIterable;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -417,6 +421,55 @@ public void iterateByPageNextOnlyRetrievesOnePage() {
assertEquals(1, pageRetriever.getGetCount() - DEFAULT_PAGE_COUNT);
}

@ParameterizedTest
@MethodSource("com.azure.core.http.rest.PagedFluxTest#pagingTerminatesOnSupplier")
public <C, T, P extends ContinuablePage<C, T>> void streamingTerminatesOn(ContinuablePagedFlux<C, T, P> pagedFlux,
List<T> expectedItems) {
List<T> actualItems = new ContinuablePagedIterable<>(pagedFlux).stream().collect(Collectors.toList());
assertEquals(expectedItems.size(), actualItems.size());
for (int i = 0; i < expectedItems.size(); i++) {
assertEquals(expectedItems.get(i), actualItems.get(i));
}
}

@ParameterizedTest
@MethodSource("com.azure.core.http.rest.PagedFluxTest#pagingTerminatesOnSupplier")
public <C, T, P extends ContinuablePage<C, T>> void iteratingTerminatesOn(ContinuablePagedFlux<C, T, P> pagedFlux,
List<T> expectedItems) {
List<T> actualItems = new ArrayList<>();
new ContinuablePagedIterable<>(pagedFlux).iterator().forEachRemaining(actualItems::add);
assertEquals(expectedItems.size(), actualItems.size());
for (int i = 0; i < expectedItems.size(); i++) {
assertEquals(expectedItems.get(i), actualItems.get(i));
}
}

@ParameterizedTest
@MethodSource("com.azure.core.http.rest.PagedFluxTest#pagingTerminatesOnSupplier")
public <C, T, P extends ContinuablePage<C, T>> void streamingByPageTerminatesOn(
ContinuablePagedFlux<C, T, P> pagedFlux, List<T> expectedItems) {
List<T> actualItems = new ArrayList<>();
new ContinuablePagedIterable<>(pagedFlux).streamByPage().map(page -> page.getElements())
.forEach(iterableStream -> iterableStream.forEach(actualItems::add));
assertEquals(expectedItems.size(), actualItems.size());
for (int i = 0; i < expectedItems.size(); i++) {
assertEquals(expectedItems.get(i), actualItems.get(i));
}
}

@ParameterizedTest
@MethodSource("com.azure.core.http.rest.PagedFluxTest#pagingTerminatesOnSupplier")
public <C, T, P extends ContinuablePage<C, T>> void iteratingByPageTerminatesOn(
ContinuablePagedFlux<C, T, P> pagedFlux, List<T> expectedItems) {
List<T> actualItems = new ArrayList<>();
new ContinuablePagedIterable<>(pagedFlux).iterableByPage().iterator()
.forEachRemaining(page -> page.getElements().forEach(actualItems::add));
assertEquals(expectedItems.size(), actualItems.size());
for (int i = 0; i < expectedItems.size(); i++) {
assertEquals(expectedItems.get(i), actualItems.get(i));
}
}

private static void sleep() {
try {
Thread.sleep(500);
Expand Down

0 comments on commit 6b0c426

Please sign in to comment.