@@ -1209,25 +1209,6 @@ protected void handleProducer(final CommandProducer cmdProducer) {
1209
1209
CompletableFuture <Void > backlogQuotaCheckFuture = CompletableFuture .allOf (
1210
1210
topic .checkBacklogQuotaExceeded (producerName , BacklogQuotaType .destination_storage ),
1211
1211
topic .checkBacklogQuotaExceeded (producerName , BacklogQuotaType .message_age ));
1212
- backlogQuotaCheckFuture .exceptionally (throwable -> {
1213
- //throwable should be CompletionException holding TopicBacklogQuotaExceededException
1214
- BrokerServiceException .TopicBacklogQuotaExceededException exception =
1215
- (BrokerServiceException .TopicBacklogQuotaExceededException ) throwable .getCause ();
1216
- IllegalStateException illegalStateException = new IllegalStateException (exception );
1217
- BacklogQuota .RetentionPolicy retentionPolicy = exception .getRetentionPolicy ();
1218
- if (retentionPolicy == BacklogQuota .RetentionPolicy .producer_request_hold ) {
1219
- commandSender .sendErrorResponse (requestId ,
1220
- ServerError .ProducerBlockedQuotaExceededError ,
1221
- illegalStateException .getMessage ());
1222
- } else if (retentionPolicy == BacklogQuota .RetentionPolicy .producer_exception ) {
1223
- commandSender .sendErrorResponse (requestId ,
1224
- ServerError .ProducerBlockedQuotaExceededException ,
1225
- illegalStateException .getMessage ());
1226
- }
1227
- producerFuture .completeExceptionally (illegalStateException );
1228
- producers .remove (producerId , producerFuture );
1229
- return null ;
1230
- });
1231
1212
1232
1213
backlogQuotaCheckFuture .thenRun (() -> {
1233
1214
// Check whether the producer will publish encrypted messages or not
@@ -1274,6 +1255,25 @@ protected void handleProducer(final CommandProducer cmdProducer) {
1274
1255
return backlogQuotaCheckFuture ;
1275
1256
}).exceptionally (exception -> {
1276
1257
Throwable cause = exception .getCause ();
1258
+ if (cause instanceof BrokerServiceException .TopicBacklogQuotaExceededException ) {
1259
+ BrokerServiceException .TopicBacklogQuotaExceededException tbqe =
1260
+ (BrokerServiceException .TopicBacklogQuotaExceededException ) cause ;
1261
+ IllegalStateException illegalStateException = new IllegalStateException (tbqe );
1262
+ BacklogQuota .RetentionPolicy retentionPolicy = tbqe .getRetentionPolicy ();
1263
+ if (retentionPolicy == BacklogQuota .RetentionPolicy .producer_request_hold ) {
1264
+ commandSender .sendErrorResponse (requestId ,
1265
+ ServerError .ProducerBlockedQuotaExceededError ,
1266
+ illegalStateException .getMessage ());
1267
+ } else if (retentionPolicy == BacklogQuota .RetentionPolicy .producer_exception ) {
1268
+ commandSender .sendErrorResponse (requestId ,
1269
+ ServerError .ProducerBlockedQuotaExceededException ,
1270
+ illegalStateException .getMessage ());
1271
+ }
1272
+ producerFuture .completeExceptionally (illegalStateException );
1273
+ producers .remove (producerId , producerFuture );
1274
+ return null ;
1275
+ }
1276
+
1277
1277
if (cause instanceof NoSuchElementException ) {
1278
1278
cause = new TopicNotFoundException ("Topic Not Found." );
1279
1279
}
0 commit comments