[fix][transaction] Fix potentially unfinishable future. #15208

Merged
merged 1 commit into from
Apr 19, 2022

mattisonchao
Member

Motivation

The relevant code is below:

public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
    internalPinnedExecutor.execute(() -> {
        if (stores.get(tcId) != null) {
            completableFuture.complete(null);
        } else {
            pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
                    .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString())
                    .thenRun(() -> internalPinnedExecutor.execute(() -> {
                final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
                        .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1));
                Deque<CompletableFuture<Void>> deque = pendingConnectRequests
                        .computeIfAbsent(tcId.getId(), (id) -> new ConcurrentLinkedDeque<>());
                if (tcLoadSemaphore.tryAcquire()) {
                    // when tcLoadSemaphore.release() is called, this command will acquire the semaphore,
                    // so we should check again whether the store exists.
                    if (stores.get(tcId) != null) {
                        completableFuture.complete(null);
                        tcLoadSemaphore.release();
                        return;
                    }
                    openTransactionMetadataStore(tcId).thenAccept((store) -> internalPinnedExecutor.execute(() -> {
                        stores.put(tcId, store);
                        LOG.info("Added new transaction meta store {}", tcId);
                        long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
                        while (true) {
                            // prevent the thread from spinning in a busy loop.
                            if (System.currentTimeMillis() < endTime) {
                                CompletableFuture<Void> future = deque.poll();
                                if (future != null) {
                                    // complete the queued request future
                                    future.complete(null);
                                } else {
                                    break;
                                }
                            } else {
                                deque.clear();
                                break;
                            }
                        }
                        completableFuture.complete(null);
                        tcLoadSemaphore.release();
                    })).exceptionally(e -> {
                        internalPinnedExecutor.execute(() -> {
                            completableFuture.completeExceptionally(e.getCause());
                            // release before handling the request queue,
                            // in order to avoid a client reconnect infinite loop
                            tcLoadSemaphore.release();
                            long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
                            while (true) {
                                // prevent the thread from spinning in a busy loop.
                                if (System.currentTimeMillis() < endTime) {
                                    CompletableFuture<Void> future = deque.poll();
                                    if (future != null) {
                                        // this means that this tc client connection failed to connect
                                        future.completeExceptionally(e);
                                    } else {
                                        break;
                                    }
                                } else {
                                    deque.clear();
                                    break;
                                }
                            }
                            LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
                        });
                        return null;
                    });
                } else {
                    // only one command can open the transaction metadata store;
                    // the others are added to the deque, and when openTransactionMetadataStore finishes,
                    // the requests which are in the queue are handled
                    deque.add(completableFuture);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Handle tc client connect added into pending queue! tcId : {}", tcId.toString());
                    }
                }
            }));
        }
    });
    return completableFuture;
}

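When checkTopicNsOwnership throws an exception (its returned future completes exceptionally), the thenRun callback is skipped, and because the chain has no exception handler, completableFuture never completes.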
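The failure mode can be reproduced outside Pulsar with plain CompletableFuture. The following is a minimal sketch, not the broker code: the class name, `failingOwnershipCheck`, and `connect` are hypothetical stand-ins, and the pinned executor is left out to keep the example synchronous.

```java
import java.util.concurrent.CompletableFuture;

public class UnfinishedFutureDemo {

    // Hypothetical stand-in for checkTopicNsOwnership: a future that has already failed.
    static CompletableFuture<Void> failingOwnershipCheck() {
        CompletableFuture<Void> check = new CompletableFuture<>();
        check.completeExceptionally(new IllegalStateException("namespace not owned by this broker"));
        return check;
    }

    // Same shape as handleTcClientConnect: the returned future is completed
    // only inside the thenRun callback, and the chain has no exception handler.
    static CompletableFuture<Void> connect(CompletableFuture<Void> ownershipCheck) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        ownershipCheck.thenRun(() -> result.complete(null));
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> result = connect(failingOwnershipCheck());
        // thenRun skips its action when the upstream future failed,
        // so nothing ever completes the result future.
        System.out.println("isDone = " + result.isDone()); // prints "isDone = false"
    }
}
```

A caller waiting on the returned future without a timeout would block indefinitely in this situation.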

This PR only fixes the unfinishable future. Afterwards, I will try to refactor this part of the code to make it less complex.

Modifications

  • Add exception handling for checkTopicNsOwnership.
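One way to add such handling is to attach an exceptionally stage to the ownership-check chain so that a failure is propagated to the future returned to the caller. The sketch below illustrates that general pattern under stated assumptions; it is not the diff merged in this PR, and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class OwnershipCheckFixSketch {

    // Hypothetical simplification of handleTcClientConnect: the ownership check
    // is passed in as a future so that both outcomes can be exercised.
    static CompletableFuture<Void> connect(CompletableFuture<Void> ownershipCheck) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        ownershipCheck
                .thenRun(() -> result.complete(null))
                .exceptionally(e -> {
                    // The thenRun stage wraps upstream failures in CompletionException;
                    // unwrap it so the caller sees the original cause.
                    Throwable cause = (e instanceof CompletionException && e.getCause() != null)
                            ? e.getCause() : e;
                    result.completeExceptionally(cause);
                    return null;
                });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("namespace not owned by this broker"));
        CompletableFuture<Void> result = connect(failed);
        // prints "isDone = true, failed = true"
        System.out.println("isDone = " + result.isDone()
                + ", failed = " + result.isCompletedExceptionally());
    }
}
```

With the handler in place, the caller's future finishes on both the success path and the failure path, so a client can observe the error and retry instead of waiting forever.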

Verifying this change

  • Make sure that the change passes the CI checks.

Documentation

  • no-need-doc

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Apr 19, 2022
@eolivelli eolivelli merged commit 6aaabdb into apache:master Apr 19, 2022
@mattisonchao mattisonchao deleted the txn_fix_unfinished_future branch April 19, 2022 06:47
Nicklee007 pushed a commit to Nicklee007/pulsar that referenced this pull request Apr 20, 2022
codelipenghui pushed a commit that referenced this pull request Apr 28, 2022
@codelipenghui codelipenghui added this to the 2.11.0 milestone Apr 28, 2022
codelipenghui pushed a commit that referenced this pull request Apr 29, 2022
@codelipenghui codelipenghui added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Apr 29, 2022
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request May 9, 2022
(cherry picked from commit 6aaabdb)
(cherry picked from commit 84a0894)
6 participants