diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java index 9652d9a1..7bb53408 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/ConsumerGroupOperations.java @@ -32,7 +32,7 @@ public class ConsumerGroupOperations { protected static final Logger log = LogManager.getLogger(ConsumerGroupOperations.class); - public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern pattern, int offset, final int limit, final String groupIdPrefix, Types.OrderByInput orderByInput) { + public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern pattern, Types.PageRequest pageRequest, final String groupIdPrefix, Types.OrderByInput orderByInput) { Promise> listConsumerGroupsFuture = Promise.promise(); ac.listConsumerGroups(listConsumerGroupsFuture); @@ -87,28 +87,37 @@ public static void getGroupList(KafkaAdminClient ac, Promise prom, Pattern patte .filter(i -> i != null) .collect(Collectors.toList()); - if (offset > list.size()) { - return Future.failedFuture(new InvalidRequestException("Offset (" + offset + ") cannot be greater than consumer group list size (" + list.size() + ")")); - } - if (Types.SortDirectionEnum.DESC.equals(orderByInput.getOrder())) { list.sort(new CommonHandler.ConsumerGroupComparator(orderByInput.getField()).reversed()); } else { list.sort(new CommonHandler.ConsumerGroupComparator(orderByInput.getField())); } - int tmpLimit = limit; - if (tmpLimit == 0) { - tmpLimit = list.size(); + Types.ConsumerGroupList response = new Types.ConsumerGroupList(); + List croppedList; + if (pageRequest.isDeprecatedFormat()) { + if (pageRequest.getOffset() > list.size()) { + return Future.failedFuture(new InvalidRequestException("Offset (" + pageRequest.getOffset() + ") cannot be greater than consumer group list size (" + 
list.size() + ")")); + } + int tmpLimit = pageRequest.getLimit(); + if (tmpLimit == 0) { + tmpLimit = list.size(); + } + croppedList = list.subList(pageRequest.getOffset(), Math.min(pageRequest.getOffset() + tmpLimit, list.size())); + response.setLimit(pageRequest.getLimit()); + response.setOffset(pageRequest.getOffset()); + response.setCount(croppedList.size()); + } else { + if (list.size() > 0 && pageRequest.getSize() * (pageRequest.getPage() - 1) >= list.size()) { + return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. Beginning of list greater than full list size (" + list.size() + ")")); + } + croppedList = list.subList((pageRequest.getPage() - 1) * pageRequest.getSize(), Math.min(pageRequest.getSize() * pageRequest.getPage(), list.size())); + response.setSize(pageRequest.getSize()); + response.setPage(pageRequest.getPage()); + response.setTotal(list.size()); } - List croppedList = list.subList(offset, Math.min(offset + tmpLimit, list.size())); - - Types.ConsumerGroupList response = new Types.ConsumerGroupList(); response.setItems(croppedList); - response.setCount(croppedList.size()); - response.setLimit(tmpLimit); - response.setOffset(offset); return Future.succeededFuture(response); }) diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java index a0e2c8f0..7478663c 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/TopicOperations.java @@ -111,7 +111,7 @@ private static Promise getTopicDescAndConf(KafkaAdminClient ac, Str return result; } - public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern pattern, int offset, final int limit, Types.OrderByInput orderByInput) { + public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern pattern, Types.PageRequest pageRequest, Types.OrderByInput 
orderByInput) { Promise> describeTopicsNamesPromise = Promise.promise(); Promise> describeTopicsPromise = Promise.promise(); Promise> describeTopicConfigPromise = Promise.promise(); @@ -151,21 +151,34 @@ public static void getTopicList(KafkaAdminClient ac, Promise prom, Pattern patte fullTopicDescriptions.sort(new CommonHandler.TopicComparator(orderByInput.getField())); } - if (offset > fullTopicDescriptions.size()) { - return Future.failedFuture(new InvalidRequestException("Offset (" + offset + ") cannot be greater than topic list size (" + fullTopicDescriptions.size() + ")")); - } - int tmpLimit = limit; - if (tmpLimit == 0) { - tmpLimit = fullTopicDescriptions.size(); - } - - List croppedList = fullTopicDescriptions.subList(offset, Math.min(offset + tmpLimit, fullTopicDescriptions.size())); Types.TopicList topicList = new Types.TopicList(); + List croppedList; + if (pageRequest.isDeprecatedFormat()) { + // deprecated + if (pageRequest.getOffset() > fullTopicDescriptions.size()) { + return Future.failedFuture(new InvalidRequestException("Offset (" + pageRequest.getOffset() + ") cannot be greater than topic list size (" + fullTopicDescriptions.size() + ")")); + } + int tmpLimit = pageRequest.getLimit(); + if (tmpLimit == 0) { + tmpLimit = fullTopicDescriptions.size(); + } + croppedList = fullTopicDescriptions.subList(pageRequest.getOffset(), Math.min(pageRequest.getOffset() + tmpLimit, fullTopicDescriptions.size())); + topicList.setOffset(pageRequest.getOffset()); + topicList.setLimit(pageRequest.getLimit()); + topicList.setCount(croppedList.size()); + } else { + if (fullTopicDescriptions.size() > 0 && (pageRequest.getPage() - 1) * pageRequest.getSize() >= fullTopicDescriptions.size()) { + return Future.failedFuture(new InvalidRequestException("Requested pagination incorrect. 
Beginning of list greater than full list size (" + fullTopicDescriptions.size() + ")")); + } + croppedList = fullTopicDescriptions.subList((pageRequest.getPage() - 1) * pageRequest.getSize(), Math.min(pageRequest.getPage() * pageRequest.getSize(), fullTopicDescriptions.size())); + topicList.setPage(pageRequest.getPage()); + topicList.setSize(pageRequest.getSize()); + topicList.setTotal(fullTopicDescriptions.size()); + } + topicList.setItems(croppedList); - topicList.setCount(croppedList.size()); - topicList.setLimit(tmpLimit); - topicList.setOffset(offset); + return Future.succeededFuture(topicList); }).onComplete(finalRes -> { if (finalRes.failed()) { diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java index c315bb88..a098cacb 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/handlers/RestOperations.java @@ -215,8 +215,6 @@ public void listTopics(RoutingContext routingContext) { httpMetrics.getListTopicsCounter().increment(); httpMetrics.getRequestsCounter().increment(); String filter = routingContext.queryParams().get("filter"); - String limit = routingContext.queryParams().get("limit") == null ? "0" : routingContext.queryParams().get("limit"); - String offset = routingContext.queryParams().get("offset") == null ? "0" : routingContext.queryParams().get("offset"); Types.SortDirectionEnum sortReverse = Types.SortDirectionEnum.fromString(routingContext.queryParams().get("order")); String sortKey = routingContext.queryParams().get("orderKey") == null ? 
"name" : routingContext.queryParams().get("orderKey"); Types.OrderByInput orderBy = new Types.OrderByInput(); @@ -235,13 +233,11 @@ public void listTopics(RoutingContext routingContext) { prom.fail(ac.cause()); } else { try { - if (Integer.parseInt(offset) < 0 || Integer.parseInt(limit) < 0) { - throw new InvalidRequestException("Offset and limit have to be positive integers."); - } - TopicOperations.getTopicList(ac.result(), prom, pattern, Integer.parseInt(offset), Integer.parseInt(limit), orderBy); + TopicOperations.getTopicList(ac.result(), prom, pattern, parsePageRequest(routingContext), orderBy); } catch (NumberFormatException | InvalidRequestException e) { prom.fail(e); processResponse(prom, routingContext, HttpResponseStatus.BAD_REQUEST, httpMetrics, httpMetrics.getListTopicRequestTimer(), requestTimerSample); + return; } } processResponse(prom, routingContext, HttpResponseStatus.OK, httpMetrics, httpMetrics.getListTopicRequestTimer(), requestTimerSample); @@ -256,8 +252,6 @@ public void listGroups(RoutingContext routingContext) { httpMetrics.getRequestsCounter().increment(); String topicFilter = routingContext.queryParams().get("topic"); String consumerGroupIdFilter = routingContext.queryParams().get("group-id-filter") == null ? "" : routingContext.queryParams().get("group-id-filter"); - String limit = routingContext.queryParams().get("limit") == null ? "0" : routingContext.queryParams().get("limit"); - String offset = routingContext.queryParams().get("offset") == null ? "0" : routingContext.queryParams().get("offset"); Types.SortDirectionEnum sortReverse = Types.SortDirectionEnum.fromString(routingContext.queryParams().get("order")); String sortKey = routingContext.queryParams().get("orderKey") == null ? 
"name" : routingContext.queryParams().get("orderKey"); @@ -278,13 +272,11 @@ public void listGroups(RoutingContext routingContext) { prom.fail(ac.cause()); } else { try { - if (Integer.parseInt(offset) < 0 || Integer.parseInt(limit) < 0) { - throw new InvalidRequestException("Offset and limit have to be positive integers."); - } - ConsumerGroupOperations.getGroupList(ac.result(), prom, pattern, Integer.parseInt(offset), Integer.parseInt(limit), consumerGroupIdFilter, orderBy); + ConsumerGroupOperations.getGroupList(ac.result(), prom, pattern, parsePageRequest(routingContext), consumerGroupIdFilter, orderBy); } catch (NumberFormatException | InvalidRequestException e) { prom.fail(e); processResponse(prom, routingContext, HttpResponseStatus.BAD_REQUEST, httpMetrics, httpMetrics.getListGroupsRequestTimer(), requestTimerSample); + return; } } processResponse(prom, routingContext, HttpResponseStatus.OK, httpMetrics, httpMetrics.getListGroupsRequestTimer(), requestTimerSample); @@ -414,4 +406,37 @@ public void errorHandler(RoutingContext routingContext) { prom.fail(routingContext.failure()); processResponse(prom, routingContext, HttpResponseStatus.OK, httpMetrics, httpMetrics.getOpenApiRequestTimer(), requestTimerSample); } + + private Types.PageRequest parsePageRequest(RoutingContext routingContext) { + Types.PageRequest pageRequest = new Types.PageRequest(); + + boolean deprecatedPaginationUsed = false; + if (routingContext.queryParams().get("offset") != null || routingContext.queryParams().get("limit") != null) { + deprecatedPaginationUsed = true; + } + pageRequest.setDeprecatedFormat(deprecatedPaginationUsed); + + if (deprecatedPaginationUsed) { + String offset = routingContext.queryParams().get("offset") == null ? "0" : routingContext.queryParams().get("offset"); + String limit = routingContext.queryParams().get("limit") == null ? 
"10" : routingContext.queryParams().get("limit"); + int offsetInt = Integer.parseInt(offset); + int limitInt = Integer.parseInt(limit); + pageRequest.setOffset(offsetInt); + pageRequest.setLimit(limitInt); + } else { + String size = routingContext.queryParams().get("size") == null ? "10" : routingContext.queryParams().get("size"); + String page = routingContext.queryParams().get("page") == null ? "1" : routingContext.queryParams().get("page"); + + int pageInt = Integer.parseInt(page); + int sizeInt = Integer.parseInt(size); + pageRequest.setPage(pageInt); + pageRequest.setSize(sizeInt); + + if (sizeInt < 1 || pageInt < 1) { + throw new InvalidRequestException("Size and page have to be positive integers."); + } + } + + return pageRequest; + } } diff --git a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java index f9ee1964..49424619 100644 --- a/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java +++ b/kafka-admin/src/main/java/org/bf2/admin/kafka/admin/model/Types.java @@ -123,45 +123,6 @@ public int compareTo(Topic topic) { } } - public static class TopicList { - private List items; - private Integer offset; - private Integer limit; - private Integer count; - - public List getItems() { - return items; - } - - public void setItems(List items) { - this.items = items; - } - - public Integer getOffset() { - return offset; - } - - public void setOffset(Integer offset) { - this.offset = offset; - } - - public Integer getLimit() { - return limit; - } - - public void setLimit(Integer limit) { - this.limit = limit; - } - - public Integer getCount() { - return count; - } - - public void setCount(Integer count) { - this.count = count; - } - } - public static class NewTopicConfigEntry { private String key; private String value; @@ -258,15 +219,37 @@ public void setConfig(List config) { } public static class PageRequest { - private Integer limit; + private boolean 
deprecatedFormat; + private Integer page; + private Integer size; + + @Deprecated private Integer offset; + @Deprecated + private Integer limit; - public Integer getLimit() { - return limit; + public boolean isDeprecatedFormat() { + return deprecatedFormat; } - public void setLimit(Integer limit) { - this.limit = limit; + public void setDeprecatedFormat(boolean deprecatedFormat) { + this.deprecatedFormat = deprecatedFormat; + } + + public Integer getPage() { + return page; + } + + public void setPage(Integer page) { + this.page = page; + } + + public Integer getSize() { + return size; + } + + public void setSize(Integer size) { + this.size = size; } public Integer getOffset() { @@ -276,6 +259,14 @@ public Integer getOffset() { public void setOffset(Integer offset) { this.offset = offset; } + + public Integer getLimit() { + return limit; + } + + public void setLimit(Integer limit) { + this.limit = limit; + } } public enum SortDirectionEnum { DESC, @@ -346,38 +337,80 @@ public void setState(String state) { } } - public static class ConsumerGroupList { - private List items; + public static class PagedResponse { + private List items; + private Integer size; + private Integer page; + private Integer total; + // deprecated private Integer offset; private Integer limit; private Integer count; - public List getItems() { + public List getItems() { return items; } - public void setItems(List items) { + public void setItems(List items) { this.items = items; } + + public Integer getSize() { + return size; + } + + public void setSize(Integer size) { + this.size = size; + } + + public Integer getPage() { + return page; + } + + public void setPage(Integer page) { + this.page = page; + } + + public Integer getTotal() { + return total; + } + + public void setTotal(Integer total) { + this.total = total; + } + + //deprecated + public Integer getOffset() { return offset; } + public void setOffset(Integer offset) { this.offset = offset; } + public Integer getLimit() { return limit; } + 
public void setLimit(Integer limit) { this.limit = limit; } + public Integer getCount() { return count; } + public void setCount(Integer count) { this.count = count; } } + public static class ConsumerGroupList extends PagedResponse { + } + + public static class TopicList extends PagedResponse { + } + public static class Consumer { private String memberId; private String groupId; diff --git a/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml b/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml index edd636f0..e8f6d2ca 100644 --- a/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml +++ b/kafka-admin/src/main/resources/openapi-specs/kafka-admin-rest.yaml @@ -15,8 +15,18 @@ paths: /topics: get: parameters: + - name: offset + description: The page offset + schema: + type: integer + in: query - name: limit description: Maximum number of topics to return + schema: + type: integer + in: query + - name: size + description: Maximum number of topics to return on single page schema: format: int32 type: integer @@ -27,8 +37,8 @@ paths: schema: type: string in: query - - name: offset - description: The page offset when returning the limit of requested topics. + - name: page + description: The page when returning the limit of requested topics. 
schema: format: int32 type: integer @@ -344,13 +354,23 @@ paths: summary: API endpoints for consumer groups under a Kafka topic get: parameters: + - name: offset + description: The page offset + schema: + type: integer + in: query - name: limit - description: Maximum number of consumer groups to returnd + description: Maximum number of consumer groups to return schema: type: integer in: query - - name: offset - description: The page offset when returning the list of consumer groups + - name: size + description: Maximum number of consumer groups to return on single page + schema: + type: integer + in: query + - name: page + description: The page when returning the list of consumer groups schema: type: integer in: query @@ -541,20 +561,21 @@ components: value: '1' TopicsList: description: A list of topics. - required: - - items - - offset - - limit - - count type: object properties: + page: + description: The page + type: integer + size: + description: number of entries per page + type: integer offset: - description: The page offset + description: Deprecated offset of the topic list type: integer limit: - description: number of entries per page + description: Deprecated maximum of returned topics type: integer - count: + total: description: Total number of topics type: integer items: @@ -727,11 +748,6 @@ components: logEndOffset: 5 ConsumerGroupList: description: A list of consumer groups - required: - - items - - count - - limit - - offset type: object properties: items: @@ -739,14 +755,20 @@ components: type: array items: $ref: '#/components/schemas/ConsumerGroup' - count: + total: description: The total number of consumer groups. type: number - limit: + size: description: The number of consumer groups per page. 
type: number + page: + description: The page + type: integer offset: - description: The page offset + description: Deprecated offset of the consumer group list + type: integer + limit: + description: Deprecated maximum of returned consumer groups type: integer example: count: 1 diff --git a/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java b/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java index fb1b6cfe..4e479ef4 100644 --- a/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java +++ b/systemtests/src/test/java/org/bf2/admin/kafka/systemtest/plain/RestEndpointTestIT.java @@ -133,27 +133,40 @@ void testTopicListWithFilter(Vertx vertx, VertxTestContext testContext, Extensio assertThat(testContext.awaitCompletion(1, TimeUnit.MINUTES)).isTrue(); } - @ParallelTest - void testTopicListWithFilterNone(Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { + @ParameterizedTest(name = "testTopicListWithPagination-{0}") + @Execution(ExecutionMode.CONCURRENT) + @ValueSource(ints = {1, 2, 3}) + void testTopicListWithPagination(int page, Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { AdminClient kafkaClient = AdminClient.create(RequestUtils.getKafkaAdminConfig(DEPLOYMENT_MANAGER .getKafkaContainer(extensionContext).getBootstrapServers())); int publishedAdminPort = DEPLOYMENT_MANAGER.getAdminPort(extensionContext); List topics = new ArrayList<>(); - for (int i = 0; i < 2; i++) topics.add(new NewTopic(UUID.randomUUID().toString(), 1, (short) 1)); + for (int i = 0; i < 5; i++) topics.add(new NewTopic(UUID.randomUUID().toString(), 1, (short) 1)); kafkaClient.createTopics(topics); DynamicWait.waitForTopicsExists(topics.stream().map(NewTopic::name).collect(Collectors.toList()), kafkaClient); HttpClient client = createHttpClient(vertx); - client.request(HttpMethod.GET, publishedAdminPort, "localhost", 
"/rest/topics?filter=zcfsada.*") + client.request(HttpMethod.GET, publishedAdminPort, "localhost", "/rest/topics?size=3&page=" + page) .compose(req -> req.send().onSuccess(response -> { - if (response.statusCode() != ReturnCodes.SUCCESS.code) { + // we want to get page 3 of 5 topics. The page size is 3, so we have just 2 pages + if (page == 3) { + if (response.statusCode() != ReturnCodes.FAILED_REQUEST.code) { + testContext.failNow("Status code not correct"); + } + } else if (response.statusCode() != ReturnCodes.SUCCESS.code) { testContext.failNow("Status code not correct"); } assertStrictTransportSecurityDisabled(response, testContext); }).onFailure(testContext::failNow).compose(HttpClientResponse::body)) .onComplete(testContext.succeeding(buffer -> testContext.verify(() -> { assertThat(testContext.failed()).isFalse(); - assertThat(MODEL_DESERIALIZER.getNames(buffer).size()).isEqualTo(0); + // 5 topic, size of page is 3. First page should have 3 topic, second page just 2 + if (page == 1) { + assertThat(MODEL_DESERIALIZER.getNames(buffer).size()).isEqualTo(3); + } + if (page == 2) { + assertThat(MODEL_DESERIALIZER.getNames(buffer).size()).isEqualTo(2); + } testContext.completeNow(); }))); assertThat(testContext.awaitCompletion(1, TimeUnit.MINUTES)).isTrue(); @@ -220,6 +233,92 @@ void testTopicListWithOffset(int offset, Vertx vertx, VertxTestContext testConte assertThat(testContext.awaitCompletion(1, TimeUnit.MINUTES)).isTrue(); } + @ParallelTest + void testTopicListWithFilterNone(Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { + AdminClient kafkaClient = AdminClient.create(RequestUtils.getKafkaAdminConfig(DEPLOYMENT_MANAGER + .getKafkaContainer(extensionContext).getBootstrapServers())); + int publishedAdminPort = DEPLOYMENT_MANAGER.getAdminPort(extensionContext); + List topics = new ArrayList<>(); + for (int i = 0; i < 2; i++) topics.add(new NewTopic(UUID.randomUUID().toString(), 1, (short) 1)); + 
kafkaClient.createTopics(topics); + DynamicWait.waitForTopicsExists(topics.stream().map(NewTopic::name).collect(Collectors.toList()), kafkaClient); + + HttpClient client = createHttpClient(vertx); + client.request(HttpMethod.GET, publishedAdminPort, "localhost", "/rest/topics?filter=zcfsada.*") + .compose(req -> req.send().onSuccess(response -> { + if (response.statusCode() != ReturnCodes.SUCCESS.code) { + testContext.failNow("Status code not correct"); + } + assertStrictTransportSecurityDisabled(response, testContext); + }).onFailure(testContext::failNow).compose(HttpClientResponse::body)) + .onComplete(testContext.succeeding(buffer -> testContext.verify(() -> { + assertThat(testContext.failed()).isFalse(); + assertThat(MODEL_DESERIALIZER.getNames(buffer).size()).isEqualTo(0); + testContext.completeNow(); + }))); + assertThat(testContext.awaitCompletion(1, TimeUnit.MINUTES)).isTrue(); + } + + @ParameterizedTest(name = "testTopicListWithSize-{0}") + @Execution(ExecutionMode.CONCURRENT) + @ValueSource(ints = {1, 2, 3, 5}) + void testTopicListWithSize(int size, Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { + AdminClient kafkaClient = AdminClient.create(RequestUtils.getKafkaAdminConfig(DEPLOYMENT_MANAGER + .getKafkaContainer(extensionContext).getBootstrapServers())); + int publishedAdminPort = DEPLOYMENT_MANAGER.getAdminPort(extensionContext); + + List topics = new ArrayList<>(); + for (int i = 0; i < 3; i++) topics.add(new NewTopic(UUID.randomUUID().toString(), 1, (short) 1)); + kafkaClient.createTopics(topics); + DynamicWait.waitForTopicsExists(topics.stream().map(NewTopic::name).collect(Collectors.toList()), kafkaClient); + + HttpClient client = createHttpClient(vertx); + client.request(HttpMethod.GET, publishedAdminPort, "localhost", "/rest/topics?size=" + size) + .compose(req -> req.send().onSuccess(response -> { + if (response.statusCode() != ReturnCodes.SUCCESS.code) { + testContext.failNow("Status code not 
correct"); + } + assertStrictTransportSecurityDisabled(response, testContext); + }).onFailure(testContext::failNow).compose(HttpClientResponse::body)) + .onComplete(testContext.succeeding(buffer -> testContext.verify(() -> { + assertThat(testContext.failed()).isFalse(); + assertThat(MODEL_DESERIALIZER.getNames(buffer).size()).isEqualTo(Math.min(size, 3)); + testContext.completeNow(); + }))); + assertThat(testContext.awaitCompletion(1, TimeUnit.MINUTES)).isTrue(); + } + + @ParameterizedTest(name = "testTopicListWithPage-{0}") + @Execution(ExecutionMode.CONCURRENT) + @ValueSource(ints = {1, 3, 4}) + void testTopicListWithPage(int page, Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { + AdminClient kafkaClient = AdminClient.create(RequestUtils.getKafkaAdminConfig(DEPLOYMENT_MANAGER + .getKafkaContainer(extensionContext).getBootstrapServers())); + int publishedAdminPort = DEPLOYMENT_MANAGER.getAdminPort(extensionContext); + List topics = new ArrayList<>(); + for (int i = 0; i < 3; i++) topics.add(new NewTopic(UUID.randomUUID().toString(), 1, (short) 1)); + kafkaClient.createTopics(topics); + DynamicWait.waitForTopicsExists(topics.stream().map(NewTopic::name).collect(Collectors.toList()), kafkaClient); + + HttpClient client = createHttpClient(vertx); + client.request(HttpMethod.GET, publishedAdminPort, "localhost", "/rest/topics?page=" + page) + .compose(req -> req.send().onSuccess(response -> { + if ((response.statusCode() != ReturnCodes.SUCCESS.code && page == 1) + || (response.statusCode() != ReturnCodes.FAILED_REQUEST.code && page != 1)) { + testContext.failNow("Status code not correct"); + } + assertStrictTransportSecurityDisabled(response, testContext); + }).onFailure(testContext::failNow).compose(HttpClientResponse::body)) + .onComplete(testContext.succeeding(buffer -> testContext.verify(() -> { + assertThat(testContext.failed()).isFalse(); + if (page == 1) { + 
assertThat(MODEL_DESERIALIZER.getNames(buffer).size()).isEqualTo(3); + } + testContext.completeNow(); + }))); + assertThat(testContext.awaitCompletion(1, TimeUnit.MINUTES)).isTrue(); + } + @ParallelTest void testDescribeSingleTopic(Vertx vertx, VertxTestContext testContext, ExtensionContext extensionContext) throws Exception { AdminClient kafkaClient = AdminClient.create(RequestUtils.getKafkaAdminConfig(DEPLOYMENT_MANAGER