Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support shrink for map or set #14663

Merged
merged 3 commits into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ private void calculateCursorBacklogs(final ManagedLedgerFactoryImpl factory, fin
BookKeeper bk = factory.getBookKeeper();
final CountDownLatch allCursorsCounter = new CountDownLatch(1);
final long errorInReadingCursor = -1;
ConcurrentOpenHashMap<String, Long> ledgerRetryMap = new ConcurrentOpenHashMap<>();
ConcurrentOpenHashMap<String, Long> ledgerRetryMap =
ConcurrentOpenHashMap.<String, Long>newBuilder().build();

final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue();
final PositionImpl lastLedgerPosition = new PositionImpl(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ public static void fillNamespaceToBundlesMap(final Set<String> bundles,
bundles.forEach(bundleName -> {
final String namespaceName = getNamespaceNameFromBundleName(bundleName);
final String bundleRange = getBundleRangeFromBundleName(bundleName);
target.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange);
target.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
});
}

Expand Down Expand Up @@ -263,8 +265,12 @@ public static void removeMostServicingBrokersForNamespace(

for (final String broker : candidates) {
int bundles = (int) brokerToNamespaceToBundleRange
.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size();
.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder().build())
.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.size();
leastBundles = Math.min(leastBundles, bundles);
if (leastBundles == 0) {
break;
Expand All @@ -276,8 +282,12 @@ public static void removeMostServicingBrokersForNamespace(

final int finalLeastBundles = leastBundles;
candidates.removeIf(
broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size() > finalLeastBundles);
broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder().build())
.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.size() > finalLeastBundles);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
*/
public ModularLoadManagerImpl() {
brokerCandidateCache = new HashSet<>();
brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
brokerToNamespaceToBundleRange =
ConcurrentOpenHashMap.<String,
ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
.build();
defaultStats = new NamespaceBundleStats();
filterPipeline = new ArrayList<>();
loadData = new LoadData();
Expand Down Expand Up @@ -567,7 +570,10 @@ private void updateBundleData() {
brokerData.getTimeAverageData().reset(statsMap.keySet(), bundleData, defaultStats);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>());
.computeIfAbsent(broker, k ->
ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.clear();
LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), namespaceToBundleRange);
Expand Down Expand Up @@ -850,9 +856,13 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker.get(), k -> new ConcurrentOpenHashMap<>());
.computeIfAbsent(broker.get(),
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>())
namespaceToBundleRange.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
}
return broker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,10 @@ public SimpleLoadManagerImpl() {
bundleLossesCache = new HashSet<>();
brokerCandidateCache = new HashSet<>();
availableBrokersCache = new HashSet<>();
brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
brokerToNamespaceToBundleRange =
ConcurrentOpenHashMap.<String,
ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
.build();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
Expand Down Expand Up @@ -851,8 +854,12 @@ private synchronized ResourceUnit findBrokerForPlacement(Multimap<Long, Resource
// same broker.
brokerToNamespaceToBundleRange
.computeIfAbsent(selectedRU.getResourceId().replace("http://", ""),
k -> new ConcurrentOpenHashMap<>())
.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange);
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build())
.computeIfAbsent(namespaceName, k ->
ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
ranking.addPreAllocatedServiceUnit(serviceUnitId, quota);
resourceUnitRankings.put(selectedRU, ranking);
}
Expand Down Expand Up @@ -1271,7 +1278,10 @@ private synchronized void updateBrokerToNamespaceToBundle() {
final Set<String> preallocatedBundles = resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles();
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker.replace("http://", ""), k -> new ConcurrentOpenHashMap<>());
.computeIfAbsent(broker.replace("http://", ""),
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
namespaceToBundleRange.clear();
LoadManagerShared.fillNamespaceToBundlesMap(loadedBundles, namespaceToBundleRange);
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundles, namespaceToBundleRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ public NamespaceService(PulsarService pulsar) {
this.loadManager = pulsar.getLoadManager();
this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
this.ownershipCache = new OwnershipCache(pulsar, bundleFactory, this);
this.namespaceClients = new ConcurrentOpenHashMap<>();
this.namespaceClients =
ConcurrentOpenHashMap.<ClusterDataImpl, PulsarClientImpl>newBuilder().build();
this.bundleOwnershipListeners = new CopyOnWriteArrayList<>();
this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
this.localPoliciesCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalPolicies.class);
Expand Down Expand Up @@ -355,9 +356,15 @@ public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) thro
}

private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
findingBundlesAuthoritative = new ConcurrentOpenHashMap<>();
findingBundlesAuthoritative =
ConcurrentOpenHashMap.<NamespaceBundle,
CompletableFuture<Optional<LookupResult>>>newBuilder()
.build();
private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
findingBundlesNotAuthoritative = new ConcurrentOpenHashMap<>();
findingBundlesNotAuthoritative =
ConcurrentOpenHashMap.<NamespaceBundle,
CompletableFuture<Optional<LookupResult>>>newBuilder()
.build();

/**
* Main internal method to lookup and setup ownership of service unit to a broker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,8 @@ private CompletableFuture<Void> lookUpBrokerForTopic(TopicName partitionedTopicN
partitionedTopicName, result.getLookupData());
}
pulsar().getBrokerService().getOwningTopics().computeIfAbsent(partitionedTopicName
.getPartitionedTopicName(), (key) -> new ConcurrentOpenHashSet<Integer>())
.getPartitionedTopicName(),
(key) -> ConcurrentOpenHashSet.<Integer>newBuilder().build())
.add(partitionedTopicName.getPartitionIndex());
completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, future);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,28 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.preciseTopicPublishRateLimitingEnable =
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
this.topics = new ConcurrentOpenHashMap<>();
this.replicationClients = new ConcurrentOpenHashMap<>();
this.clusterAdmins = new ConcurrentOpenHashMap<>();
this.topics =
ConcurrentOpenHashMap.<String, CompletableFuture<Optional<Topic>>>newBuilder()
.build();
this.replicationClients =
ConcurrentOpenHashMap.<String, PulsarClient>newBuilder().build();
this.clusterAdmins =
ConcurrentOpenHashMap.<String, PulsarAdmin>newBuilder().build();
this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds();
this.configRegisteredListeners = new ConcurrentOpenHashMap<>();
this.configRegisteredListeners =
ConcurrentOpenHashMap.<String, Consumer<?>>newBuilder().build();
this.pendingTopicLoadingQueue = Queues.newConcurrentLinkedQueue();

this.multiLayerTopicsMap = new ConcurrentOpenHashMap<>();
this.owningTopics = new ConcurrentOpenHashMap<>();
this.multiLayerTopicsMap = ConcurrentOpenHashMap.<String,
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>>newBuilder()
.build();
this.owningTopics = ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<Integer>>newBuilder()
.build();
this.pulsarStats = new PulsarStats(pulsar);
this.offlineTopicStatCache = new ConcurrentOpenHashMap<>();
this.offlineTopicStatCache =
ConcurrentOpenHashMap.<TopicName,
PersistentOfflineTopicStats>newBuilder().build();

this.topicOrderedExecutor = OrderedScheduler.newSchedulerBuilder()
.numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic())
Expand Down Expand Up @@ -326,7 +337,8 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.backlogQuotaChecker = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
this.authenticationService = new AuthenticationService(pulsar.getConfiguration());
this.blockedDispatchers = new ConcurrentOpenHashSet<>();
this.blockedDispatchers =
ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
// update dynamic configuration and register-listener
updateConfigurationAndRegisterListeners();
this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
Expand Down Expand Up @@ -1592,8 +1604,12 @@ private void addTopicToStatsMaps(TopicName topicName, Topic topic) {
synchronized (multiLayerTopicsMap) {
String serviceUnit = namespaceBundle.toString();
multiLayerTopicsMap //
.computeIfAbsent(topicName.getNamespace(), k -> new ConcurrentOpenHashMap<>()) //
.computeIfAbsent(serviceUnit, k -> new ConcurrentOpenHashMap<>()) //
.computeIfAbsent(topicName.getNamespace(),
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashMap<String, Topic>>newBuilder()
.build()) //
.computeIfAbsent(serviceUnit,
k -> ConcurrentOpenHashMap.<String, Topic>newBuilder().build()) //
.put(topicName.toString(), topic);
}
}
Expand Down Expand Up @@ -2415,7 +2431,8 @@ public static boolean validateDynamicConfiguration(String key, String value) {
}

private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap = new ConcurrentOpenHashMap<>();
ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
ConcurrentOpenHashMap.<String, ConfigField>newBuilder().build();
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
Expand All @@ -2428,7 +2445,8 @@ private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigur
}

private ConcurrentOpenHashMap<String, Object> getRuntimeConfigurationMap() {
ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap = new ConcurrentOpenHashMap<>();
ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap =
ConcurrentOpenHashMap.<String, Object>newBuilder().build();
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,16 @@ public void reset() {
public NonPersistentTopic(String topic, BrokerService brokerService) {
super(topic, brokerService);

this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
this.subscriptions =
ConcurrentOpenHashMap.<String, NonPersistentSubscription>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.replicators =
ConcurrentOpenHashMap.<String, NonPersistentReplicator>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.isFenced = false;
registerTopicPolicyListener();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,20 @@ public MessageDupUnknownException() {
// Map that contains the highest sequenceId that have been sent by each producers. The map will be updated before
// the messages are persisted
@VisibleForTesting
final ConcurrentOpenHashMap<String, Long> highestSequencedPushed = new ConcurrentOpenHashMap<>(16, 1);
final ConcurrentOpenHashMap<String, Long> highestSequencedPushed =
ConcurrentOpenHashMap.<String, Long>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();

// Map that contains the highest sequenceId that have been persistent by each producers. The map will be updated
// after the messages are persisted
@VisibleForTesting
final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted = new ConcurrentOpenHashMap<>(16, 1);
final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted =
ConcurrentOpenHashMap.<String, Long>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();

// Number of persisted entries after which to store a snapshot of the sequence ids map
private final int snapshotInterval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,14 @@ public void reset() {
public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
super(topic, brokerService);
this.ledger = ledger;
this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.replicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
registerTopicPolicyListener();
Expand Down Expand Up @@ -346,8 +352,14 @@ public CompletableFuture<Void> initialize() {
super(topic, brokerService);
this.ledger = ledger;
this.messageDeduplication = messageDeduplication;
this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.replicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
Expand Down
Loading