Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 5903] Support compact all partitions of a partitioned topic #6537

Merged
merged 2 commits into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1976,7 +1976,82 @@ private void internalExpireMessagesForSinglePartition(String subName, int expire
}
}

protected void internalTriggerCompaction(boolean authoritative) {
protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean authoritative) {
log.info("[{}] Trigger compaction on topic {}", clientAppId(), topicName);
try {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
} catch (Exception e) {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}

// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
try {
internalTriggerCompactionNonPartitionedTopic(authoritative);
} catch (Exception e) {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
asyncResponse.resume(Response.noContent().build());
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();

for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().triggerCompactionAsync(topicNamePartition.toString()));
} catch (Exception e) {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicNamePartition, e);
asyncResponse.resume(new RestException(e));
return;
}
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable th = exception.getCause();
if (th instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
return null;
} else if (th instanceof WebApplicationException) {
asyncResponse.resume(th);
return null;
} else {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, exception);
asyncResponse.resume(new RestException(exception));
return null;
}
}
asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
try {
internalTriggerCompactionNonPartitionedTopic(authoritative);
} catch (Exception e) {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
asyncResponse.resume(Response.noContent().build());
}
}).exceptionally(ex -> {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}

protected void internalTriggerCompactionNonPartitionedTopic(boolean authoritative) {
validateWriteOperationOnTopic(authoritative);

PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
Expand All @@ -1985,6 +2060,7 @@ protected void internalTriggerCompaction(boolean authoritative) {
} catch (AlreadyRunningException e) {
throw new RestException(Status.CONFLICT, e.getMessage());
} catch (Exception e) {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,11 +582,18 @@ public MessageId terminate(@PathParam("property") String property, @PathParam("c
@ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 409, message = "Compaction already running")})
public void compact(@PathParam("property") String property, @PathParam("cluster") String cluster,
public void compact(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
internalTriggerCompaction(authoritative);
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalTriggerCompaction(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,7 @@ public MessageId terminate(
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void compact(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -981,8 +982,14 @@ public void compact(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
internalTriggerCompaction(authoritative);
try {
validateTopicName(tenant, namespace, encodedTopic);
internalTriggerCompaction(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2234,6 +2234,49 @@ public void testTriggerCompaction() throws Exception {
verify(compactor, times(2)).compact(topicName);
}

@Test
public void testTriggerCompactionPartitionedTopic() throws Exception {
String topicName = "persistent://prop-xyz/ns1/test-part";
int numPartitions = 2;
admin.topics().createPartitionedTopic(topicName, numPartitions);

// create a partitioned topic by creating a producer
pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));

// mock actual compaction, we don't need to really run it
CompletableFuture<Long> promise = new CompletableFuture<>();
Compactor compactor = pulsar.getCompactor();
doReturn(promise).when(compactor).compact(topicName + "-partition-0");

CompletableFuture<Long> promise1 = new CompletableFuture<>();
doReturn(promise1).when(compactor).compact(topicName + "-partition-1");
admin.topics().triggerCompaction(topicName);

// verify compact called once by each partition topic
verify(compactor).compact(topicName + "-partition-0");
verify(compactor).compact(topicName + "-partition-1");
try {
admin.topics().triggerCompaction(topicName);

fail("Shouldn't be able to run while already running");
} catch (PulsarAdminException e) {
// expected
}
// compact shouldn't have been called again
verify(compactor).compact(topicName + "-partition-0");
verify(compactor).compact(topicName + "-partition-1");

// complete first compaction, and trigger again
promise.complete(1L);
promise1.complete(1L);
admin.topics().triggerCompaction(topicName);

// verify compact was called again
verify(compactor, times(2)).compact(topicName + "-partition-0");
verify(compactor, times(2)).compact(topicName + "-partition-1");
}

@Test
public void testCompactionStatus() throws Exception {
String topicName = "persistent://prop-xyz/ns1/topic1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,35 @@ public void testRevokePartitionedTopic() {
}
}

@Test
public void testTriggerCompactionTopic() {
final String partitionTopicName = "test-part";
final String nonPartitionTopicName = "test-non-part";

// trigger compaction on non-existing topic
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.compact(response, testTenant, testNamespace, "non-existing-topic", true);
ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());

// create non partitioned topic and compaction on it
response = mock(AsyncResponse.class);
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopicName, true);
persistentTopics.compact(response, testTenant, testNamespace, nonPartitionTopicName, true);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

// create partitioned topic and compaction on it
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@murong00 thanks for the contribute, Would you please help add a test that really called into and execute the compact? Seems this is only the top level method call, and not called into the methods, and some of the code in PersistentTopicsBase.java seems not covered.
If there is no ut framework for this, we could add a integration test for it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a unit test to cover the compact logic and just keep this top level method call test.

response = mock(AsyncResponse.class);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 2);
persistentTopics.compact(response, testTenant, testNamespace, partitionTopicName, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
}

@Test()
public void testGetLastMessageId() throws Exception {
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,14 @@ void createSubscription(String topic, String subscriptionName, MessageId message
*/
void triggerCompaction(String topic) throws PulsarAdminException;

/**
* Trigger compaction to run for a topic asynchronously.
*
* @param topic
* The topic on which to trigger compaction
*/
CompletableFuture<Void> triggerCompactionAsync(String topic);

/**
* Check the status of an ongoing compaction for a topic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -904,17 +904,26 @@ public void failed(Throwable throwable) {
}

@Override
public void triggerCompaction(String topic)
throws PulsarAdminException {
public void triggerCompaction(String topic) throws PulsarAdminException {
try {
TopicName tn = validateTopic(topic);
request(topicPath(tn, "compaction"))
.put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
triggerCompactionAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> triggerCompactionAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "compaction");
return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}

@Override
public LongRunningProcessStatus compactionStatus(String topic)
throws PulsarAdminException {
Expand Down