-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Issue 13854][Broker]Fix call sync method in async rest api for internalExpireMessagesForA… #13905
Conversation
943f4d5
to
bd06261
Compare
bd06261
to
44451b6
Compare
@@ -3445,6 +3449,92 @@ private void internalExpireMessagesByTimestampForSinglePartition(String subName, | |||
} | |||
} | |||
|
|||
private CompletableFuture<Void> internalExpireMessagesByTimestampForSinglePartitionAsync(String subName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
conflict with #13880
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment in #13880
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@liudezhi2098 Any difference between these two?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both PR's modify this method,warning: There may be conflict.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I think we'd better close the new one and apply the fix from the old one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I can rebase the code after #13880 merging to master to fix the conflict
} | ||
|
||
asyncResponse.resume(Response.noContent().build()); | ||
}).exceptionally(ex -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this exceptionally
in Line 1863?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This exceptionally
can handle getPartitionedTopicMetadataAsync
exceptions if topicName.isPartitioned()
return false
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This
exceptionally
can handlegetPartitionedTopicMetadataAsync
exceptions iftopicName.isPartitioned()
return false
Yes, but exceptionally is also needed there for validateGlobalNamespaceOwnership
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update
|
||
asyncResponse.resume(Response.noContent().build()); | ||
return FutureUtil.waitForAll(futures).thenRun(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use exceptionally
here to handle the exceptions. No need for the exception
in Line 1876
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update
@@ -3445,6 +3449,92 @@ private void internalExpireMessagesByTimestampForSinglePartition(String subName, | |||
} | |||
} | |||
|
|||
private CompletableFuture<Void> internalExpireMessagesByTimestampForSinglePartitionAsync(String subName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@liudezhi2098 Any difference between these two?
return future.thenCompose(__ -> { | ||
// If the topic name is a partition name, no need to get partition topic metadata again | ||
if (!topicName.isPartitioned()) { | ||
getPartitionedTopicMetadataAsync(topicName, authoritative, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, there miss return
log.error("[{}] {} {} {}", clientAppId(), msg, topicName, subName); | ||
throw new IllegalStateException(msg); | ||
} else { | ||
return doInternalExpireMessagesByTimestampForSinglePartitionAsync(subName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can return a CompletableFuture.completedFuture(null)
if this check is passed to avoid add another method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If (!topicName.isPartitioned()) && metadata.partitions == 0
, we should also doInternalExpireMessagesByTimestampForSinglePartitionAsync
44451b6
to
7f18dc6
Compare
} | ||
|
||
asyncResponse.resume(Response.noContent().build()); | ||
}).exceptionally(ex -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
.thenCompose(__ -> getTopicReferenceAsync(topicName)) | ||
.thenCompose(t -> { | ||
PersistentTopic topic = (PersistentTopic) t; | ||
final AtomicReference<Throwable> exception = new AtomicReference<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this line? You can get the exception from the futures
from line 1879 directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks the same as #13905 (comment)? I will update there code
@@ -3445,6 +3449,92 @@ private void internalExpireMessagesByTimestampForSinglePartition(String subName, | |||
} | |||
} | |||
|
|||
private CompletableFuture<Void> internalExpireMessagesByTimestampForSinglePartitionAsync(String subName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I think we'd better close the new one and apply the fix from the old one.
7f18dc6
to
4ead34d
Compare
/pulsarbot run-failure-checks |
Master Issue: #13854
Motivation
See Issue: #13854
Modifications
Verifying this change
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
no-need-doc