From 89735784b833e9f0046b2293662ada756cb7101f Mon Sep 17 00:00:00 2001 From: Stanislav Knot Date: Thu, 17 Jun 2021 11:50:35 +0200 Subject: [PATCH 1/6] Pagination Signed-off-by: Stanislav Knot --- .../kafka/admin/ConsumerGroupOperations.java | 19 ++-- .../admin/kafka/admin/TopicOperations.java | 18 ++-- .../kafka/admin/handlers/RestOperations.java | 33 ++++--- .../bf2/admin/kafka/admin/model/Types.java | 86 ++++++++++--------- .../openapi-specs/kafka-admin-rest.yaml | 40 ++++----- .../systemtest/plain/RestEndpointTestIT.java | 60 ++++++++++--- 6 files changed, 151 insertions(+), 105 deletions(-) diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java index 9652d9a1..836e5735 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java @@ -32,7 +32,7 @@ public class ConsumerGroupOperations { protected static final Logger log = LogManager.getLogger(ConsumerGroupOperations.class); - public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern pattern, int offset, final int limit, final String groupIdPrefix, Types.OrderByInput orderByInput) { + public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern pattern, Types.PageRequest pageRequest, final String groupIdPrefix, Types.OrderByInput orderByInput) { Promise> listConsumerGroupsFuture = Promise.promise(); ac.listConsumerGroups(listConsumerGroupsFuture); @@ -87,8 +87,8 @@ public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern patte .filter(i -> i != null) .collect(Collectors.toList()); - if (offset > list.size()) { - return Future.failedFuture(new InvalidRequestException("Offset (" + offset + ") cannot be greater than consumer group list size (" + list.size() + ")")); + if (pageRequest.getSize() * (pageRequest.getPage() - 1) > list.size()) { + return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + list.size() + ")")); } if (Types.SortDirectionEnum.DESC.equals(orderByInput.getOrder())) { @@ -97,18 +97,13 @@ public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern patte list.sort(new CommonHandler.ConsumerGroupComparator(orderByInput.getField())); } - int tmpLimit = limit; - if (tmpLimit == 0) { - tmpLimit = list.size(); - } - - List croppedList = list.subList(offset, Math.min(offset + tmpLimit, list.size())); + List croppedList = list.subList((pageRequest.getPage() - 1) * pageRequest.getSize(), Math.min(pageRequest.getSize() * pageRequest.getPage(), list.size())); Types.ConsumerGroupList response = new Types.ConsumerGroupList(); response.setItems(croppedList); - response.setCount(croppedList.size()); - response.setLimit(tmpLimit); - response.setOffset(offset); + response.setTotal(list.size()); + response.setSize(pageRequest.getSize()); + response.setPage(pageRequest.getPage()); return Future.succeededFuture(response); }) diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java index a0e2c8f0..da579bd7 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java @@ -111,7 +111,7 @@ private static Promise getTopicDescAndConf(KafkaAdminClient ac, Str return result; } - public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern pattern, int offset, final int limit, Types.OrderByInput orderByInput) { + public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern pattern, Types.PageRequest pageRequest, Types.OrderByInput orderByInput) { Promise> describeTopicsNamesPromise = Promise.promise(); Promise> describeTopicsPromise = Promise.promise(); Promise> describeTopicConfigPromise = Promise.promise(); @@ -151,21 +151,17 @@ public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern patte fullTopicDescriptions.sort(new CommonHandler.TopicComparator(orderByInput.getField())); } - if (offset > fullTopicDescriptions.size()) { - return Future.failedFuture(new InvalidRequestException("Offset (" + offset + ") cannot be greater than topic list size (" + fullTopicDescriptions.size() + ")")); - } - int tmpLimit = limit; - if (tmpLimit == 0) { - tmpLimit = fullTopicDescriptions.size(); + if ((pageRequest.getPage() - 1) * pageRequest.getSize() > fullTopicDescriptions.size()) { + return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + fullTopicDescriptions.size() + ")")); } - List croppedList = fullTopicDescriptions.subList(offset, Math.min(offset + tmpLimit, fullTopicDescriptions.size())); + List croppedList = fullTopicDescriptions.subList((pageRequest.getPage() - 1) * pageRequest.getSize(), Math.min(pageRequest.getPage() * pageRequest.getSize(), fullTopicDescriptions.size())); Types.TopicList topicList = new Types.TopicList(); topicList.setItems(croppedList); - topicList.setCount(croppedList.size()); - topicList.setLimit(tmpLimit); - topicList.setOffset(offset); + topicList.setTotal(fullTopicDescriptions.size()); + topicList.setPage(pageRequest.getPage()); + topicList.setSize(pageRequest.getSize()); return Future.succeededFuture(topicList); }).onComplete(finalRes -> { if (finalRes.failed()) { diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java index c315bb88..241d9f3a 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java @@ -215,8 +215,8 @@ public void listTopics(RoutingContext routingContext) { httpMetrics.getListTopicsCounter().increment(); httpMetrics.getRequestsCounter().increment(); String filter = routingContext.queryParams().get("filter"); - String limit = routingContext.queryParams().get("limit") == null ? "0" : routingContext.queryParams().get("limit"); - String offset = routingContext.queryParams().get("offset") == null ? "0" : routingContext.queryParams().get("offset"); + String size = routingContext.queryParams().get("size") == null ? "10" : routingContext.queryParams().get("size"); + String page = routingContext.queryParams().get("page") == null ? "1" : routingContext.queryParams().get("page"); Types.SortDirectionEnum sortReverse = Types.SortDirectionEnum.fromString(routingContext.queryParams().get("order")); String sortKey = routingContext.queryParams().get("orderKey") == null ? "name" : routingContext.queryParams().get("orderKey"); Types.OrderByInput orderBy = new Types.OrderByInput(); @@ -235,13 +235,20 @@ public void listTopics(RoutingContext routingContext) { prom.fail(ac.cause()); } else { try { - if (Integer.parseInt(offset) < 0 || Integer.parseInt(limit) < 0) { - throw new InvalidRequestException("Offset and limit have to be positive integers."); + int pageInt = Integer.parseInt(page); + int sizeInt = Integer.parseInt(size); + if (sizeInt < 1 || pageInt < 1) { + throw new InvalidRequestException("Size and page have to be positive integers."); } - TopicOperations.getTopicList(ac.result(), prom, pattern, Integer.parseInt(offset), Integer.parseInt(limit), orderBy); + Types.PageRequest pageRequest = new Types.PageRequest(); + pageRequest.setPage(pageInt); + pageRequest.setSize(sizeInt); + + TopicOperations.getTopicList(ac.result(), prom, pattern, pageRequest, orderBy); } catch (NumberFormatException | InvalidRequestException e) { prom.fail(e); processResponse(prom, routingContext, HttpResponseStatus.BAD_REQUEST, httpMetrics, httpMetrics.getListTopicRequestTimer(), requestTimerSample); + return; } } processResponse(prom, routingContext, HttpResponseStatus.OK, httpMetrics, httpMetrics.getListTopicRequestTimer(), requestTimerSample); @@ -256,8 +263,8 @@ public void listGroups(RoutingContext routingContext) { httpMetrics.getRequestsCounter().increment(); String topicFilter = routingContext.queryParams().get("topic"); String consumerGroupIdFilter = routingContext.queryParams().get("group-id-filter") == null ? "" : routingContext.queryParams().get("group-id-filter"); - String limit = routingContext.queryParams().get("limit") == null ? "0" : routingContext.queryParams().get("limit"); - String offset = routingContext.queryParams().get("offset") == null ? "0" : routingContext.queryParams().get("offset"); + String size = routingContext.queryParams().get("size") == null ? "10" : routingContext.queryParams().get("size"); + String page = routingContext.queryParams().get("page") == null ? "1" : routingContext.queryParams().get("page"); Types.SortDirectionEnum sortReverse = Types.SortDirectionEnum.fromString(routingContext.queryParams().get("order")); String sortKey = routingContext.queryParams().get("orderKey") == null ? "name" : routingContext.queryParams().get("orderKey"); @@ -278,13 +285,19 @@ public void listGroups(RoutingContext routingContext) { prom.fail(ac.cause()); } else { try { - if (Integer.parseInt(offset) < 0 || Integer.parseInt(limit) < 0) { - throw new InvalidRequestException("Offset and limit have to be positive integers."); + int pageInt = Integer.parseInt(page); + int sizeInt = Integer.parseInt(size); + if (sizeInt < 1 || pageInt < 1) { + throw new InvalidRequestException("Size and page have to be positive integers."); } - ConsumerGroupOperations.getGroupList(ac.result(), prom, pattern, Integer.parseInt(offset), Integer.parseInt(limit), consumerGroupIdFilter, orderBy); + Types.PageRequest pageRequest = new Types.PageRequest(); + pageRequest.setPage(pageInt); + pageRequest.setSize(sizeInt); + ConsumerGroupOperations.getGroupList(ac.result(), prom, pattern, pageRequest, consumerGroupIdFilter, orderBy); } catch (NumberFormatException | InvalidRequestException e) { prom.fail(e); processResponse(prom, routingContext, HttpResponseStatus.BAD_REQUEST, httpMetrics, httpMetrics.getListGroupsRequestTimer(), requestTimerSample); + return; } } processResponse(prom, routingContext, HttpResponseStatus.OK, httpMetrics, httpMetrics.getListGroupsRequestTimer(), requestTimerSample); diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java index f9ee1964..5f6c6a3c 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java @@ -125,9 +125,9 @@ public int compareTo(Topic topic) { public static class TopicList { private List items; - private Integer offset; - private Integer limit; - private Integer count; + private Integer page; + private Integer size; + private Integer total; public List getItems() { return items; @@ -137,28 +137,28 @@ public void setItems(List items) { this.items = items; } - public Integer getOffset() { - return offset; + public Integer getPage() { + return page; } - public void setOffset(Integer offset) { - this.offset = offset; + public void setPage(Integer page) { + this.page = page; } - public Integer getLimit() { - return limit; + public Integer getSize() { + return size; } - public void setLimit(Integer limit) { - this.limit = limit; + public void setSize(Integer size) { + this.size = size; } - public Integer getCount() { - return count; + public Integer getTotal() { + return total; } - public void setCount(Integer count) { - this.count = count; + public void setTotal(Integer total) { + this.total = total; } } @@ -258,23 +258,23 @@ public void setConfig(List config) { } public static class PageRequest { - private Integer limit; - private Integer offset; + private Integer page; + private Integer size; - public Integer getLimit() { - return limit; + public Integer getPage() { + return page; } - public void setLimit(Integer limit) { - this.limit = limit; + public void setPage(Integer page) { + this.page = page; } - public Integer getOffset() { - return offset; + public Integer getSize() { + return size; } - public void setOffset(Integer offset) { - this.offset = offset; + public void setSize(Integer size) { + this.size = size; } } public enum SortDirectionEnum { @@ -348,9 +348,9 @@ public void setState(String state) { public static class ConsumerGroupList { private List items; - private Integer offset; - private Integer limit; - private Integer count; + private Integer size; + private Integer page; + private Integer total; public List getItems() { return items; @@ -358,23 +358,29 @@ public List getItems() { public void setItems(List items) { this.items = items; } - public Integer getOffset() { - return offset; + + public Integer getSize() { + return size; } - public void setOffset(Integer offset) { - this.offset = offset; + + public void setSize(Integer size) { + this.size = size; } - public Integer getLimit() { - return limit; + + public Integer getPage() { + return page; } - public void setLimit(Integer limit) { - this.limit = limit; + + public void setPage(Integer page) { + this.page = page; } - public Integer getCount() { - return count; + + public Integer getTotal() { + return total; } - public void setCount(Integer count) { - this.count = count; + + public void setTotal(Integer total) { + this.total = total; } } diff --git a/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml b/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml index edd636f0..5c394967 100644 --- a/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml +++ b/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml @@ -15,8 +15,8 @@ paths: /topics: get: parameters: - - name: limit - description: Maximum number of topics to return + - name: size + description: Maximum number of topics to return on single page schema: format: int32 type: integer @@ -27,8 +27,8 @@ paths: schema: type: string in: query - - name: offset - description: The page offset when returning the limit of requested topics. + - name: page + description: The page when returning the limit of requested topics. schema: format: int32 type: integer @@ -344,13 +344,13 @@ paths: summary: API endpoints for consumer groups under a Kafka topic get: parameters: - - name: limit - description: Maximum number of consumer groups to returnd + - name: sizr + description: Maximum number of consumer groups to returned on single page schema: type: integer in: query - - name: offset - description: The page offset when returning the list of consumer groups + - name: page + description: The page when returning the list of consumer groups schema: type: integer in: query @@ -543,18 +543,18 @@ components: description: A list of topics. required: - items - - offset - - limit - - count + - page + - size + - total type: object properties: offset: - description: The page offset + description: The page type: integer - limit: + size: description: number of entries per page type: integer - count: + total: description: Total number of topics type: integer items: @@ -729,8 +729,8 @@ components: description: A list of consumer groups required: - items - - count - - limit + - total + - size - offset type: object properties: @@ -739,14 +739,14 @@ components: type: array items: $ref: '#/components/schemas/ConsumerGroup' - count: + total: description: The total number of consumer groups. type: number - limit: + size: description: The number of consumer groups per page. type: number - offset: - description: The page offset + page: + description: The page type: integer example: count: 1 diff --git a/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java b/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java index fb1b6cfe..07d66288 100644 --- a/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java +++ b/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java @@ -133,6 +133,42 @@ void testTopicListWithFilter(Vertx vertx, VertxTestContext testContext, Extensio assertThat(testContext.awaitCompletion(1, TimeUnit.MINUTES)).isTrue(); } + @ParameterizedTest(name = "testTopicListWithPagination-{0}") + @Execution(ExecutionMode.CONCURRENT) + @ValueSource(ints = {1, 2, 3}) + void testTopicListWithPagination(int page, Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { + AdminClient kafkaClient = AdminClient.create(RequestUtils.getKafkaAdminConfig(DEPLOYMENT_MANAGER + .getKafkaContainer(extensionContext).getBootstrapServers())); + LOGGER.info("Display name: " + extensionContext.getDisplayName()); + int publishedAdminPort = DEPLOYMENT_MANAGER.getAdminPort(extensionContext); + List topics = new ArrayList<>(); + for (int i = 0; i < 5; i++) topics.add(new NewTopic(UUID.randomUUID().toString(), 1, (short) 1)); + kafkaClient.createTopics(topics); + DynamicWait.waitForTopicsExists(topics.stream().map(NewTopic::name).collect(Collectors.toList()), kafkaClient); + + HttpClient client = createHttpClient(vertx); + client.request(HttpMethod.GET, publishedAdminPort, "localhost", "/rest/topics?size=3&page=" + page) + .compose(req -> req.send().onSuccess(response -> { + // we want to get page 3 of 5 topics. The page size is 3, so we have just 2 pages + if (page == 3 && response.statusCode() != ReturnCodes.FAILED_REQUEST.code) { + testContext.failNow("Status code not correct"); + } + assertStrictTransportSecurityDisabled(response, testContext); + }).onFailure(testContext::failNow).compose(HttpClientResponse::body)) + .onComplete(testContext.succeeding(buffer -> testContext.verify(() -> { + assertThat(testContext.failed()).isFalse(); + // 5 topic, size of page is 3. First page should have 3 topic, second page just 2 + if (page == 1) { + assertThat(MODEL_DESERIALIZER.getNames(buffer).size()).isEqualTo(3); + } + if (page == 2) { + assertThat(MODEL_DESERIALIZER.getNames(buffer).size()).isEqualTo(2); + } + testContext.completeNow(); + }))); + assertThat(testContext.awaitCompletion(1, TimeUnit.MINUTES)).isTrue(); + } + @ParallelTest void testTopicListWithFilterNone(Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { AdminClient kafkaClient = AdminClient.create(RequestUtils.getKafkaAdminConfig(DEPLOYMENT_MANAGER @@ -159,10 +195,10 @@ void testTopicListWithFilterNone(Vertx vertx, VertxTestContext testContext, Exte assertThat(testContext.awaitCompletion(1, TimeUnit.MINUTES)).isTrue(); } - @ParameterizedTest(name = "testTopicListWithLimit-{0}") + @ParameterizedTest(name = "testTopicListWithSize-{0}") @Execution(ExecutionMode.CONCURRENT) @ValueSource(ints = {1, 2, 3, 5}) - void testTopicListWithLimit(int limit, Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { + void testTopicListWithSize(int size, Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { AdminClient kafkaClient = AdminClient.create(RequestUtils.getKafkaAdminConfig(DEPLOYMENT_MANAGER .getKafkaContainer(extensionContext).getBootstrapServers())); int publishedAdminPort = DEPLOYMENT_MANAGER.getAdminPort(extensionContext); @@ -173,7 +209,7 @@ void testTopicListWithLimit(int limit, Vertx vertx, VertxTestContext testContext DynamicWait.waitForTopicsExists(topics.stream().map(NewTopic::name).collect(Collectors.toList()), kafkaClient); HttpClient client = createHttpClient(vertx); - client.request(HttpMethod.GET, publishedAdminPort, "localhost", "/rest/topics?limit=" + limit) + client.request(HttpMethod.GET, publishedAdminPort, "localhost", "/rest/topics?size=" + size) .compose(req -> req.send().onSuccess(response -> { if (response.statusCode() != ReturnCodes.SUCCESS.code) { testContext.failNow("Status code not correct"); @@ -182,16 +218,16 @@ void testTopicListWithLimit(int limit, Vertx vertx, VertxTestContext testContext }).onFailure(testContext::failNow).compose(HttpClientResponse::body)) .onComplete(testContext.succeeding(buffer -> testContext.verify(() -> { assertThat(testContext.failed()).isFalse(); - assertThat(MODEL_DESERIALIZER.getNames(buffer).size()).isEqualTo(Math.min(limit, 3)); + assertThat(MODEL_DESERIALIZER.getNames(buffer).size()).isEqualTo(Math.min(size, 3)); testContext.completeNow(); }))); assertThat(testContext.awaitCompletion(1, TimeUnit.MINUTES)).isTrue(); } - @ParameterizedTest(name = "testTopicListWithOffset-{0}") + @ParameterizedTest(name = "testTopicListWithPage-{0}") @Execution(ExecutionMode.CONCURRENT) - @ValueSource(ints = {0, 1, 3, 4}) - void testTopicListWithOffset(int offset, Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { + @ValueSource(ints = {1, 3, 4}) + void testTopicListWithPage(int page, Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { AdminClient kafkaClient = AdminClient.create(RequestUtils.getKafkaAdminConfig(DEPLOYMENT_MANAGER .getKafkaContainer(extensionContext).getBootstrapServers())); LOGGER.info("Display name: " + extensionContext.getDisplayName()); @@ -202,18 +238,18 @@ void testTopicListWithOffset(int offset, Vertx vertx, VertxTestContext testConte DynamicWait.waitForTopicsExists(topics.stream().map(NewTopic::name).collect(Collectors.toList()), kafkaClient); HttpClient client = createHttpClient(vertx); - client.request(HttpMethod.GET, publishedAdminPort, "localhost", "/rest/topics?offset=" + offset) + client.request(HttpMethod.GET, publishedAdminPort, "localhost", "/rest/topics?page=" + page) .compose(req -> req.send().onSuccess(response -> { - if ((response.statusCode() != ReturnCodes.SUCCESS.code && offset != 4) - || (response.statusCode() != ReturnCodes.FAILED_REQUEST.code && offset == 4)) { + if ((response.statusCode() != ReturnCodes.SUCCESS.code && page == 1) + || (response.statusCode() != ReturnCodes.FAILED_REQUEST.code && page != 1)) { testContext.failNow("Status code not correct"); } assertStrictTransportSecurityDisabled(response, testContext); }).onFailure(testContext::failNow).compose(HttpClientResponse::body)) .onComplete(testContext.succeeding(buffer -> testContext.verify(() -> { assertThat(testContext.failed()).isFalse(); - if (offset != 4) { - assertThat(MODEL_DESERIALIZER.getNames(buffer).size()).isEqualTo(3 - offset); + if (page == 1) { + assertThat(MODEL_DESERIALIZER.getNames(buffer).size()).isEqualTo(3); } testContext.completeNow(); }))); From 1e4c3d52b246e1535aa783bf415283c82e4945ac Mon Sep 17 00:00:00 2001 From: Stanislav Knot Date: Thu, 17 Jun 2021 14:22:04 +0200 Subject: [PATCH 2/6] suggestions Signed-off-by: Stanislav Knot --- .../kafka/admin/ConsumerGroupOperations.java | 2 +- .../admin/kafka/admin/TopicOperations.java | 2 +- .../kafka/admin/handlers/RestOperations.java | 41 +++++++------- .../bf2/admin/kafka/admin/model/Types.java | 53 ++++--------------- .../openapi-specs/kafka-admin-rest.yaml | 4 +- 5 files changed, 32 insertions(+), 70 deletions(-) diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java index 836e5735..f6c3be45 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java @@ -87,7 +87,7 @@ public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern patte .filter(i -> i != null) .collect(Collectors.toList()); - if (pageRequest.getSize() * (pageRequest.getPage() - 1) > list.size()) { + if (pageRequest.getSize() * (pageRequest.getPage() - 1) >= list.size()) { return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + list.size() + ")")); } diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java index da579bd7..4edcbd77 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java @@ -151,7 +151,7 @@ public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern patte fullTopicDescriptions.sort(new CommonHandler.TopicComparator(orderByInput.getField())); } - if ((pageRequest.getPage() - 1) * pageRequest.getSize() > fullTopicDescriptions.size()) { + if ((pageRequest.getPage() - 1) * pageRequest.getSize() >= fullTopicDescriptions.size()) { return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + fullTopicDescriptions.size() + ")")); } diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java index 241d9f3a..c6d82f28 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java @@ -215,8 +215,6 @@ public void listTopics(RoutingContext routingContext) { httpMetrics.getListTopicsCounter().increment(); httpMetrics.getRequestsCounter().increment(); String filter = routingContext.queryParams().get("filter"); - String size = routingContext.queryParams().get("size") == null ? "10" : routingContext.queryParams().get("size"); - String page = routingContext.queryParams().get("page") == null ? "1" : routingContext.queryParams().get("page"); Types.SortDirectionEnum sortReverse = Types.SortDirectionEnum.fromString(routingContext.queryParams().get("order")); String sortKey = routingContext.queryParams().get("orderKey") == null ? "name" : routingContext.queryParams().get("orderKey"); Types.OrderByInput orderBy = new Types.OrderByInput(); @@ -235,16 +233,7 @@ public void listTopics(RoutingContext routingContext) { prom.fail(ac.cause()); } else { try { - int pageInt = Integer.parseInt(page); - int sizeInt = Integer.parseInt(size); - if (sizeInt < 1 || pageInt < 1) { - throw new InvalidRequestException("Size and page have to be positive integers."); - } - Types.PageRequest pageRequest = new Types.PageRequest(); - pageRequest.setPage(pageInt); - pageRequest.setSize(sizeInt); - - TopicOperations.getTopicList(ac.result(), prom, pattern, pageRequest, orderBy); + TopicOperations.getTopicList(ac.result(), prom, pattern, parsePageRequest(routingContext), orderBy); } catch (NumberFormatException | InvalidRequestException e) { prom.fail(e); processResponse(prom, routingContext, HttpResponseStatus.BAD_REQUEST, httpMetrics, httpMetrics.getListTopicRequestTimer(), requestTimerSample); @@ -263,8 +252,6 @@ public void listGroups(RoutingContext routingContext) { httpMetrics.getRequestsCounter().increment(); String topicFilter = routingContext.queryParams().get("topic"); String consumerGroupIdFilter = routingContext.queryParams().get("group-id-filter") == null ? "" : routingContext.queryParams().get("group-id-filter"); - String size = routingContext.queryParams().get("size") == null ? "10" : routingContext.queryParams().get("size"); - String page = routingContext.queryParams().get("page") == null ? "1" : routingContext.queryParams().get("page"); Types.SortDirectionEnum sortReverse = Types.SortDirectionEnum.fromString(routingContext.queryParams().get("order")); String sortKey = routingContext.queryParams().get("orderKey") == null ? "name" : routingContext.queryParams().get("orderKey"); @@ -285,15 +272,7 @@ public void listGroups(RoutingContext routingContext) { prom.fail(ac.cause()); } else { try { - int pageInt = Integer.parseInt(page); - int sizeInt = Integer.parseInt(size); - if (sizeInt < 1 || pageInt < 1) { - throw new InvalidRequestException("Size and page have to be positive integers."); - } - Types.PageRequest pageRequest = new Types.PageRequest(); - pageRequest.setPage(pageInt); - pageRequest.setSize(sizeInt); - ConsumerGroupOperations.getGroupList(ac.result(), prom, pattern, pageRequest, consumerGroupIdFilter, orderBy); + ConsumerGroupOperations.getGroupList(ac.result(), prom, pattern, parsePageRequest(routingContext), consumerGroupIdFilter, orderBy); } catch (NumberFormatException | InvalidRequestException e) { prom.fail(e); processResponse(prom, routingContext, HttpResponseStatus.BAD_REQUEST, httpMetrics, httpMetrics.getListGroupsRequestTimer(), requestTimerSample); @@ -427,4 +406,20 @@ public void errorHandler(RoutingContext routingContext) { prom.fail(routingContext.failure()); processResponse(prom, routingContext, HttpResponseStatus.OK, httpMetrics, httpMetrics.getOpenApiRequestTimer(), requestTimerSample); } + + private Types.PageRequest parsePageRequest(RoutingContext routingContext) { + String size = routingContext.queryParams().get("size") == null ? "10" : routingContext.queryParams().get("size"); + String page = routingContext.queryParams().get("page") == null ? "1" : routingContext.queryParams().get("page"); + + int pageInt = Integer.parseInt(page); + int sizeInt = Integer.parseInt(size); + if (sizeInt < 1 || pageInt < 1) { + throw new InvalidRequestException("Size and page have to be positive integers."); + } + Types.PageRequest pageRequest = new Types.PageRequest(); + pageRequest.setPage(pageInt); + pageRequest.setSize(sizeInt); + + return pageRequest; + } } diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java index 5f6c6a3c..56a72fc3 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java @@ -123,45 +123,6 @@ public int compareTo(Topic topic) { } } - public static class TopicList { - private List items; - private Integer page; - private Integer size; - private Integer total; - - public List getItems() { - return items; - } - - public void setItems(List items) { - this.items = items; - } - - public Integer getPage() { - return page; - } - - public void setPage(Integer page) { - this.page = page; - } - - public Integer getSize() { - return size; - } - - public void setSize(Integer size) { - this.size = size; - } - - public Integer getTotal() { - return total; - } - - public void setTotal(Integer total) { - this.total = total; - } - } - public static class NewTopicConfigEntry { private String key; private String value; @@ -346,16 +307,16 @@ public void setState(String state) { } } - public static class ConsumerGroupList { - private List items; + public static class PagedResponse { + private List items; private Integer size; private Integer page; private Integer total; - public List getItems() { + public List getItems() { return items; } - public void setItems(List items) { + public void setItems(List items) { this.items = items; } @@ -384,6 +345,12 @@ public void setTotal(Integer total) { } } + public static class ConsumerGroupList extends PagedResponse { + } + + public static class TopicList extends PagedResponse { + } + public static class Consumer { private String memberId; private String groupId; diff --git a/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml b/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml index 5c394967..7482c7a8 100644 --- a/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml +++ b/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml @@ -344,8 +344,8 @@ paths: summary: API endpoints for consumer groups under a Kafka topic get: parameters: - - name: sizr - description: Maximum number of consumer groups to returned on single page + - name: size + description: Maximum number of consumer groups to return on single page schema: type: integer in: query From 8299daa79b879c6533b758e4d80e82aa6a3f3ad5 Mon Sep 17 00:00:00 2001 From: Stanislav Knot Date: Thu, 17 Jun 2021 15:06:58 +0200 Subject: [PATCH 3/6] suggestions Signed-off-by: Stanislav Knot --- .../bf2/admin/kafka/admin/ConsumerGroupOperations.java | 2 +- .../java/org/bf2/admin/kafka/admin/TopicOperations.java | 2 +- .../admin/kafka/systemtest/plain/RestEndpointTestIT.java | 8 +++++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java index f6c3be45..0327d69e 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java @@ -87,7 +87,7 @@ public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern patte .filter(i -> i != null) .collect(Collectors.toList()); - if (pageRequest.getSize() * (pageRequest.getPage() - 1) >= list.size()) { + if (list.size() > 0 && pageRequest.getSize() * (pageRequest.getPage() - 1) >= list.size()) { return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + list.size() + ")")); } diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java index 4edcbd77..4f04cd17 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java @@ -151,7 +151,7 @@ public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern patte fullTopicDescriptions.sort(new CommonHandler.TopicComparator(orderByInput.getField())); } - if ((pageRequest.getPage() - 1) * pageRequest.getSize() >= fullTopicDescriptions.size()) { + if (fullTopicDescriptions.size() > 0 && (pageRequest.getPage() - 1) * pageRequest.getSize() >= fullTopicDescriptions.size()) { return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + fullTopicDescriptions.size() + ")")); } diff --git a/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java b/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java index 07d66288..6c23d4ea 100644 --- a/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java +++ b/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java @@ -139,7 +139,6 @@ void testTopicListWithFilter(Vertx vertx, VertxTestContext testContext, Extensio void testTopicListWithPagination(int page, Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { AdminClient kafkaClient = AdminClient.create(RequestUtils.getKafkaAdminConfig(DEPLOYMENT_MANAGER .getKafkaContainer(extensionContext).getBootstrapServers())); - LOGGER.info("Display name: " + extensionContext.getDisplayName()); int publishedAdminPort = DEPLOYMENT_MANAGER.getAdminPort(extensionContext); List topics = new ArrayList<>(); for (int i = 0; i < 5; i++) topics.add(new NewTopic(UUID.randomUUID().toString(), 1, (short) 1)); @@ -150,7 +149,11 @@ void testTopicListWithPagination(int page, Vertx vertx, VertxTestContext testCon client.request(HttpMethod.GET, publishedAdminPort, "localhost", "/rest/topics?size=3&page=" + page) .compose(req -> req.send().onSuccess(response -> { // we want to get page 3 of 5 topics. The page size is 3, so we have just 2 pages - if (page == 3 && response.statusCode() != ReturnCodes.FAILED_REQUEST.code) { + if (page == 3) { + if (response.statusCode() != ReturnCodes.FAILED_REQUEST.code) { + testContext.failNow("Status code not correct"); + } + } else if (response.statusCode() != ReturnCodes.SUCCESS.code) { testContext.failNow("Status code not correct"); } assertStrictTransportSecurityDisabled(response, testContext); @@ -230,7 +233,6 @@ void testTopicListWithSize(int size, Vertx vertx, VertxTestContext testContext, void testTopicListWithPage(int page, Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { AdminClient kafkaClient = AdminClient.create(RequestUtils.getKafkaAdminConfig(DEPLOYMENT_MANAGER .getKafkaContainer(extensionContext).getBootstrapServers())); - LOGGER.info("Display name: " + extensionContext.getDisplayName()); int publishedAdminPort = DEPLOYMENT_MANAGER.getAdminPort(extensionContext); List topics = new ArrayList<>(); for (int i = 0; i < 3; i++) topics.add(new NewTopic(UUID.randomUUID().toString(), 1, (short) 1)); From 7b7c12086aa2d184b3c4dce9b56fe5e595373451 Mon Sep 17 00:00:00 2001 From: Stanislav Knot Date: Fri, 18 Jun 2021 10:40:31 +0200 Subject: [PATCH 4/6] support deprecated pagination Signed-off-by: Stanislav Knot --- .../kafka/admin/ConsumerGroupOperations.java | 30 ++++++++--- .../admin/kafka/admin/TopicOperations.java | 29 ++++++++--- .../kafka/admin/handlers/RestOperations.java | 35 +++++++++---- .../bf2/admin/kafka/admin/model/Types.java | 51 +++++++++++++++++++ .../openapi-specs/kafka-admin-rest.yaml | 44 ++++++++++++---- 5 files changed, 155 insertions(+), 34 deletions(-) diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java index 0327d69e..3f967882 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java @@ -87,23 +87,37 @@ public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern patte .filter(i -> i != null) .collect(Collectors.toList()); - if (list.size() > 0 && pageRequest.getSize() * (pageRequest.getPage() - 1) >= list.size()) { - return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + list.size() + ")")); - } - if (Types.SortDirectionEnum.DESC.equals(orderByInput.getOrder())) { list.sort(new CommonHandler.ConsumerGroupComparator(orderByInput.getField()).reversed()); } else { list.sort(new CommonHandler.ConsumerGroupComparator(orderByInput.getField())); } - List croppedList = list.subList((pageRequest.getPage() - 1) * pageRequest.getSize(), Math.min(pageRequest.getSize() * pageRequest.getPage(), list.size())); - Types.ConsumerGroupList response = new Types.ConsumerGroupList(); + List croppedList; + if (pageRequest.isDeprecatedFormat()) { + if (pageRequest.getOffset() > list.size()) { + return Future.failedFuture(new InvalidRequestException("Offset (" + pageRequest.getOffset() + ") cannot be greater than consumer group list size (" + list.size() + ")")); + } + int tmpLimit = pageRequest.getLimit(); + if (tmpLimit == 0) { + tmpLimit = list.size(); + } + croppedList = list.subList(pageRequest.getOffset(), Math.min(pageRequest.getOffset() + tmpLimit, list.size())); + response.setLimit(pageRequest.getLimit()); + response.setOffset(pageRequest.getOffset()); + } else { + if (list.size() > 0 && pageRequest.getSize() * (pageRequest.getPage() - 1) >= list.size()) { + return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + list.size() + ")")); + } + croppedList = list.subList((pageRequest.getPage() - 1) * pageRequest.getSize(), Math.min(pageRequest.getSize() * pageRequest.getPage(), list.size())); + response.setSize(pageRequest.getSize()); + response.setPage(pageRequest.getPage()); + } + response.setItems(croppedList); response.setTotal(list.size()); - response.setSize(pageRequest.getSize()); - response.setPage(pageRequest.getPage()); + return Future.succeededFuture(response); }) diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java index 4f04cd17..2bfa912e 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java @@ -151,17 +151,34 @@ public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern patte fullTopicDescriptions.sort(new CommonHandler.TopicComparator(orderByInput.getField())); } - if (fullTopicDescriptions.size() > 0 && (pageRequest.getPage() - 1) * pageRequest.getSize() >= fullTopicDescriptions.size()) { - return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + fullTopicDescriptions.size() + ")")); + + Types.TopicList topicList = new Types.TopicList(); + List croppedList; + if (pageRequest.isDeprecatedFormat()) { + // deprecated + if (pageRequest.getOffset() > fullTopicDescriptions.size()) { + return Future.failedFuture(new InvalidRequestException("Offset (" + pageRequest.getOffset() + ") cannot be greater than topic list size (" + fullTopicDescriptions.size() + ")")); + } + int tmpLimit = pageRequest.getLimit(); + if (tmpLimit == 0) { + tmpLimit = fullTopicDescriptions.size(); + } + croppedList = fullTopicDescriptions.subList(pageRequest.getOffset(), Math.min(pageRequest.getOffset() + tmpLimit, fullTopicDescriptions.size())); + topicList.setOffset(pageRequest.getOffset()); + topicList.setLimit(pageRequest.getLimit()); + } else { + if (fullTopicDescriptions.size() > 0 && (pageRequest.getPage() - 1) * pageRequest.getSize() >= fullTopicDescriptions.size()) { + return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + fullTopicDescriptions.size() + ")")); + } + croppedList = fullTopicDescriptions.subList((pageRequest.getPage() - 1) * pageRequest.getSize(), Math.min(pageRequest.getPage() * pageRequest.getSize(), fullTopicDescriptions.size())); + topicList.setPage(pageRequest.getPage()); + topicList.setSize(pageRequest.getSize()); } - List croppedList = fullTopicDescriptions.subList((pageRequest.getPage() - 1) * pageRequest.getSize(), Math.min(pageRequest.getPage() * pageRequest.getSize(), fullTopicDescriptions.size())); - Types.TopicList topicList = new Types.TopicList(); topicList.setItems(croppedList); topicList.setTotal(fullTopicDescriptions.size()); - topicList.setPage(pageRequest.getPage()); - topicList.setSize(pageRequest.getSize()); + return Future.succeededFuture(topicList); }).onComplete(finalRes -> { if (finalRes.failed()) { diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java index c6d82f28..a098cacb 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java @@ -408,17 +408,34 @@ public void errorHandler(RoutingContext routingContext) { } private Types.PageRequest parsePageRequest(RoutingContext routingContext) { - String size = routingContext.queryParams().get("size") == null ? "10" : routingContext.queryParams().get("size"); - String page = routingContext.queryParams().get("page") == null ? "1" : routingContext.queryParams().get("page"); + Types.PageRequest pageRequest = new Types.PageRequest(); - int pageInt = Integer.parseInt(page); - int sizeInt = Integer.parseInt(size); - if (sizeInt < 1 || pageInt < 1) { - throw new InvalidRequestException("Size and page have to be positive integers."); + boolean deprecatedPaginationUsed = false; + if (routingContext.queryParams().get("offset") != null || routingContext.queryParams().get("limit") != null) { + deprecatedPaginationUsed = true; + } + pageRequest.setDeprecatedFormat(deprecatedPaginationUsed); + + if (deprecatedPaginationUsed) { + String offset = routingContext.queryParams().get("offset") == null ? "0" : routingContext.queryParams().get("offset"); + String limit = routingContext.queryParams().get("limit") == null ? "10" : routingContext.queryParams().get("limit"); + int offsetInt = Integer.parseInt(offset); + int limitInt = Integer.parseInt(limit); + pageRequest.setOffset(offsetInt); + pageRequest.setLimit(limitInt); + } else { + String size = routingContext.queryParams().get("size") == null ? "10" : routingContext.queryParams().get("size"); + String page = routingContext.queryParams().get("page") == null ? "1" : routingContext.queryParams().get("page"); + + int pageInt = Integer.parseInt(page); + int sizeInt = Integer.parseInt(size); + pageRequest.setPage(pageInt); + pageRequest.setSize(sizeInt); + + if (sizeInt < 1 || pageInt < 1) { + throw new InvalidRequestException("Size and page have to be positive integers."); + } } - Types.PageRequest pageRequest = new Types.PageRequest(); - pageRequest.setPage(pageInt); - pageRequest.setSize(sizeInt); return pageRequest; } diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java index 56a72fc3..49a8684b 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java @@ -219,9 +219,23 @@ public void setConfig(List config) { } public static class PageRequest { + private boolean deprecatedFormat; private Integer page; private Integer size; + @Deprecated + private Integer offset; + @Deprecated + private Integer limit; + + public boolean isDeprecatedFormat() { + return deprecatedFormat; + } + + public void setDeprecatedFormat(boolean deprecatedFormat) { + this.deprecatedFormat = deprecatedFormat; + } + public Integer getPage() { return page; } @@ -237,6 +251,22 @@ public Integer getSize() { public void setSize(Integer size) { this.size = size; } + + public Integer getOffset() { + return offset; + } + + public void setOffset(Integer offset) { + this.offset = offset; + } + + public Integer getLimit() { + return limit; + } + + public void setLimit(Integer limit) { + this.limit = limit; + } } public enum SortDirectionEnum { DESC, @@ -312,6 +342,9 @@ public static class PagedResponse { private Integer size; private Integer page; private Integer total; + // deprecated + private Integer offset; + private Integer limit; public List getItems() { return items; @@ -343,6 +376,24 @@ public Integer getTotal() { public void setTotal(Integer total) { this.total = total; } + + //deprecated + + public Integer getOffset() { + return offset; + } + + public void setOffset(Integer offset) { + this.offset = offset; + } + + public Integer getLimit() { + return limit; + } + + public void setLimit(Integer limit) { + this.limit = limit; + } } public static class ConsumerGroupList extends PagedResponse { diff --git a/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml b/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml index 7482c7a8..e8f6d2ca 100644 --- a/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml +++ b/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml @@ -15,6 +15,16 @@ paths: /topics: get: parameters: + - name: offset + description: The page offset + schema: + type: integer + in: query + - name: limit + description: Maximum number of topics to return + schema: + type: integer + in: query - name: size description: Maximum number of topics to return on single page schema: @@ -344,6 +354,16 @@ paths: summary: API endpoints for consumer groups under a Kafka topic get: parameters: + - name: offset + description: The page offset + schema: + type: integer + in: query + - name: limit + description: Maximum number of consumer groups to return + schema: + type: integer + in: query - name: size description: Maximum number of consumer groups to return on single page schema: @@ -541,19 +561,20 @@ components: value: '1' TopicsList: description: A list of topics. - required: - - items - - page - - size - - total type: object properties: - offset: + page: description: The page type: integer size: description: number of entries per page type: integer + offset: + description: Deprecated offset of the topic list + type: integer + limit: + description: Deprecated maximum of returned topics + type: integer total: description: Total number of topics type: integer @@ -727,11 +748,6 @@ components: logEndOffset: 5 ConsumerGroupList: description: A list of consumer groups - required: - - items - - total - - size - - offset type: object properties: items: @@ -748,6 +764,12 @@ components: page: description: The page type: integer + offset: + description: Deprecated offset of the topic list + type: integer + limit: + description: Deprecated maximum of returned topics + type: integer example: count: 1 limit: 10 From 250c990f567638e2975fa99786b152279768c5c1 Mon Sep 17 00:00:00 2001 From: Stanislav Knot Date: Fri, 18 Jun 2021 15:45:46 +0200 Subject: [PATCH 5/6] result list size Signed-off-by: Stanislav Knot --- .../bf2/admin/kafka/admin/ConsumerGroupOperations.java | 4 ++-- .../java/org/bf2/admin/kafka/admin/TopicOperations.java | 4 ++-- .../main/java/org/bf2/admin/kafka/admin/model/Types.java | 9 +++++++++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java index 3f967882..7bb53408 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java @@ -106,6 +106,7 @@ public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern patte croppedList = list.subList(pageRequest.getOffset(), Math.min(pageRequest.getOffset() + tmpLimit, list.size())); response.setLimit(pageRequest.getLimit()); response.setOffset(pageRequest.getOffset()); + response.setCount(croppedList.size()); } else { if (list.size() > 0 && pageRequest.getSize() * (pageRequest.getPage() - 1) >= list.size()) { return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + list.size() + ")")); @@ -113,11 +114,10 @@ public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern patte croppedList = list.subList((pageRequest.getPage() - 1) * pageRequest.getSize(), Math.min(pageRequest.getSize() * pageRequest.getPage(), list.size())); response.setSize(pageRequest.getSize()); response.setPage(pageRequest.getPage()); + response.setTotal(list.size()); } response.setItems(croppedList); - response.setTotal(list.size()); - return Future.succeededFuture(response); }) diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java index 2bfa912e..7478663c 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java @@ -166,6 +166,7 @@ public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern patte croppedList = fullTopicDescriptions.subList(pageRequest.getOffset(), Math.min(pageRequest.getOffset() + tmpLimit, fullTopicDescriptions.size())); topicList.setOffset(pageRequest.getOffset()); topicList.setLimit(pageRequest.getLimit()); + topicList.setCount(croppedList.size()); } else { if (fullTopicDescriptions.size() > 0 && (pageRequest.getPage() - 1) * pageRequest.getSize() >= fullTopicDescriptions.size()) { return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + fullTopicDescriptions.size() + ")")); @@ -173,11 +174,10 @@ public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern patte croppedList = fullTopicDescriptions.subList((pageRequest.getPage() - 1) * pageRequest.getSize(), Math.min(pageRequest.getPage() * pageRequest.getSize(), fullTopicDescriptions.size())); topicList.setPage(pageRequest.getPage()); topicList.setSize(pageRequest.getSize()); + topicList.setTotal(fullTopicDescriptions.size()); } - topicList.setItems(croppedList); - topicList.setTotal(fullTopicDescriptions.size()); return Future.succeededFuture(topicList); }).onComplete(finalRes -> { diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java index 49a8684b..49424619 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java @@ -345,6 +345,7 @@ public static class PagedResponse { // deprecated private Integer offset; private Integer limit; + private Integer count; public List getItems() { return items; @@ -394,6 +395,14 @@ public Integer getLimit() { public void setLimit(Integer limit) { this.limit = limit; } + + public Integer getCount() { + return count; + } + + public void setCount(Integer count) { + this.count = count; + } } public static class ConsumerGroupList extends PagedResponse { From 1c0d7c6ef2887465b6810e4046432ebfc2346ba4 Mon Sep 17 00:00:00 2001 From: Stanislav Knot Date: Fri, 18 Jun 2021 16:56:36 +0200 Subject: [PATCH 6/6] deprecated tests Signed-off-by: Stanislav Knot --- .../systemtest/plain/RestEndpointTestIT.java | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java b/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java index 6c23d4ea..4e479ef4 100644 --- a/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java +++ b/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java @@ -172,6 +172,67 @@ void testTopicListWithPagination(int page, Vertx vertx, VertxTestContext testCon assertThat(testContext.awaitCompletion(1, TimeUnit.MINUTES)).isTrue(); } + @ParameterizedTest(name = "testTopicListWithLimit-{0}") + @Execution(ExecutionMode.CONCURRENT) + @ValueSource(ints = {1, 2, 3, 5}) + void testTopicListWithLimit(int limit, Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { + AdminClient kafkaClient = AdminClient.create(RequestUtils.getKafkaAdminConfig(DEPLOYMENT_MANAGER + .getKafkaContainer(extensionContext).getBootstrapServers())); + int publishedAdminPort = DEPLOYMENT_MANAGER.getAdminPort(extensionContext); + + List topics = new ArrayList<>(); + for (int i = 0; i < 3; i++) topics.add(new NewTopic(UUID.randomUUID().toString(), 1, (short) 1)); + kafkaClient.createTopics(topics); + DynamicWait.waitForTopicsExists(topics.stream().map(NewTopic::name).collect(Collectors.toList()), kafkaClient); + + HttpClient client = createHttpClient(vertx); + client.request(HttpMethod.GET, publishedAdminPort, "localhost", "/rest/topics?limit=" + limit) + .compose(req -> req.send().onSuccess(response -> { + if (response.statusCode() != ReturnCodes.SUCCESS.code) { + testContext.failNow("Status code not correct"); + } + assertStrictTransportSecurityDisabled(response, testContext); + }).onFailure(testContext::failNow).compose(HttpClientResponse::body)) + .onComplete(testContext.succeeding(buffer -> testContext.verify(() -> { + assertThat(testContext.failed()).isFalse(); + assertThat(MODEL_DESERIALIZER.getNames(buffer).size()).isEqualTo(Math.min(limit, 3)); + testContext.completeNow(); + }))); + assertThat(testContext.awaitCompletion(1, TimeUnit.MINUTES)).isTrue(); + } + + @ParameterizedTest(name = "testTopicListWithOffset-{0}") + @Execution(ExecutionMode.CONCURRENT) + @ValueSource(ints = {0, 1, 3, 4}) + void testTopicListWithOffset(int offset, Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { + AdminClient kafkaClient = AdminClient.create(RequestUtils.getKafkaAdminConfig(DEPLOYMENT_MANAGER + .getKafkaContainer(extensionContext).getBootstrapServers())); + LOGGER.info("Display name: " + extensionContext.getDisplayName()); + int publishedAdminPort = DEPLOYMENT_MANAGER.getAdminPort(extensionContext); + List topics = new ArrayList<>(); + for (int i = 0; i < 3; i++) topics.add(new NewTopic(UUID.randomUUID().toString(), 1, (short) 1)); + kafkaClient.createTopics(topics); + DynamicWait.waitForTopicsExists(topics.stream().map(NewTopic::name).collect(Collectors.toList()), kafkaClient); + + HttpClient client = createHttpClient(vertx); + client.request(HttpMethod.GET, publishedAdminPort, "localhost", "/rest/topics?offset=" + offset) + .compose(req -> req.send().onSuccess(response -> { + if ((response.statusCode() != ReturnCodes.SUCCESS.code && offset != 4) + || (response.statusCode() != ReturnCodes.FAILED_REQUEST.code && offset == 4)) { + testContext.failNow("Status code not correct"); + } + assertStrictTransportSecurityDisabled(response, testContext); + }).onFailure(testContext::failNow).compose(HttpClientResponse::body)) + .onComplete(testContext.succeeding(buffer -> testContext.verify(() -> { + assertThat(testContext.failed()).isFalse(); + if (offset != 4) { + assertThat(MODEL_DESERIALIZER.getNames(buffer).size()).isEqualTo(3 - offset); + } + testContext.completeNow(); + }))); + assertThat(testContext.awaitCompletion(1, TimeUnit.MINUTES)).isTrue(); + } + @ParallelTest void testTopicListWithFilterNone(Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { AdminClient kafkaClient = AdminClient.create(RequestUtils.getKafkaAdminConfig(DEPLOYMENT_MANAGER