Skip to content

Commit

Permalink
Pagination (#84)
Browse files Browse the repository at this point in the history
* Pagination

Signed-off-by: Stanislav Knot <sknot@redhat.com>

* suggestions

Signed-off-by: Stanislav Knot <sknot@redhat.com>

* suggestions

Signed-off-by: Stanislav Knot <sknot@redhat.com>

* support deprecated pagination

Signed-off-by: Stanislav Knot <sknot@redhat.com>

* result list size

Signed-off-by: Stanislav Knot <sknot@redhat.com>

* deprecated tests

Signed-off-by: Stanislav Knot <sknot@redhat.com>
  • Loading branch information
sknot-rh authored Jun 21, 2021
1 parent 7453ec6 commit 8923a96
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class ConsumerGroupOperations {
protected static final Logger log = LogManager.getLogger(ConsumerGroupOperations.class);
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssz");

public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern pattern, int offset, final int limit, final String groupIdPrefix, Types.OrderByInput orderByInput) {
public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern pattern, Types.PageRequest pageRequest, final String groupIdPrefix, Types.OrderByInput orderByInput) {
Promise<List<ConsumerGroupListing>> listConsumerGroupsFuture = Promise.promise();

ac.listConsumerGroups(listConsumerGroupsFuture);
Expand Down Expand Up @@ -91,28 +91,37 @@ public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern patte
.filter(i -> i != null)
.collect(Collectors.toList());

if (offset > list.size()) {
return Future.failedFuture(new InvalidRequestException("Offset (" + offset + ") cannot be greater than consumer group list size (" + list.size() + ")"));
}

if (Types.SortDirectionEnum.DESC.equals(orderByInput.getOrder())) {
list.sort(new CommonHandler.ConsumerGroupComparator(orderByInput.getField()).reversed());
} else {
list.sort(new CommonHandler.ConsumerGroupComparator(orderByInput.getField()));
}

int tmpLimit = limit;
if (tmpLimit == 0) {
tmpLimit = list.size();
Types.ConsumerGroupList response = new Types.ConsumerGroupList();
List<Types.ConsumerGroupDescription> croppedList;
if (pageRequest.isDeprecatedFormat()) {
if (pageRequest.getOffset() > list.size()) {
return Future.failedFuture(new InvalidRequestException("Offset (" + pageRequest.getOffset() + ") cannot be greater than consumer group list size (" + list.size() + ")"));
}
int tmpLimit = pageRequest.getLimit();
if (tmpLimit == 0) {
tmpLimit = list.size();
}
croppedList = list.subList(pageRequest.getOffset(), Math.min(pageRequest.getOffset() + tmpLimit, list.size()));
response.setLimit(pageRequest.getLimit());
response.setOffset(pageRequest.getOffset());
response.setCount(croppedList.size());
} else {
if (list.size() > 0 && pageRequest.getSize() * (pageRequest.getPage() - 1) >= list.size()) {
return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + list.size() + ")"));
}
croppedList = list.subList((pageRequest.getPage() - 1) * pageRequest.getSize(), Math.min(pageRequest.getSize() * pageRequest.getPage(), list.size()));
response.setSize(pageRequest.getSize());
response.setPage(pageRequest.getPage());
response.setTotal(list.size());
}

List<Types.ConsumerGroupDescription> croppedList = list.subList(offset, Math.min(offset + tmpLimit, list.size()));

Types.ConsumerGroupList response = new Types.ConsumerGroupList();
response.setItems(croppedList);
response.setCount(croppedList.size());
response.setLimit(tmpLimit);
response.setOffset(offset);

return Future.succeededFuture(response);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private static Promise<Types.Topic> getTopicDescAndConf(KafkaAdminClient ac, Str
return result;
}

public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern pattern, int offset, final int limit, Types.OrderByInput orderByInput) {
public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern pattern, Types.PageRequest pageRequest, Types.OrderByInput orderByInput) {
Promise<Set<String>> describeTopicsNamesPromise = Promise.promise();
Promise<Map<String, io.vertx.kafka.admin.TopicDescription>> describeTopicsPromise = Promise.promise();
Promise<Map<ConfigResource, Config>> describeTopicConfigPromise = Promise.promise();
Expand Down Expand Up @@ -151,21 +151,34 @@ public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern patte
fullTopicDescriptions.sort(new CommonHandler.TopicComparator(orderByInput.getField()));
}

if (offset > fullTopicDescriptions.size()) {
return Future.failedFuture(new InvalidRequestException("Offset (" + offset + ") cannot be greater than topic list size (" + fullTopicDescriptions.size() + ")"));
}
int tmpLimit = limit;
if (tmpLimit == 0) {
tmpLimit = fullTopicDescriptions.size();
}

List<Types.Topic> croppedList = fullTopicDescriptions.subList(offset, Math.min(offset + tmpLimit, fullTopicDescriptions.size()));

Types.TopicList topicList = new Types.TopicList();
List<Types.Topic> croppedList;
if (pageRequest.isDeprecatedFormat()) {
// deprecated
if (pageRequest.getOffset() > fullTopicDescriptions.size()) {
return Future.failedFuture(new InvalidRequestException("Offset (" + pageRequest.getOffset() + ") cannot be greater than topic list size (" + fullTopicDescriptions.size() + ")"));
}
int tmpLimit = pageRequest.getLimit();
if (tmpLimit == 0) {
tmpLimit = fullTopicDescriptions.size();
}
croppedList = fullTopicDescriptions.subList(pageRequest.getOffset(), Math.min(pageRequest.getOffset() + tmpLimit, fullTopicDescriptions.size()));
topicList.setOffset(pageRequest.getOffset());
topicList.setLimit(pageRequest.getLimit());
topicList.setCount(croppedList.size());
} else {
if (fullTopicDescriptions.size() > 0 && (pageRequest.getPage() - 1) * pageRequest.getSize() >= fullTopicDescriptions.size()) {
return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + fullTopicDescriptions.size() + ")"));
}
croppedList = fullTopicDescriptions.subList((pageRequest.getPage() - 1) * pageRequest.getSize(), Math.min(pageRequest.getPage() * pageRequest.getSize(), fullTopicDescriptions.size()));
topicList.setPage(pageRequest.getPage());
topicList.setSize(pageRequest.getSize());
topicList.setTotal(fullTopicDescriptions.size());
}

topicList.setItems(croppedList);
topicList.setCount(croppedList.size());
topicList.setLimit(tmpLimit);
topicList.setOffset(offset);

return Future.succeededFuture(topicList);
}).onComplete(finalRes -> {
if (finalRes.failed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@ public void listTopics(RoutingContext routingContext) {
httpMetrics.getListTopicsCounter().increment();
httpMetrics.getRequestsCounter().increment();
String filter = routingContext.queryParams().get("filter");
String limit = routingContext.queryParams().get("limit") == null ? "0" : routingContext.queryParams().get("limit");
String offset = routingContext.queryParams().get("offset") == null ? "0" : routingContext.queryParams().get("offset");
Types.SortDirectionEnum sortReverse = Types.SortDirectionEnum.fromString(routingContext.queryParams().get("order"));
String sortKey = routingContext.queryParams().get("orderKey") == null ? "name" : routingContext.queryParams().get("orderKey");
Types.OrderByInput orderBy = new Types.OrderByInput();
Expand All @@ -235,13 +233,11 @@ public void listTopics(RoutingContext routingContext) {
prom.fail(ac.cause());
} else {
try {
if (Integer.parseInt(offset) < 0 || Integer.parseInt(limit) < 0) {
throw new InvalidRequestException("Offset and limit have to be positive integers.");
}
TopicOperations.getTopicList(ac.result(), prom, pattern, Integer.parseInt(offset), Integer.parseInt(limit), orderBy);
TopicOperations.getTopicList(ac.result(), prom, pattern, parsePageRequest(routingContext), orderBy);
} catch (NumberFormatException | InvalidRequestException e) {
prom.fail(e);
processResponse(prom, routingContext, HttpResponseStatus.BAD_REQUEST, httpMetrics, httpMetrics.getListTopicRequestTimer(), requestTimerSample);
return;
}
}
processResponse(prom, routingContext, HttpResponseStatus.OK, httpMetrics, httpMetrics.getListTopicRequestTimer(), requestTimerSample);
Expand All @@ -256,8 +252,6 @@ public void listGroups(RoutingContext routingContext) {
httpMetrics.getRequestsCounter().increment();
String topicFilter = routingContext.queryParams().get("topic");
String consumerGroupIdFilter = routingContext.queryParams().get("group-id-filter") == null ? "" : routingContext.queryParams().get("group-id-filter");
String limit = routingContext.queryParams().get("limit") == null ? "0" : routingContext.queryParams().get("limit");
String offset = routingContext.queryParams().get("offset") == null ? "0" : routingContext.queryParams().get("offset");

Types.SortDirectionEnum sortReverse = Types.SortDirectionEnum.fromString(routingContext.queryParams().get("order"));
String sortKey = routingContext.queryParams().get("orderKey") == null ? "name" : routingContext.queryParams().get("orderKey");
Expand All @@ -278,13 +272,11 @@ public void listGroups(RoutingContext routingContext) {
prom.fail(ac.cause());
} else {
try {
if (Integer.parseInt(offset) < 0 || Integer.parseInt(limit) < 0) {
throw new InvalidRequestException("Offset and limit have to be positive integers.");
}
ConsumerGroupOperations.getGroupList(ac.result(), prom, pattern, Integer.parseInt(offset), Integer.parseInt(limit), consumerGroupIdFilter, orderBy);
ConsumerGroupOperations.getGroupList(ac.result(), prom, pattern, parsePageRequest(routingContext), consumerGroupIdFilter, orderBy);
} catch (NumberFormatException | InvalidRequestException e) {
prom.fail(e);
processResponse(prom, routingContext, HttpResponseStatus.BAD_REQUEST, httpMetrics, httpMetrics.getListGroupsRequestTimer(), requestTimerSample);
return;
}
}
processResponse(prom, routingContext, HttpResponseStatus.OK, httpMetrics, httpMetrics.getListGroupsRequestTimer(), requestTimerSample);
Expand Down Expand Up @@ -431,4 +423,37 @@ public void errorHandler(RoutingContext routingContext) {
prom.fail(routingContext.failure());
processResponse(prom, routingContext, HttpResponseStatus.OK, httpMetrics, httpMetrics.getOpenApiRequestTimer(), requestTimerSample);
}

private Types.PageRequest parsePageRequest(RoutingContext routingContext) {
Types.PageRequest pageRequest = new Types.PageRequest();

boolean deprecatedPaginationUsed = false;
if (routingContext.queryParams().get("offset") != null || routingContext.queryParams().get("limit") != null) {
deprecatedPaginationUsed = true;
}
pageRequest.setDeprecatedFormat(deprecatedPaginationUsed);

if (deprecatedPaginationUsed) {
String offset = routingContext.queryParams().get("offset") == null ? "0" : routingContext.queryParams().get("offset");
String limit = routingContext.queryParams().get("limit") == null ? "10" : routingContext.queryParams().get("limit");
int offsetInt = Integer.parseInt(offset);
int limitInt = Integer.parseInt(limit);
pageRequest.setOffset(offsetInt);
pageRequest.setLimit(limitInt);
} else {
String size = routingContext.queryParams().get("size") == null ? "10" : routingContext.queryParams().get("size");
String page = routingContext.queryParams().get("page") == null ? "1" : routingContext.queryParams().get("page");

int pageInt = Integer.parseInt(page);
int sizeInt = Integer.parseInt(size);
pageRequest.setPage(pageInt);
pageRequest.setSize(sizeInt);

if (sizeInt < 1 || pageInt < 1) {
throw new InvalidRequestException("Size and page have to be positive integers.");
}
}

return pageRequest;
}
}
129 changes: 81 additions & 48 deletions kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,45 +123,6 @@ public int compareTo(Topic topic) {
}
}

public static class TopicList {
private List<Topic> items;
private Integer offset;
private Integer limit;
private Integer count;

public List<Topic> getItems() {
return items;
}

public void setItems(List<Topic> items) {
this.items = items;
}

public Integer getOffset() {
return offset;
}

public void setOffset(Integer offset) {
this.offset = offset;
}

public Integer getLimit() {
return limit;
}

public void setLimit(Integer limit) {
this.limit = limit;
}

public Integer getCount() {
return count;
}

public void setCount(Integer count) {
this.count = count;
}
}

public static class NewTopicConfigEntry {
private String key;
private String value;
Expand Down Expand Up @@ -351,15 +312,37 @@ public void setConfig(List<NewTopicConfigEntry> config) {
}

public static class PageRequest {
private Integer limit;
private boolean deprecatedFormat;
private Integer page;
private Integer size;

@Deprecated
private Integer offset;
@Deprecated
private Integer limit;

public Integer getLimit() {
return limit;
public boolean isDeprecatedFormat() {
return deprecatedFormat;
}

public void setLimit(Integer limit) {
this.limit = limit;
public void setDeprecatedFormat(boolean deprecatedFormat) {
this.deprecatedFormat = deprecatedFormat;
}

public Integer getPage() {
return page;
}

public void setPage(Integer page) {
this.page = page;
}

public Integer getSize() {
return size;
}

public void setSize(Integer size) {
this.size = size;
}

public Integer getOffset() {
Expand All @@ -369,6 +352,14 @@ public Integer getOffset() {
public void setOffset(Integer offset) {
this.offset = offset;
}

public Integer getLimit() {
return limit;
}

public void setLimit(Integer limit) {
this.limit = limit;
}
}
public enum SortDirectionEnum {
DESC,
Expand Down Expand Up @@ -439,38 +430,80 @@ public void setState(String state) {
}
}

public static class ConsumerGroupList {
private List<ConsumerGroupDescription> items;
public static class PagedResponse<T> {
private List<T> items;
private Integer size;
private Integer page;
private Integer total;
// deprecated
private Integer offset;
private Integer limit;
private Integer count;

public List<ConsumerGroupDescription> getItems() {
public List<T> getItems() {
return items;
}
public void setItems(List<ConsumerGroupDescription> items) {
public void setItems(List<T> items) {
this.items = items;
}

public Integer getSize() {
return size;
}

public void setSize(Integer size) {
this.size = size;
}

public Integer getPage() {
return page;
}

public void setPage(Integer page) {
this.page = page;
}

public Integer getTotal() {
return total;
}

public void setTotal(Integer total) {
this.total = total;
}

//deprecated

public Integer getOffset() {
return offset;
}

public void setOffset(Integer offset) {
this.offset = offset;
}

public Integer getLimit() {
return limit;
}

public void setLimit(Integer limit) {
this.limit = limit;
}

public Integer getCount() {
return count;
}

public void setCount(Integer count) {
this.count = count;
}
}

public static class ConsumerGroupList extends PagedResponse<ConsumerGroupDescription> {
}

public static class TopicList extends PagedResponse<Topic> {
}

public static class Consumer {
private String memberId;
private String groupId;
Expand Down
Loading

0 comments on commit 8923a96

Please sign in to comment.