diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index c5dabb40523ab..bb5e579187b08 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1976,7 +1976,82 @@ private void internalExpireMessagesForSinglePartition(String subName, int expire } } - protected void internalTriggerCompaction(boolean authoritative) { + protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean authoritative) { + log.info("[{}] Trigger compaction on topic {}", clientAppId(), topicName); + try { + if (topicName.isGlobal()) { + validateGlobalNamespaceOwnership(namespaceName); + } + } catch (Exception e) { + log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } + + // If the topic name is a partition name, no need to get partition topic metadata again + if (topicName.isPartitioned()) { + try { + internalTriggerCompactionNonPartitionedTopic(authoritative); + } catch (Exception e) { + log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } + asyncResponse.resume(Response.noContent().build()); + } else { + getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> { + final int numPartitions = partitionMetadata.partitions; + if (numPartitions > 0) { + final List> futures = Lists.newArrayList(); + + for (int i = 0; i < numPartitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar().getAdminClient().topics().triggerCompactionAsync(topicNamePartition.toString())); + } catch (Exception e) { + log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicNamePartition, e); + asyncResponse.resume(new RestException(e)); + return; + } + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable th = exception.getCause(); + if (th instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage())); + return null; + } else if (th instanceof WebApplicationException) { + asyncResponse.resume(th); + return null; + } else { + log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, exception); + asyncResponse.resume(new RestException(exception)); + return null; + } + } + asyncResponse.resume(Response.noContent().build()); + return null; + }); + } else { + try { + internalTriggerCompactionNonPartitionedTopic(authoritative); + } catch (Exception e) { + log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } + asyncResponse.resume(Response.noContent().build()); + } + }).exceptionally(ex -> { + log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + } + + protected void internalTriggerCompactionNonPartitionedTopic(boolean authoritative) { validateWriteOperationOnTopic(authoritative); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); @@ -1985,6 +2060,7 @@ protected void internalTriggerCompaction(boolean authoritative) { } catch (AlreadyRunningException e) { throw new RestException(Status.CONFLICT, e.getMessage()); } catch (Exception e) { + log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, e); throw new RestException(e); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 7fca8797bacb8..7cf93b7f0e754 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -582,11 +582,18 @@ public MessageId terminate(@PathParam("property") String property, @PathParam("c @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 409, message = "Compaction already running")}) - public void compact(@PathParam("property") String property, @PathParam("cluster") String cluster, + public void compact(@Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateTopicName(property, cluster, namespace, encodedTopic); - internalTriggerCompaction(authoritative); + try { + validateTopicName(property, cluster, namespace, encodedTopic); + internalTriggerCompaction(asyncResponse, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 471a2af5dee57..8a6efdc437626 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -973,6 +973,7 @@ public MessageId terminate( @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") }) public void compact( + @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @@ -981,8 +982,14 @@ public void compact( @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateTopicName(tenant, namespace, encodedTopic); - internalTriggerCompaction(authoritative); + try { + validateTopicName(tenant, namespace, encodedTopic); + internalTriggerCompaction(asyncResponse, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @GET diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index c5bb62af53629..15e23fd1a4eb9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2234,6 +2234,49 @@ public void testTriggerCompaction() throws Exception { verify(compactor, times(2)).compact(topicName); } + @Test + public void testTriggerCompactionPartitionedTopic() throws Exception { + String topicName = "persistent://prop-xyz/ns1/test-part"; + int numPartitions = 2; + admin.topics().createPartitionedTopic(topicName, numPartitions); + + // create a partitioned topic by creating a producer + pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close(); + assertNotNull(pulsar.getBrokerService().getTopicReference(topicName)); + + // mock actual compaction, we don't need to really run it + CompletableFuture promise = new CompletableFuture<>(); + Compactor compactor = pulsar.getCompactor(); + doReturn(promise).when(compactor).compact(topicName + "-partition-0"); + + CompletableFuture promise1 = new CompletableFuture<>(); + doReturn(promise1).when(compactor).compact(topicName + "-partition-1"); + admin.topics().triggerCompaction(topicName); + + // verify compact called once by each partition topic + verify(compactor).compact(topicName + "-partition-0"); + verify(compactor).compact(topicName + "-partition-1"); + try { + admin.topics().triggerCompaction(topicName); + + fail("Shouldn't be able to run while already running"); + } catch (PulsarAdminException e) { + // expected + } + // compact shouldn't have been called again + verify(compactor).compact(topicName + "-partition-0"); + verify(compactor).compact(topicName + "-partition-1"); + + // complete first compaction, and trigger again + promise.complete(1L); + promise1.complete(1L); + admin.topics().triggerCompaction(topicName); + + // verify compact was called again + verify(compactor, times(2)).compact(topicName + "-partition-0"); + verify(compactor, times(2)).compact(topicName + "-partition-1"); + } + @Test public void testCompactionStatus() throws Exception { String topicName = "persistent://prop-xyz/ns1/topic1"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index fe1ad87a7136d..2d2d0cfa2b097 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -443,6 +443,35 @@ public void testRevokePartitionedTopic() { } } + @Test + public void testTriggerCompactionTopic() { + final String partitionTopicName = "test-part"; + final String nonPartitionTopicName = "test-non-part"; + + // trigger compaction on non-existing topic + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.compact(response, testTenant, testNamespace, "non-existing-topic", true); + ArgumentCaptor errCaptor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(5000).times(1)).resume(errCaptor.capture()); + Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode()); + + // create non partitioned topic and compaction on it + response = mock(AsyncResponse.class); + persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopicName, true); + persistentTopics.compact(response, testTenant, testNamespace, nonPartitionTopicName, true); + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + // create partitioned topic and compaction on it + response = mock(AsyncResponse.class); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 2); + persistentTopics.compact(response, testTenant, testNamespace, partitionTopicName, true); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + } + @Test() public void testGetLastMessageId() throws Exception { TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 0a004628de6b0..1b2f3fc735ab2 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1069,6 +1069,14 @@ void createSubscription(String topic, String subscriptionName, MessageId message */ void triggerCompaction(String topic) throws PulsarAdminException; + /** + * Trigger compaction to run for a topic asynchronously. + * + * @param topic + * The topic on which to trigger compaction + */ + CompletableFuture triggerCompactionAsync(String topic); + /** * Check the status of an ongoing compaction for a topic. * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 3a1e6851fe21e..aebdbb2e2c8cf 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -904,17 +904,26 @@ public void failed(Throwable throwable) { } @Override - public void triggerCompaction(String topic) - throws PulsarAdminException { + public void triggerCompaction(String topic) throws PulsarAdminException { try { - TopicName tn = validateTopic(topic); - request(topicPath(tn, "compaction")) - .put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); - } catch (Exception e) { - throw getApiException(e); + triggerCompactionAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } + @Override + public CompletableFuture triggerCompactionAsync(String topic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "compaction"); + return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + @Override public LongRunningProcessStatus compactionStatus(String topic) throws PulsarAdminException {