Skip to content

Commit

Permalink
Fix blocking check backlog quota in Producer Creation
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason918 committed Nov 22, 2021
1 parent 793a8b9 commit b918092
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
*/
package org.apache.pulsar.broker.service;

import lombok.Getter;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;

Expand Down Expand Up @@ -191,6 +193,16 @@ public TopicPoliciesCacheNotInitException() {
}
}

public static class TopicBacklogQuotaExceededException extends BrokerServiceException {
@Getter
private final BacklogQuota.RetentionPolicy retentionPolicy;

public TopicBacklogQuotaExceededException(BacklogQuota.RetentionPolicy retentionPolicy) {
super("Cannot create producer on topic with backlog quota exceeded");
this.retentionPolicy = retentionPolicy;
}
}

public static org.apache.pulsar.common.api.proto.ServerError getClientErrorCode(Throwable t) {
return getClientErrorCode(t, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
Expand Down Expand Up @@ -1189,65 +1190,70 @@ protected void handleProducer(final CommandProducer cmdProducer) {
service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
// Before creating producer, check if backlog quota exceeded
// on topic for size based limit and time based limit
for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
if (topic.isBacklogQuotaExceeded(producerName, backlogQuotaType)) {
IllegalStateException illegalStateException = new IllegalStateException(
"Cannot create producer on topic with backlog quota exceeded");
BacklogQuota.RetentionPolicy retentionPolicy = topic
.getBacklogQuota(backlogQuotaType).getPolicy();
if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededError,
illegalStateException.getMessage());
} else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededException,
illegalStateException.getMessage());
}
producerFuture.completeExceptionally(illegalStateException);
producers.remove(producerId, producerFuture);
return;
CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.destination_storage),
topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));
backlogQuotaCheckFuture.exceptionally(throwable -> {
//throwable should be CompletionException holding TopicBacklogQuotaExceededException
BrokerServiceException.TopicBacklogQuotaExceededException exception =
(BrokerServiceException.TopicBacklogQuotaExceededException) throwable.getCause();
IllegalStateException illegalStateException = new IllegalStateException(exception);
BacklogQuota.RetentionPolicy retentionPolicy = exception.getRetentionPolicy();
if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededError,
illegalStateException.getMessage());
} else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededException,
illegalStateException.getMessage());
}
}
// Check whether the producer will publish encrypted messages or not
if ((topic.isEncryptionRequired() || encryptionRequireOnProducer) && !isEncrypted) {
String msg = String.format("Encryption is required in %s", topicName);
log.warn("[{}] {}", remoteAddress, msg);
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
producerFuture.completeExceptionally(illegalStateException);
producers.remove(producerId, producerFuture);
return;
}
return null;
});

disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
backlogQuotaCheckFuture.thenRun(() -> {
// Check whether the producer will publish encrypted messages or not
if ((topic.isEncryptionRequired() || encryptionRequireOnProducer) && !isEncrypted) {
String msg = String.format("Encryption is required in %s", topicName);
log.warn("[{}] {}", remoteAddress, msg);
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
producers.remove(producerId, producerFuture);
return;
}

CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema);
disableTcpNoDelayIfNeeded(topicName.toString(), producerName);

schemaVersionFuture.exceptionally(exception -> {
String message = exception.getMessage();
if (exception.getCause() != null) {
message += (" caused by " + exception.getCause());
}
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(exception),
message);
producers.remove(producerId, producerFuture);
return null;
});
CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema);

schemaVersionFuture.thenAccept(schemaVersion -> {
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> {
buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted,
metadata, schemaVersion, epoch, userProvidedProducerName, topicName,
producerAccessMode, topicEpoch, producerFuture);
}).exceptionally(exception -> {
Throwable cause = exception.getCause();
log.error("producerId {}, requestId {} : TransactionBuffer recover failed",
producerId, requestId, exception);
schemaVersionFuture.exceptionally(exception -> {
String message = exception.getMessage();
if (exception.getCause() != null) {
message += (" caused by " + exception.getCause());
}
commandSender.sendErrorResponse(requestId,
ServiceUnitNotReadyException.getClientErrorCode(cause),
cause.getMessage());
BrokerServiceException.getClientErrorCode(exception),
message);
producers.remove(producerId, producerFuture);
return null;
});

schemaVersionFuture.thenAccept(schemaVersion -> {
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> {
buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted,
metadata, schemaVersion, epoch, userProvidedProducerName, topicName,
producerAccessMode, topicEpoch, producerFuture);
}).exceptionally(exception -> {
Throwable cause = exception.getCause();
log.error("producerId {}, requestId {} : TransactionBuffer recover failed",
producerId, requestId, exception);
commandSender.sendErrorResponse(requestId,
ServiceUnitNotReadyException.getClientErrorCode(cause),
cause.getMessage());
return null;
});
});
});
}).exceptionally(exception -> {
Throwable cause = exception.getCause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
Expand Down Expand Up @@ -194,15 +195,15 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> onPoliciesUpdate(Policies data);

boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType);
CompletableFuture<Void> checkBacklogQuotaExceeded(String producerName, BacklogQuotaType backlogQuotaType);

boolean isEncryptionRequired();

boolean getSchemaValidationEnforced();

boolean isReplicated();

BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType);
BacklogQuota getBacklogQuota(BacklogQuotaType backlogQuotaType);

void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats,
StatsOutputStream topicStatsStream, ClusterReplicationMetrics clusterReplicationMetrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create;
import static org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -986,7 +987,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
* @return Backlog quota for topic
*/
@Override
public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) {
public BacklogQuota getBacklogQuota(BacklogQuotaType backlogQuotaType) {
// No-op
throw new UnsupportedOperationException("getBacklogQuota method is not supported on non-persistent topic");
}
Expand All @@ -996,9 +997,9 @@ public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaTy
* @return quota exceeded status for blocking producer creation
*/
@Override
public boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType) {
public CompletableFuture<Void> checkBacklogQuotaExceeded(String producerName, BacklogQuotaType backlogQuotaType) {
// No-op
return false;
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBacklogQuotaExceededException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
Expand Down Expand Up @@ -2473,24 +2474,33 @@ public BacklogQuota getBacklogQuota(BacklogQuotaType backlogQuotaType) {
* @return quota exceeded status for blocking producer creation
*/
@Override
public boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType) {
public CompletableFuture<Void> checkBacklogQuotaExceeded(String producerName, BacklogQuotaType backlogQuotaType) {
BacklogQuota backlogQuota = getBacklogQuota(backlogQuotaType);

if (backlogQuota != null) {
BacklogQuota.RetentionPolicy retentionPolicy = backlogQuota.getPolicy();

if ((retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold
|| retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception)) {
if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage && isSizeBacklogExceeded()
|| backlogQuotaType == BacklogQuota.BacklogQuotaType.message_age && isTimeBacklogExceeded()){
log.info("[{}] Backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName);
return true;
if (backlogQuotaType == BacklogQuotaType.destination_storage && isSizeBacklogExceeded()) {
log.info("[{}] Size backlog quota exceeded. Cannot create producer [{}]", this.getName(),
producerName);
return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy));
}
if (backlogQuotaType == BacklogQuotaType.message_age) {
return checkTimeBacklogExceeded().thenCompose(isExceeded -> {
if (isExceeded) {
log.info("[{}] Time backlog quota exceeded. Cannot create producer [{}]", this.getName(),
producerName);
return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy));
} else {
return CompletableFuture.completedFuture(null);
}
});
}
} else {
return false;
return CompletableFuture.completedFuture(null);
}
}
return false;
return CompletableFuture.completedFuture(null);
}

/**
Expand Down Expand Up @@ -2518,17 +2528,26 @@ public boolean isSizeBacklogExceeded() {
* @return determine if backlog quota enforcement needs to be done for topic based on time limit
*/
public boolean isTimeBacklogExceeded() {
try {
return checkTimeBacklogExceeded().get();
} catch (Throwable e) {
log.error("[{}] checkTimeBacklogExceeded failed.", topic, e);
return false;
}
}

public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
TopicName topicName = TopicName.get(getName());
CompletableFuture<Boolean> future = new CompletableFuture<>();
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();

// If backlog quota by time is not set and we have no durable cursor.
if (backlogQuotaLimitInSecond <= 0
|| ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition() == null) {
return false;
return CompletableFuture.completedFuture(false);
}

if (brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck()) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
// Check if first unconsumed message(first message after mark delete position)
// for slowest cursor's has expired.
PositionImpl position = ((ManagedLedgerImpl) ledger).getNextValidPosition(((ManagedCursorContainer)
Expand Down Expand Up @@ -2561,13 +2580,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
future.complete(false);
}
}, null);

try {
return future.get();
} catch (Exception e) {
log.error("[{}][{}] Error reading entry for precise time based backlog check", topicName, e);
return false;
}
return future;
} else {
Long ledgerId = ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition().getLedgerId();
try {
Expand All @@ -2581,13 +2594,13 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+ "slowest cursor currently on {}", backlogQuotaLimitInSecond * 1000,
((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp());
}
return true;
return CompletableFuture.completedFuture(true);
} else {
return false;
return CompletableFuture.completedFuture(false);
}
} catch (Exception e) {
log.error("[{}][{}] Error reading entry for precise time based backlog check", topicName, e);
return false;
return CompletableFuture.completedFuture(false);
}
}
}
Expand Down

0 comments on commit b918092

Please sign in to comment.