Skip to content

Commit

Permalink
[fix] [broker] Fix race-condition causing repeated delete topic (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored Oct 29, 2024
1 parent 266e705 commit 7b80f01
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,13 @@ private static ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) {
}

private static MetaStoreException getException(Throwable t) {
if (t.getCause() instanceof MetadataStoreException.BadVersionException) {
return new ManagedLedgerException.BadVersionException(t.getMessage());
Throwable actEx = FutureUtil.unwrapCompletionException(t);
if (actEx instanceof MetadataStoreException.BadVersionException badVersionException) {
return new ManagedLedgerException.BadVersionException(badVersionException);
} else if (actEx instanceof MetaStoreException metaStoreException){
return metaStoreException;
} else {
return new MetaStoreException(t);
return new MetaStoreException(actEx);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1465,6 +1465,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
this.closeFutures =
new CloseFutures(new CompletableFuture(), new CompletableFuture(), new CompletableFuture());

AtomicBoolean alreadyUnFenced = new AtomicBoolean();
CompletableFuture<Void> res = getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () -> {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
Expand All @@ -1488,6 +1489,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
}
}).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
alreadyUnFenced.set(true);
unfenceTopicToResume();
closeClientFuture.completeExceptionally(ex);
return null;
Expand All @@ -1503,6 +1505,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
.whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(ex);
} else {
Expand All @@ -1512,6 +1515,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
if (e != null) {
log.error("[{}] Error deleting topic", topic, e);
alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(e);
} else {
Expand Down Expand Up @@ -1542,6 +1546,7 @@ public void deleteLedgerComplete(Object ctx) {
} else {
log.error("[{}] Error deleting topic",
topic, exception);
alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(
new PersistenceException(exception));
Expand All @@ -1554,6 +1559,7 @@ public void deleteLedgerComplete(Object ctx) {
}
});
}).exceptionally(ex->{
alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(
new TopicBusyException("Failed to close clients before deleting topic.",
Expand All @@ -1565,7 +1571,9 @@ public void deleteLedgerComplete(Object ctx) {
}).whenComplete((value, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
unfenceTopicToResume();
if (!alreadyUnFenced.get()) {
unfenceTopicToResume();
}
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "flaky")
public class SameAuthParamsLookupAutoClusterFailoverTest extends OneWayReplicatorTestBase {

public void setup() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "flaky")
@Test(groups = "broker")
public class DisabledCreateTopicToRemoteClusterForReplicationTest extends OneWayReplicatorTestBase {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "flaky")
@Test(groups = "broker")
public class OneWayReplicatorTest extends OneWayReplicatorTestBase {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "flaky")
@Test(groups = "broker")
public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "flaky")
@Test(groups = "broker")
public class ReplicationTxnTest extends OneWayReplicatorTestBase {

private boolean transactionBufferSegmentedSnapshotEnabled = false;
Expand Down

0 comments on commit 7b80f01

Please sign in to comment.