Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pagination #84

Merged
merged 6 commits into from
Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class ConsumerGroupOperations {
protected static final Logger log = LogManager.getLogger(ConsumerGroupOperations.class);


public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern pattern, int offset, final int limit, final String groupIdPrefix, Types.OrderByInput orderByInput) {
public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern pattern, Types.PageRequest pageRequest, final String groupIdPrefix, Types.OrderByInput orderByInput) {
Promise<List<ConsumerGroupListing>> listConsumerGroupsFuture = Promise.promise();

ac.listConsumerGroups(listConsumerGroupsFuture);
Expand Down Expand Up @@ -87,28 +87,37 @@ public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern patte
.filter(i -> i != null)
.collect(Collectors.toList());

if (offset > list.size()) {
return Future.failedFuture(new InvalidRequestException("Offset (" + offset + ") cannot be greater than consumer group list size (" + list.size() + ")"));
}

if (Types.SortDirectionEnum.DESC.equals(orderByInput.getOrder())) {
list.sort(new CommonHandler.ConsumerGroupComparator(orderByInput.getField()).reversed());
} else {
list.sort(new CommonHandler.ConsumerGroupComparator(orderByInput.getField()));
}

int tmpLimit = limit;
if (tmpLimit == 0) {
tmpLimit = list.size();
Types.ConsumerGroupList response = new Types.ConsumerGroupList();
List<Types.ConsumerGroupDescription> croppedList;
if (pageRequest.isDeprecatedFormat()) {
if (pageRequest.getOffset() > list.size()) {
return Future.failedFuture(new InvalidRequestException("Offset (" + pageRequest.getOffset() + ") cannot be greater than consumer group list size (" + list.size() + ")"));
}
int tmpLimit = pageRequest.getLimit();
if (tmpLimit == 0) {
tmpLimit = list.size();
}
croppedList = list.subList(pageRequest.getOffset(), Math.min(pageRequest.getOffset() + tmpLimit, list.size()));
response.setLimit(pageRequest.getLimit());
response.setOffset(pageRequest.getOffset());
response.setCount(croppedList.size());
} else {
if (list.size() > 0 && pageRequest.getSize() * (pageRequest.getPage() - 1) >= list.size()) {
return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + list.size() + ")"));
}
croppedList = list.subList((pageRequest.getPage() - 1) * pageRequest.getSize(), Math.min(pageRequest.getSize() * pageRequest.getPage(), list.size()));
response.setSize(pageRequest.getSize());
response.setPage(pageRequest.getPage());
response.setTotal(list.size());
}

List<Types.ConsumerGroupDescription> croppedList = list.subList(offset, Math.min(offset + tmpLimit, list.size()));

Types.ConsumerGroupList response = new Types.ConsumerGroupList();
response.setItems(croppedList);
response.setCount(croppedList.size());
response.setLimit(tmpLimit);
response.setOffset(offset);

return Future.succeededFuture(response);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private static Promise<Types.Topic> getTopicDescAndConf(KafkaAdminClient ac, Str
return result;
}

public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern pattern, int offset, final int limit, Types.OrderByInput orderByInput) {
public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern pattern, Types.PageRequest pageRequest, Types.OrderByInput orderByInput) {
Promise<Set<String>> describeTopicsNamesPromise = Promise.promise();
Promise<Map<String, io.vertx.kafka.admin.TopicDescription>> describeTopicsPromise = Promise.promise();
Promise<Map<ConfigResource, Config>> describeTopicConfigPromise = Promise.promise();
Expand Down Expand Up @@ -151,21 +151,34 @@ public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern patte
fullTopicDescriptions.sort(new CommonHandler.TopicComparator(orderByInput.getField()));
}

if (offset > fullTopicDescriptions.size()) {
return Future.failedFuture(new InvalidRequestException("Offset (" + offset + ") cannot be greater than topic list size (" + fullTopicDescriptions.size() + ")"));
}
int tmpLimit = limit;
if (tmpLimit == 0) {
tmpLimit = fullTopicDescriptions.size();
}

List<Types.Topic> croppedList = fullTopicDescriptions.subList(offset, Math.min(offset + tmpLimit, fullTopicDescriptions.size()));

Types.TopicList topicList = new Types.TopicList();
List<Types.Topic> croppedList;
if (pageRequest.isDeprecatedFormat()) {
// deprecated
if (pageRequest.getOffset() > fullTopicDescriptions.size()) {
return Future.failedFuture(new InvalidRequestException("Offset (" + pageRequest.getOffset() + ") cannot be greater than topic list size (" + fullTopicDescriptions.size() + ")"));
}
int tmpLimit = pageRequest.getLimit();
if (tmpLimit == 0) {
tmpLimit = fullTopicDescriptions.size();
}
croppedList = fullTopicDescriptions.subList(pageRequest.getOffset(), Math.min(pageRequest.getOffset() + tmpLimit, fullTopicDescriptions.size()));
topicList.setOffset(pageRequest.getOffset());
topicList.setLimit(pageRequest.getLimit());
topicList.setCount(croppedList.size());
} else {
if (fullTopicDescriptions.size() > 0 && (pageRequest.getPage() - 1) * pageRequest.getSize() >= fullTopicDescriptions.size()) {
return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + fullTopicDescriptions.size() + ")"));
}
croppedList = fullTopicDescriptions.subList((pageRequest.getPage() - 1) * pageRequest.getSize(), Math.min(pageRequest.getPage() * pageRequest.getSize(), fullTopicDescriptions.size()));
topicList.setPage(pageRequest.getPage());
topicList.setSize(pageRequest.getSize());
topicList.setTotal(fullTopicDescriptions.size());
}

topicList.setItems(croppedList);
topicList.setCount(croppedList.size());
topicList.setLimit(tmpLimit);
topicList.setOffset(offset);

return Future.succeededFuture(topicList);
}).onComplete(finalRes -> {
if (finalRes.failed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@ public void listTopics(RoutingContext routingContext) {
httpMetrics.getListTopicsCounter().increment();
httpMetrics.getRequestsCounter().increment();
String filter = routingContext.queryParams().get("filter");
String limit = routingContext.queryParams().get("limit") == null ? "0" : routingContext.queryParams().get("limit");
String offset = routingContext.queryParams().get("offset") == null ? "0" : routingContext.queryParams().get("offset");
Types.SortDirectionEnum sortReverse = Types.SortDirectionEnum.fromString(routingContext.queryParams().get("order"));
String sortKey = routingContext.queryParams().get("orderKey") == null ? "name" : routingContext.queryParams().get("orderKey");
Types.OrderByInput orderBy = new Types.OrderByInput();
Expand All @@ -235,13 +233,11 @@ public void listTopics(RoutingContext routingContext) {
prom.fail(ac.cause());
} else {
try {
if (Integer.parseInt(offset) < 0 || Integer.parseInt(limit) < 0) {
throw new InvalidRequestException("Offset and limit have to be positive integers.");
}
TopicOperations.getTopicList(ac.result(), prom, pattern, Integer.parseInt(offset), Integer.parseInt(limit), orderBy);
TopicOperations.getTopicList(ac.result(), prom, pattern, parsePageRequest(routingContext), orderBy);
} catch (NumberFormatException | InvalidRequestException e) {
prom.fail(e);
processResponse(prom, routingContext, HttpResponseStatus.BAD_REQUEST, httpMetrics, httpMetrics.getListTopicRequestTimer(), requestTimerSample);
return;
}
}
processResponse(prom, routingContext, HttpResponseStatus.OK, httpMetrics, httpMetrics.getListTopicRequestTimer(), requestTimerSample);
Expand All @@ -256,8 +252,6 @@ public void listGroups(RoutingContext routingContext) {
httpMetrics.getRequestsCounter().increment();
String topicFilter = routingContext.queryParams().get("topic");
String consumerGroupIdFilter = routingContext.queryParams().get("group-id-filter") == null ? "" : routingContext.queryParams().get("group-id-filter");
String limit = routingContext.queryParams().get("limit") == null ? "0" : routingContext.queryParams().get("limit");
String offset = routingContext.queryParams().get("offset") == null ? "0" : routingContext.queryParams().get("offset");

Types.SortDirectionEnum sortReverse = Types.SortDirectionEnum.fromString(routingContext.queryParams().get("order"));
String sortKey = routingContext.queryParams().get("orderKey") == null ? "name" : routingContext.queryParams().get("orderKey");
Expand All @@ -278,13 +272,11 @@ public void listGroups(RoutingContext routingContext) {
prom.fail(ac.cause());
} else {
try {
if (Integer.parseInt(offset) < 0 || Integer.parseInt(limit) < 0) {
throw new InvalidRequestException("Offset and limit have to be positive integers.");
}
ConsumerGroupOperations.getGroupList(ac.result(), prom, pattern, Integer.parseInt(offset), Integer.parseInt(limit), consumerGroupIdFilter, orderBy);
ConsumerGroupOperations.getGroupList(ac.result(), prom, pattern, parsePageRequest(routingContext), consumerGroupIdFilter, orderBy);
} catch (NumberFormatException | InvalidRequestException e) {
prom.fail(e);
processResponse(prom, routingContext, HttpResponseStatus.BAD_REQUEST, httpMetrics, httpMetrics.getListGroupsRequestTimer(), requestTimerSample);
return;
}
}
processResponse(prom, routingContext, HttpResponseStatus.OK, httpMetrics, httpMetrics.getListGroupsRequestTimer(), requestTimerSample);
Expand Down Expand Up @@ -414,4 +406,37 @@ public void errorHandler(RoutingContext routingContext) {
prom.fail(routingContext.failure());
processResponse(prom, routingContext, HttpResponseStatus.OK, httpMetrics, httpMetrics.getOpenApiRequestTimer(), requestTimerSample);
}

private Types.PageRequest parsePageRequest(RoutingContext routingContext) {
Types.PageRequest pageRequest = new Types.PageRequest();

boolean deprecatedPaginationUsed = false;
if (routingContext.queryParams().get("offset") != null || routingContext.queryParams().get("limit") != null) {
deprecatedPaginationUsed = true;
}
pageRequest.setDeprecatedFormat(deprecatedPaginationUsed);

if (deprecatedPaginationUsed) {
String offset = routingContext.queryParams().get("offset") == null ? "0" : routingContext.queryParams().get("offset");
String limit = routingContext.queryParams().get("limit") == null ? "10" : routingContext.queryParams().get("limit");
int offsetInt = Integer.parseInt(offset);
int limitInt = Integer.parseInt(limit);
pageRequest.setOffset(offsetInt);
pageRequest.setLimit(limitInt);
} else {
String size = routingContext.queryParams().get("size") == null ? "10" : routingContext.queryParams().get("size");
String page = routingContext.queryParams().get("page") == null ? "1" : routingContext.queryParams().get("page");

int pageInt = Integer.parseInt(page);
int sizeInt = Integer.parseInt(size);
pageRequest.setPage(pageInt);
pageRequest.setSize(sizeInt);

if (sizeInt < 1 || pageInt < 1) {
throw new InvalidRequestException("Size and page have to be positive integers.");
}
}

return pageRequest;
}
}
129 changes: 81 additions & 48 deletions kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,45 +123,6 @@ public int compareTo(Topic topic) {
}
}

public static class TopicList {
private List<Topic> items;
private Integer offset;
private Integer limit;
private Integer count;

public List<Topic> getItems() {
return items;
}

public void setItems(List<Topic> items) {
this.items = items;
}

public Integer getOffset() {
return offset;
}

public void setOffset(Integer offset) {
this.offset = offset;
}

public Integer getLimit() {
return limit;
}

public void setLimit(Integer limit) {
this.limit = limit;
}

public Integer getCount() {
return count;
}

public void setCount(Integer count) {
this.count = count;
}
}

public static class NewTopicConfigEntry {
private String key;
private String value;
Expand Down Expand Up @@ -258,15 +219,37 @@ public void setConfig(List<NewTopicConfigEntry> config) {
}

public static class PageRequest {
private Integer limit;
private boolean deprecatedFormat;
private Integer page;
private Integer size;

@Deprecated
private Integer offset;
@Deprecated
private Integer limit;

public Integer getLimit() {
return limit;
public boolean isDeprecatedFormat() {
return deprecatedFormat;
}

public void setLimit(Integer limit) {
this.limit = limit;
public void setDeprecatedFormat(boolean deprecatedFormat) {
this.deprecatedFormat = deprecatedFormat;
}

public Integer getPage() {
return page;
}

public void setPage(Integer page) {
this.page = page;
}

public Integer getSize() {
return size;
}

public void setSize(Integer size) {
this.size = size;
}

public Integer getOffset() {
Expand All @@ -276,6 +259,14 @@ public Integer getOffset() {
public void setOffset(Integer offset) {
this.offset = offset;
}

public Integer getLimit() {
return limit;
}

public void setLimit(Integer limit) {
this.limit = limit;
}
}
public enum SortDirectionEnum {
DESC,
Expand Down Expand Up @@ -346,38 +337,80 @@ public void setState(String state) {
}
}

public static class ConsumerGroupList {
private List<ConsumerGroupDescription> items;
public static class PagedResponse<T> {
private List<T> items;
private Integer size;
private Integer page;
private Integer total;
// deprecated
private Integer offset;
private Integer limit;
private Integer count;

public List<ConsumerGroupDescription> getItems() {
public List<T> getItems() {
return items;
}
public void setItems(List<ConsumerGroupDescription> items) {
public void setItems(List<T> items) {
this.items = items;
}

public Integer getSize() {
return size;
}

public void setSize(Integer size) {
this.size = size;
}

public Integer getPage() {
return page;
}

public void setPage(Integer page) {
this.page = page;
}

public Integer getTotal() {
return total;
}

public void setTotal(Integer total) {
this.total = total;
}

//deprecated

public Integer getOffset() {
return offset;
}

public void setOffset(Integer offset) {
this.offset = offset;
}

public Integer getLimit() {
return limit;
}

public void setLimit(Integer limit) {
this.limit = limit;
}

public Integer getCount() {
return count;
}

public void setCount(Integer count) {
this.count = count;
}
}

public static class ConsumerGroupList extends PagedResponse<ConsumerGroupDescription> {
}

public static class TopicList extends PagedResponse<Topic> {
}

public static class Consumer {
private String memberId;
private String groupId;
Expand Down
Loading