Skip to content

Commit

Permalink
flowctl: Keep track of next page offset considering that responses ma…
Browse files Browse the repository at this point in the history
…y sometimes include more than `page_size` rows
  • Loading branch information
jshearer committed Sep 18, 2024
1 parent 9d65aa5 commit bc73e19
Showing 1 changed file with 19 additions and 15 deletions.
34 changes: 19 additions & 15 deletions crates/flowctl/src/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::api_exec;
/// it's currently on. Used in [`page_turner::PageTurner`] to implement pagination.
pub struct PaginationRequest {
builder: postgrest::Builder,
page: usize,
next_offset: usize,
page_size: usize,
/// The original Builder's range value, if set. This is used to make sure we
/// don't paginate beyond the requested range
Expand Down Expand Up @@ -51,17 +51,19 @@ impl PaginationRequest {

Self {
builder,
page: 0,
next_offset: 0,
page_size: 1000,
range,
}
.set_page(0)
.set_next_offset(0)
}

fn set_page(mut self, page: usize) -> Self {
self.page = page;
fn set_next_offset(mut self, next_offset: usize) -> Self {
self.next_offset = next_offset;

let (lower_page, upper_page) = ((page * self.page_size), ((page + 1) * self.page_size));
// PostgREST ranges are 0-based, and inclusive on both ends.
// So a request for range 0-19 would return the first 20 rows.
let (lower_page, upper_page) = ((next_offset), (next_offset + self.page_size - 1));

match self.range {
Some((offset, limit)) => {
Expand Down Expand Up @@ -120,27 +122,29 @@ where
request: PaginationRequest,
) -> PageTurnerOutput<Self, PaginationRequest> {
let resp: Vec<Item> = api_exec::<Vec<Item>>(request.builder.clone()).await?;
let response_length = resp.len();

// Sometimes, it seems, we can get back more than the requested page size.
// So far I've only seen this on a request of 1,000 and a response of 1,001.
if resp.len() >= request.page_size
if response_length >= request.page_size
// If the original builder had a limit set to the same value as page_size
// this ensures that we stop right at the limit, instead of issuing an extra
// request for 0 rows.
&& request.range.map_or(true, |(_, limit)| {
((request.page + 1) * request.page_size) < limit
(request.next_offset + response_length) < limit
})
{
let current_page = request.page;
let current_offset = request.next_offset;
tracing::debug!(
?current_page,
row_count = resp.len(),
?current_offset,
row_count = response_length,
"Got back a full response, progressing to the next page"
);
Ok(TurnedPage::next(resp, request.set_page(current_page + 1)))
Ok(TurnedPage::next(
resp,
request.set_next_offset(current_offset + response_length),
))
} else {
tracing::debug!(
current_page = request.page,
current_page = request.next_offset,
row_count = resp.len(),
"Got back a non-full response or we reached the builder's original limit so we're done"
);
Expand Down

0 comments on commit bc73e19

Please sign in to comment.