Skip to content

Commit 3c6aae3

Browse files
[ServerCnx] Only reply to client when completing producerFuture (#13949)
### Motivation We should only send the error response to the client when the code is able to complete the `producerFuture`. This logic is described here: https://github.com/apache/pulsar/blob/2285d02aa9957af7877b9d3d3c628a750d813ca7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1286-L1293 Edit: in a previous version of this motivation section, I attributed the current behavior to #12874. That PR did not introduce this behavior, though. ### Modifications * Move the response to the client into a conditional block that only runs when this section of the code is able to complete the future.
1 parent 7ea2448 commit 3c6aae3

File tree

1 file changed

+10
-9
lines changed
  • pulsar-broker/src/main/java/org/apache/pulsar/broker/service

1 file changed

+10
-9
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,16 +1259,17 @@ protected void handleProducer(final CommandProducer cmdProducer) {
12591259
(BrokerServiceException.TopicBacklogQuotaExceededException) cause;
12601260
IllegalStateException illegalStateException = new IllegalStateException(tbqe);
12611261
BacklogQuota.RetentionPolicy retentionPolicy = tbqe.getRetentionPolicy();
1262-
if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
1263-
commandSender.sendErrorResponse(requestId,
1264-
ServerError.ProducerBlockedQuotaExceededError,
1265-
illegalStateException.getMessage());
1266-
} else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
1267-
commandSender.sendErrorResponse(requestId,
1268-
ServerError.ProducerBlockedQuotaExceededException,
1269-
illegalStateException.getMessage());
1262+
if (producerFuture.completeExceptionally(illegalStateException)) {
1263+
if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
1264+
commandSender.sendErrorResponse(requestId,
1265+
ServerError.ProducerBlockedQuotaExceededError,
1266+
illegalStateException.getMessage());
1267+
} else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
1268+
commandSender.sendErrorResponse(requestId,
1269+
ServerError.ProducerBlockedQuotaExceededException,
1270+
illegalStateException.getMessage());
1271+
}
12701272
}
1271-
producerFuture.completeExceptionally(illegalStateException);
12721273
producers.remove(producerId, producerFuture);
12731274
return null;
12741275
}

0 commit comments

Comments
 (0)