Skip to content

Commit 1adf3fc

Browse files
author
Jiang Haiting
committed
Fix blocking check backlog quota in Producer Creation
1 parent fc8d50e commit 1adf3fc

File tree

5 files changed

+106
-75
lines changed

5 files changed

+106
-75
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import lombok.Getter;
2122
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
2223
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
2324
import org.apache.pulsar.common.api.proto.ServerError;
25+
import org.apache.pulsar.common.policies.data.BacklogQuota;
2426
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
2527
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
2628

@@ -191,6 +193,16 @@ public TopicPoliciesCacheNotInitException() {
191193
}
192194
}
193195

196+
public static class TopicBacklogQuotaExceededException extends BrokerServiceException {
197+
@Getter
198+
private final BacklogQuota.RetentionPolicy retentionPolicy;
199+
200+
public TopicBacklogQuotaExceededException(BacklogQuota.RetentionPolicy retentionPolicy) {
201+
super("Cannot create producer on topic with backlog quota exceeded");
202+
this.retentionPolicy = retentionPolicy;
203+
}
204+
}
205+
194206
public static org.apache.pulsar.common.api.proto.ServerError getClientErrorCode(Throwable t) {
195207
return getClientErrorCode(t, true);
196208
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 56 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@
132132
import org.apache.pulsar.common.naming.NamespaceName;
133133
import org.apache.pulsar.common.naming.TopicName;
134134
import org.apache.pulsar.common.policies.data.BacklogQuota;
135+
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
135136
import org.apache.pulsar.common.policies.data.NamespaceOperation;
136137
import org.apache.pulsar.common.policies.data.TopicOperation;
137138
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
@@ -1189,65 +1190,70 @@ protected void handleProducer(final CommandProducer cmdProducer) {
11891190
service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
11901191
// Before creating producer, check if backlog quota exceeded
11911192
// on topic for size based limit and time based limit
1192-
for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
1193-
if (topic.isBacklogQuotaExceeded(producerName, backlogQuotaType)) {
1194-
IllegalStateException illegalStateException = new IllegalStateException(
1195-
"Cannot create producer on topic with backlog quota exceeded");
1196-
BacklogQuota.RetentionPolicy retentionPolicy = topic
1197-
.getBacklogQuota(backlogQuotaType).getPolicy();
1198-
if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
1199-
commandSender.sendErrorResponse(requestId,
1200-
ServerError.ProducerBlockedQuotaExceededError,
1201-
illegalStateException.getMessage());
1202-
} else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
1203-
commandSender.sendErrorResponse(requestId,
1204-
ServerError.ProducerBlockedQuotaExceededException,
1205-
illegalStateException.getMessage());
1206-
}
1207-
producerFuture.completeExceptionally(illegalStateException);
1208-
producers.remove(producerId, producerFuture);
1209-
return;
1193+
CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
1194+
topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.destination_storage),
1195+
topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));
1196+
backlogQuotaCheckFuture.exceptionally(throwable -> {
1197+
//throwable should be CompletionException holding TopicBacklogQuotaExceededException
1198+
BrokerServiceException.TopicBacklogQuotaExceededException exception =
1199+
(BrokerServiceException.TopicBacklogQuotaExceededException) throwable.getCause();
1200+
IllegalStateException illegalStateException = new IllegalStateException(exception);
1201+
BacklogQuota.RetentionPolicy retentionPolicy = exception.getRetentionPolicy();
1202+
if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
1203+
commandSender.sendErrorResponse(requestId,
1204+
ServerError.ProducerBlockedQuotaExceededError,
1205+
illegalStateException.getMessage());
1206+
} else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
1207+
commandSender.sendErrorResponse(requestId,
1208+
ServerError.ProducerBlockedQuotaExceededException,
1209+
illegalStateException.getMessage());
12101210
}
1211-
}
1212-
// Check whether the producer will publish encrypted messages or not
1213-
if ((topic.isEncryptionRequired() || encryptionRequireOnProducer) && !isEncrypted) {
1214-
String msg = String.format("Encryption is required in %s", topicName);
1215-
log.warn("[{}] {}", remoteAddress, msg);
1216-
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
1211+
producerFuture.completeExceptionally(illegalStateException);
12171212
producers.remove(producerId, producerFuture);
1218-
return;
1219-
}
1213+
return null;
1214+
});
12201215

1221-
disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
1216+
backlogQuotaCheckFuture.thenRun(() -> {
1217+
// Check whether the producer will publish encrypted messages or not
1218+
if ((topic.isEncryptionRequired() || encryptionRequireOnProducer) && !isEncrypted) {
1219+
String msg = String.format("Encryption is required in %s", topicName);
1220+
log.warn("[{}] {}", remoteAddress, msg);
1221+
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
1222+
producers.remove(producerId, producerFuture);
1223+
return;
1224+
}
12221225

1223-
CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema);
1226+
disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
12241227

1225-
schemaVersionFuture.exceptionally(exception -> {
1226-
String message = exception.getMessage();
1227-
if (exception.getCause() != null) {
1228-
message += (" caused by " + exception.getCause());
1229-
}
1230-
commandSender.sendErrorResponse(requestId,
1231-
BrokerServiceException.getClientErrorCode(exception),
1232-
message);
1233-
producers.remove(producerId, producerFuture);
1234-
return null;
1235-
});
1228+
CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema);
12361229

1237-
schemaVersionFuture.thenAccept(schemaVersion -> {
1238-
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> {
1239-
buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted,
1240-
metadata, schemaVersion, epoch, userProvidedProducerName, topicName,
1241-
producerAccessMode, topicEpoch, producerFuture);
1242-
}).exceptionally(exception -> {
1243-
Throwable cause = exception.getCause();
1244-
log.error("producerId {}, requestId {} : TransactionBuffer recover failed",
1245-
producerId, requestId, exception);
1230+
schemaVersionFuture.exceptionally(exception -> {
1231+
String message = exception.getMessage();
1232+
if (exception.getCause() != null) {
1233+
message += (" caused by " + exception.getCause());
1234+
}
12461235
commandSender.sendErrorResponse(requestId,
1247-
ServiceUnitNotReadyException.getClientErrorCode(cause),
1248-
cause.getMessage());
1236+
BrokerServiceException.getClientErrorCode(exception),
1237+
message);
1238+
producers.remove(producerId, producerFuture);
12491239
return null;
12501240
});
1241+
1242+
schemaVersionFuture.thenAccept(schemaVersion -> {
1243+
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> {
1244+
buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted,
1245+
metadata, schemaVersion, epoch, userProvidedProducerName, topicName,
1246+
producerAccessMode, topicEpoch, producerFuture);
1247+
}).exceptionally(exception -> {
1248+
Throwable cause = exception.getCause();
1249+
log.error("producerId {}, requestId {} : TransactionBuffer recover failed",
1250+
producerId, requestId, exception);
1251+
commandSender.sendErrorResponse(requestId,
1252+
ServiceUnitNotReadyException.getClientErrorCode(cause),
1253+
cause.getMessage());
1254+
return null;
1255+
});
1256+
});
12511257
});
12521258
}).exceptionally(exception -> {
12531259
Throwable cause = exception.getCause();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init
194194

195195
CompletableFuture<Void> onPoliciesUpdate(Policies data);
196196

197-
boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType);
197+
CompletableFuture<Void> checkBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType);
198198

199199
boolean isEncryptionRequired();
200200

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -995,9 +995,9 @@ public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaTy
995995
* @return quota exceeded status for blocking producer creation
996996
*/
997997
@Override
998-
public boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType) {
998+
public CompletableFuture<Void> checkBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType) {
999999
// No-op
1000-
return false;
1000+
return CompletableFuture.completedFuture(null);
10011001
}
10021002

10031003
@Override

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
8888
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
8989
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
90+
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBacklogQuotaExceededException;
9091
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
9192
import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
9293
import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
@@ -126,6 +127,7 @@
126127
import org.apache.pulsar.common.events.EventsTopicNames;
127128
import org.apache.pulsar.common.naming.TopicName;
128129
import org.apache.pulsar.common.policies.data.BacklogQuota;
130+
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
129131
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
130132
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
131133
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo;
@@ -2463,7 +2465,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
24632465
* @return Backlog quota for topic
24642466
*/
24652467
@Override
2466-
public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) {
2468+
public BacklogQuota getBacklogQuota(BacklogQuotaType backlogQuotaType) {
24672469
TopicName topicName = TopicName.get(this.getName());
24682470
return brokerService.getBacklogQuotaManager().getBacklogQuota(topicName, backlogQuotaType);
24692471
}
@@ -2473,24 +2475,33 @@ public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaTy
24732475
* @return quota exceeded status for blocking producer creation
24742476
*/
24752477
@Override
2476-
public boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType) {
2478+
public CompletableFuture<Void> checkBacklogQuotaExceeded(String producerName, BacklogQuotaType backlogQuotaType) {
24772479
BacklogQuota backlogQuota = getBacklogQuota(backlogQuotaType);
2478-
24792480
if (backlogQuota != null) {
24802481
BacklogQuota.RetentionPolicy retentionPolicy = backlogQuota.getPolicy();
2481-
24822482
if ((retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold
24832483
|| retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception)) {
2484-
if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage && isSizeBacklogExceeded()
2485-
|| backlogQuotaType == BacklogQuota.BacklogQuotaType.message_age && isTimeBacklogExceeded()){
2486-
log.info("[{}] Backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName);
2487-
return true;
2484+
if (backlogQuotaType == BacklogQuotaType.destination_storage && isSizeBacklogExceeded()) {
2485+
log.info("[{}] Size backlog quota exceeded. Cannot create producer [{}]", this.getName(),
2486+
producerName);
2487+
return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy));
2488+
}
2489+
if (backlogQuotaType == BacklogQuotaType.message_age) {
2490+
return checkTimeBacklogExceeded().thenCompose(isExceeded -> {
2491+
if (isExceeded) {
2492+
log.info("[{}] Time backlog quota exceeded. Cannot create producer [{}]", this.getName(),
2493+
producerName);
2494+
return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy));
2495+
} else {
2496+
return CompletableFuture.completedFuture(null);
2497+
}
2498+
});
24882499
}
24892500
} else {
2490-
return false;
2501+
return CompletableFuture.completedFuture(null);
24912502
}
24922503
}
2493-
return false;
2504+
return CompletableFuture.completedFuture(null);
24942505
}
24952506

24962507
/**
@@ -2519,17 +2530,25 @@ public boolean isSizeBacklogExceeded() {
25192530
* @return determine if backlog quota enforcement needs to be done for topic based on time limit
25202531
*/
25212532
public boolean isTimeBacklogExceeded() {
2533+
try {
2534+
return checkTimeBacklogExceeded().get();
2535+
} catch (Throwable e) {
2536+
log.error("[{}] checkTimeBacklogExceeded failed.", topic, e);
2537+
return false;
2538+
}
2539+
}
2540+
public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
25222541
TopicName topicName = TopicName.get(getName());
2523-
CompletableFuture<Boolean> future = new CompletableFuture<>();
25242542
int backlogQuotaLimitInSecond = brokerService.getBacklogQuotaManager().getBacklogQuotaLimitInTime(topicName);
25252543

25262544
// If backlog quota by time is not set and we have no durable cursor.
25272545
if (backlogQuotaLimitInSecond <= 0
25282546
|| ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition() == null) {
2529-
return false;
2547+
return CompletableFuture.completedFuture(false);
25302548
}
25312549

25322550
if (brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck()) {
2551+
CompletableFuture<Boolean> future = new CompletableFuture<>();
25332552
// Check if first unconsumed message(first message after mark delete position)
25342553
// for slowest cursor's has expired.
25352554
PositionImpl position = ((ManagedLedgerImpl) ledger).getNextValidPosition(((ManagedCursorContainer)
@@ -2562,13 +2581,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
25622581
future.complete(false);
25632582
}
25642583
}, null);
2565-
2566-
try {
2567-
return future.get();
2568-
} catch (Exception e) {
2569-
log.error("[{}][{}] Error reading entry for precise time based backlog check", topicName, e);
2570-
return false;
2571-
}
2584+
return future;
25722585
} else {
25732586
Long ledgerId = ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition().getLedgerId();
25742587
try {
@@ -2582,13 +2595,13 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
25822595
+ "slowest cursor currently on {}", backlogQuotaLimitInSecond * 1000,
25832596
((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp());
25842597
}
2585-
return true;
2598+
return CompletableFuture.completedFuture(true);
25862599
} else {
2587-
return false;
2600+
return CompletableFuture.completedFuture(false);
25882601
}
25892602
} catch (Exception e) {
25902603
log.error("[{}][{}] Error reading entry for precise time based backlog check", topicName, e);
2591-
return false;
2604+
return CompletableFuture.completedFuture(false);
25922605
}
25932606
}
25942607
}

0 commit comments

Comments
 (0)