Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure the deletion consistency of topic and schema #14608

Merged
merged 3 commits into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,12 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse
});
}

protected void internalDeleteTopicForcefully(boolean authoritative, boolean deleteSchema) {
protected void internalDeleteTopicForcefully(boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
validateNamespaceOperation(topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC);

try {
pulsar().getBrokerService().deleteTopic(topicName.toString(), true, deleteSchema).get();
pulsar().getBrokerService().deleteTopic(topicName.toString(), true).get();
} catch (Exception e) {
if (isManagedLedgerNotFoundException(e)) {
log.info("[{}] Topic was already not existing {}", clientAppId(), topicName, e);
Expand Down Expand Up @@ -580,8 +580,9 @@ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean author
return metadata;
}

protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative,
boolean force, boolean deleteSchema) {
protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
boolean authoritative,
boolean force) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateNamespaceOperationAsync(topicName.getNamespaceObject(),
NamespaceOperation.DELETE_TOPIC))
Expand All @@ -592,14 +593,6 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
if (numPartitions < 1){
return CompletableFuture.completedFuture(null);
}
if (deleteSchema) {
return pulsar().getBrokerService()
.deleteSchemaStorage(topicName.getPartition(0).toString())
.thenCompose(unused ->
internalRemovePartitionsAuthenticationPoliciesAsync(numPartitions))
.thenCompose(unused2 ->
internalRemovePartitionsTopicAsync(numPartitions, force));
}
Comment on lines -595 to -602
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If remove these lines, how does the method remove the schema?

Copy link
Contributor Author

@yuruguo yuruguo Mar 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return internalRemovePartitionsAuthenticationPoliciesAsync(numPartitions)
.thenCompose(unused -> internalRemovePartitionsTopicAsync(numPartitions, force));
})
Expand Down Expand Up @@ -1015,20 +1008,20 @@ private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncRespon
});
}

protected void internalDeleteTopic(boolean authoritative, boolean force, boolean deleteSchema) {
protected void internalDeleteTopic(boolean authoritative, boolean force) {
if (force) {
internalDeleteTopicForcefully(authoritative, deleteSchema);
internalDeleteTopicForcefully(authoritative);
} else {
internalDeleteTopic(authoritative, deleteSchema);
internalDeleteTopic(authoritative);
}
}

protected void internalDeleteTopic(boolean authoritative, boolean deleteSchema) {
protected void internalDeleteTopic(boolean authoritative) {
validateNamespaceOperation(topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC);
validateTopicOwnership(topicName, authoritative);

try {
pulsar().getBrokerService().deleteTopic(topicName.toString(), false, deleteSchema).get();
pulsar().getBrokerService().deleteTopic(topicName.toString(), false).get();
log.info("[{}] Successfully removed topic {}", clientAppId(), topicName);
} catch (Exception e) {
Throwable t = e.getCause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,10 @@ public void deletePartitionedTopic(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("force") @DefaultValue("false") boolean force,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalDeletePartitionedTopic(asyncResponse, authoritative, force, deleteSchema);
internalDeletePartitionedTopic(asyncResponse, authoritative, force);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down Expand Up @@ -333,10 +332,9 @@ public void unloadTopic(@Suspended final AsyncResponse asyncResponse, @PathParam
public void deleteTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("force") @DefaultValue("false") boolean force,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
internalDeleteTopic(authoritative, force, deleteSchema);
internalDeleteTopic(authoritative, force);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -911,12 +911,10 @@ public void deletePartitionedTopic(
defaultValue = "false", type = "boolean")
@QueryParam("force") @DefaultValue("false") boolean force,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Delete the topic's schema storage")
@QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalDeletePartitionedTopic(asyncResponse, authoritative, force, deleteSchema);
internalDeletePartitionedTopic(asyncResponse, authoritative, force);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down Expand Up @@ -979,11 +977,9 @@ public void deleteTopic(
defaultValue = "false", type = "boolean")
@QueryParam("force") @DefaultValue("false") boolean force,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Delete the topic's schema storage")
@QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
internalDeleteTopic(authoritative, force, deleteSchema);
internalDeleteTopic(authoritative, force);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -1013,32 +1012,12 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
}
}

public CompletableFuture<SchemaVersion> deleteSchemaStorage(String topic) {
Optional<Topic> optTopic = getTopicReference(topic);
if (optTopic.isPresent()) {
return optTopic.get().deleteSchema();
} else {
return CompletableFuture.completedFuture(null);
}
}

public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete) {
return deleteTopic(topic, forceDelete, false);
}

public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete, boolean deleteSchema) {
Optional<Topic> optTopic = getTopicReference(topic);
if (optTopic.isPresent()) {
Topic t = optTopic.get();
if (forceDelete) {
if (deleteSchema) {
return t.deleteSchema().thenCompose(schemaVersion -> {
log.info("Successfully delete topic {}'s schema of version {}", t.getName(), schemaVersion);
return t.deleteForcefully();
});
} else {
return t.deleteForcefully();
}
return t.deleteForcefully();
}

// v2 topics have a global name so check if the topic is replicated.
Expand All @@ -1050,14 +1029,7 @@ public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete, bo
new IllegalStateException("Delete forbidden topic is replicated on clusters " + clusters));
}

if (deleteSchema) {
return t.deleteSchema().thenCompose(schemaVersion -> {
log.info("Successfully delete topic {}'s schema of version {}", t.getName(), schemaVersion);
return t.delete();
});
Comment on lines -1054 to -1057
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If remove these lines how do we delete the schema?

Copy link
Contributor Author

@yuruguo yuruguo Mar 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t.delete() will delete schema by default.

details: delete() -> delete(boolean,boolean,boolean) -> deleteSchema()

} else {
return t.delete();
}
return t.delete();
}

if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ public CompletableFuture<Subscription> createSubscription(String subscriptionNam

@Override
public CompletableFuture<Void> delete() {
return delete(false, false, false);
return delete(false, false);
}

/**
Expand All @@ -380,11 +380,10 @@ public CompletableFuture<Void> delete() {
*/
@Override
public CompletableFuture<Void> deleteForcefully() {
return delete(false, true, false);
return delete(false, true);
}

private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected,
boolean deleteSchema) {
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

lock.writeLock().lock();
Expand Down Expand Up @@ -429,9 +428,8 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
} else {
subscriptions.forEach((s, sub) -> futures.add(sub.delete()));
}
if (deleteSchema) {
futures.add(deleteSchema().thenApply(schemaVersion -> null));
}

futures.add(deleteSchema().thenApply(schemaVersion -> null));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens to topics with multiple versions fo schema? Will all versions be cleaned?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will delete all the versions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, it's a little confusing for using persistent schema storage for a non-persistent topic, after the broker restart and the producer/consumer will not connect to a non-persistent topic, the non-persistent topic is equivalent to being deleted, In some cases that users using a random name for non-persistent topic, after the application restarts, the topic name will be changes, looks like here will introduce a schema resource leak.

futures.add(deleteTopicPolicies());
FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
if (ex != null) {
Expand Down Expand Up @@ -920,7 +918,7 @@ public void checkGC() {
maxInactiveDurationInSec);
}

stopReplProducers().thenCompose(v -> delete(true, false, true))
stopReplProducers().thenCompose(v -> delete(true, false))
.thenCompose(__ -> tryToDeletePartitionedMetadata())
.thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
.exceptionally(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1094,11 +1094,6 @@ public CompletableFuture<Void> delete() {
return delete(false, false, false);
}

private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
boolean failIfHasBacklogs, boolean deleteSchema) {
return delete(failIfHasSubscriptions, failIfHasBacklogs, false, deleteSchema);
}

/**
* Forcefully close all producers/consumers/replicators and deletes the topic. this function is used when local
* cluster is removed from global-namespace replication list. Because broker doesn't allow lookup if local cluster
Expand All @@ -1108,7 +1103,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
*/
@Override
public CompletableFuture<Void> deleteForcefully() {
return delete(false, false, true, false);
return delete(false, false, true);
}

/**
Expand All @@ -1122,16 +1117,13 @@ public CompletableFuture<Void> deleteForcefully() {
* Flag indicate whether explicitly close connected
* producers/consumers/replicators before trying to delete topic.
* If any client is connected to a topic and if this flag is disable then this operation fails.
* @param deleteSchema
* Flag indicating whether delete the schema defined for topic if exist.
*
* @return Completable future indicating completion of delete operation Completed exceptionally with:
* IllegalStateException if topic is still active ManagedLedgerException if ledger delete operation fails
*/
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
boolean failIfHasBacklogs,
boolean closeIfClientsConnected,
boolean deleteSchema) {
boolean closeIfClientsConnected) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

lock.writeLock().lock();
Expand Down Expand Up @@ -1173,8 +1165,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);

deleteTopicAuthenticationFuture.thenCompose(
__ -> deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null))
deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema())
.thenAccept(__ -> deleteTopicPolicies())
.thenCompose(__ -> transactionBufferCleanupAndClose())
.whenComplete((v, ex) -> {
Expand Down Expand Up @@ -2245,7 +2236,7 @@ public void checkGC() {
}

replCloseFuture.thenCompose(v -> delete(deleteMode == InactiveTopicDeleteMode.delete_when_no_subscriptions,
deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, true))
deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, false))
.thenApply((res) -> tryToDeletePartitionedMetadata())
.thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
.exceptionally(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public void testGetSubscriptions() {

// 9) Delete the partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true, true, false);
persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
Expand Down Expand Up @@ -577,7 +577,7 @@ public void testUnloadTopic() {

// 4) delete partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, partitionTopicName, true, true, false);
persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, partitionTopicName, true, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
Expand Down Expand Up @@ -1040,7 +1040,7 @@ public void testSetReplicatedSubscriptionStatus() {

// 10) Delete the partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, topicName, true, true, false);
persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, topicName, true, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(10000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
Expand Down Expand Up @@ -1231,23 +1231,23 @@ public void testDeleteTopic() throws Exception {
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, false, null);
CompletableFuture<Void> deleteTopicFuture = new CompletableFuture<>();
deleteTopicFuture.completeExceptionally(new MetadataStoreException.NotFoundException());
doReturn(deleteTopicFuture).when(brokerService).deleteTopic(anyString(), anyBoolean(), anyBoolean());
persistentTopics.deleteTopic(testTenant, testNamespace, topicName, true, true, true);
doReturn(deleteTopicFuture).when(brokerService).deleteTopic(anyString(), anyBoolean());
persistentTopics.deleteTopic(testTenant, testNamespace, topicName, true, true);
//
CompletableFuture<Void> deleteTopicFuture2 = new CompletableFuture<>();
deleteTopicFuture2.completeExceptionally(new MetadataStoreException("test exception"));
doReturn(deleteTopicFuture2).when(brokerService).deleteTopic(anyString(), anyBoolean(), anyBoolean());
doReturn(deleteTopicFuture2).when(brokerService).deleteTopic(anyString(), anyBoolean());
try {
persistentTopics.deleteTopic(testTenant, testNamespace, topicName, true, true, true);
persistentTopics.deleteTopic(testTenant, testNamespace, topicName, true, true);
} catch (Exception e) {
Assert.assertTrue(e instanceof RestException);
}
//
CompletableFuture<Void> deleteTopicFuture3 = new CompletableFuture<>();
deleteTopicFuture3.completeExceptionally(new MetadataStoreException.NotFoundException());
doReturn(deleteTopicFuture3).when(brokerService).deleteTopic(anyString(), anyBoolean(), anyBoolean());
doReturn(deleteTopicFuture3).when(brokerService).deleteTopic(anyString(), anyBoolean());
try {
persistentTopics.deleteTopic(testTenant, testNamespace, topicName, false, true, true);
persistentTopics.deleteTopic(testTenant, testNamespace, topicName, false, true);
} catch (RestException e) {
Assert.assertEquals(e.getResponse().getStatus(), 404);
}
Expand Down
Loading