Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into improve/loadbalance…
Browse files Browse the repository at this point in the history
…/metadata_service_not_available_dont_do_load_shedding
  • Loading branch information
congbobo184 committed Jul 22, 2024
2 parents a196001 + 3e4f338 commit 78b0a99
Show file tree
Hide file tree
Showing 20 changed files with 1,683 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3472,6 +3472,19 @@ public LongPairRangeSet<Position> getIndividuallyDeletedMessagesSet() {
return individualDeletedMessages;
}

public Position processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(
LongPairRangeSet.RangeProcessor<Position> processor) {
final Position mdp;
lock.readLock().lock();
try {
mdp = markDeletePosition;
individualDeletedMessages.forEach(processor);
} finally {
lock.readLock().unlock();
}
return mdp;
}

public boolean isMessageDeleted(Position position) {
lock.readLock().lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3497,7 +3497,7 @@ private CompletableFuture<Void> completeLedgerInfoForOffloaded(long ledgerId, UU
* the position range
* @return the count of entries
*/
long getNumberOfEntries(Range<Position> range) {
public long getNumberOfEntries(Range<Position> range) {
Position fromPosition = range.lowerEndpoint();
boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
Position toPosition = range.upperEndpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -55,6 +56,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -104,6 +106,7 @@
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.stats.MetricsUtil;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataCache;
Expand Down Expand Up @@ -187,6 +190,9 @@ public class NamespaceService implements AutoCloseable {
.register();
private final DoubleHistogram lookupLatencyHistogram;

private ConcurrentHashMap<String, CompletableFuture<List<String>>> inProgressQueryUserTopics =
new ConcurrentHashMap<>();

/**
* Default constructor.
*/
Expand Down Expand Up @@ -1509,6 +1515,23 @@ public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceNa
}
}

public CompletableFuture<List<String>> getListOfUserTopics(NamespaceName namespaceName, Mode mode) {
String key = String.format("%s://%s", mode, namespaceName);
final MutableBoolean initializedByCurrentThread = new MutableBoolean();
CompletableFuture<List<String>> queryRes = inProgressQueryUserTopics.computeIfAbsent(key, k -> {
initializedByCurrentThread.setTrue();
return getListOfTopics(namespaceName, mode).thenApplyAsync(list -> {
return TopicList.filterSystemTopic(list);
}, pulsar.getExecutor());
});
if (initializedByCurrentThread.getValue()) {
queryRes.whenComplete((ignore, ex) -> {
inProgressQueryUserTopics.remove(key, queryRes);
});
}
return queryRes;
}

public CompletableFuture<List<String>> getAllPartitions(NamespaceName namespaceName) {
return getPartitions(namespaceName, TopicDomain.persistent)
.thenCombine(getPartitions(namespaceName, TopicDomain.non_persistent),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public class Consumer {

private static final double avgPercent = 0.9;
private boolean preciseDispatcherFlowControl;
private Position readPositionWhenJoining;
private Position lastSentPositionWhenJoining;
private final String clientAddress; // IP address only, no port number included
private final MessageId startMessageId;
private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
Expand Down Expand Up @@ -931,8 +931,8 @@ public ConsumerStatsImpl getStats() {
stats.unackedMessages = unackedMessages;
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
stats.avgMessagesPerEntry = getAvgMessagesPerEntry();
if (readPositionWhenJoining != null) {
stats.readPositionWhenJoining = readPositionWhenJoining.toString();
if (lastSentPositionWhenJoining != null) {
stats.lastSentPositionWhenJoining = lastSentPositionWhenJoining.toString();
}
return stats;
}
Expand Down Expand Up @@ -1166,8 +1166,8 @@ public boolean isPreciseDispatcherFlowControl() {
return preciseDispatcherFlowControl;
}

public void setReadPositionWhenJoining(Position readPositionWhenJoining) {
this.readPositionWhenJoining = readPositionWhenJoining;
public void setLastSentPositionWhenJoining(Position lastSentPositionWhenJoining) {
this.lastSentPositionWhenJoining = lastSentPositionWhenJoining;
}

public int getMaxUnackedMessages() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2459,11 +2459,11 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
if (lookupSemaphore.tryAcquire()) {
isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
if (isAuthorized) {
getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName, mode)
.thenAccept(topics -> {
boolean filterTopics = false;
// filter system topic
List<String> filteredTopics = TopicList.filterSystemTopic(topics);
List<String> filteredTopics = topics;

if (enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) {
if (topicsPattern.get().length() <= maxSubscriptionPatternLength) {
Expand Down
Loading

0 comments on commit 78b0a99

Please sign in to comment.