Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Fixed implicit conversions from long -> int #22055

Merged
merged 4 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public class BrokerLoadData {
private double msgThroughputOut; // bytes/sec
private double msgRateIn; // messages/sec
private double msgRateOut; // messages/sec
private int bundleCount;
private int topics;
private long bundleCount;
private long topics;

// Load data features computed from the above resources.
private double maxResourceUsage; // max of resource usages
Expand Down Expand Up @@ -115,8 +115,8 @@ public void update(final SystemResourceUsage usage,
double msgThroughputOut,
double msgRateIn,
double msgRateOut,
int bundleCount,
int topics,
long bundleCount,
long topics,
ServiceConfiguration conf) {
updateSystemResourceUsage(usage.cpu, usage.memory, usage.directMemory, usage.bandwidthIn, usage.bandwidthOut);
this.msgThroughputIn = msgThroughputIn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String,
int loadBalancerBrokerMaxTopics = context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
brokers.keySet().removeIf(broker -> {
Optional<BrokerLoadData> brokerLoadDataOpt = context.brokerLoadDataStore().get(broker);
long topics = brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0);
long topics = brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0L);
// TODO: The broker load data might be delayed, so the max topic check might not accurate.
return topics >= loadBalancerBrokerMaxTopics;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public long calculateRank() {
percentageUsage = (entry.getValue().usage / entry.getValue().limit) * 100;
}
// give equal weight to each resource
double resourceWeight = weight * percentageUsage;
int resourceWeight = (int) (weight * percentageUsage);
// any resource usage over 75% doubles the whole weight per resource
if (percentageUsage > throttle) {
final int i = resourcesWithHighUsage++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public ResourceUnit findBrokerForPlacement(Multimap<Long, ResourceUnit> finalCan
}
int weightedSelector = rand.nextInt(totalAvailability);
log.debug("Generated Weighted Selector Number - [{}] ", weightedSelector);
int weightRangeSoFar = 0;
long weightRangeSoFar = 0;
for (Map.Entry<Long, ResourceUnit> candidateOwner : finalCandidates.entries()) {
weightRangeSoFar += candidateOwner.getKey();
if (weightedSelector < weightRangeSoFar) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,8 @@ protected static double getRgRemoteUsageMessageCount (String rgName, String monC
}

// Visibility for unit testing
protected static double getRgUsageReportedCount (String rgName, String monClassName) {
return rgLocalUsageReportCount.labels(rgName, monClassName).get();
protected static long getRgUsageReportedCount (String rgName, String monClassName) {
return (long) rgLocalUsageReportCount.labels(rgName, monClassName).get();
}

// Visibility for unit testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,48 +483,48 @@ protected BytesAndMessagesCount getPublishRateLimiters (String rgName) throws Pu
}

// Visibility for testing.
protected static double getRgQuotaByteCount (String rgName, String monClassName) {
return rgCalculatedQuotaBytes.labels(rgName, monClassName).get();
protected static long getRgQuotaByteCount (String rgName, String monClassName) {
return (long) rgCalculatedQuotaBytes.labels(rgName, monClassName).get();
}

// Visibility for testing.
protected static double getRgQuotaMessageCount (String rgName, String monClassName) {
return rgCalculatedQuotaMessages.labels(rgName, monClassName).get();
protected static long getRgQuotaMessageCount (String rgName, String monClassName) {
return (long) rgCalculatedQuotaMessages.labels(rgName, monClassName).get();
}

// Visibility for testing.
protected static double getRgLocalUsageByteCount (String rgName, String monClassName) {
return rgLocalUsageBytes.labels(rgName, monClassName).get();
protected static long getRgLocalUsageByteCount (String rgName, String monClassName) {
return (long) rgLocalUsageBytes.labels(rgName, monClassName).get();
}

// Visibility for testing.
protected static double getRgLocalUsageMessageCount (String rgName, String monClassName) {
return rgLocalUsageMessages.labels(rgName, monClassName).get();
protected static long getRgLocalUsageMessageCount (String rgName, String monClassName) {
return (long) rgLocalUsageMessages.labels(rgName, monClassName).get();
}

// Visibility for testing.
protected static double getRgUpdatesCount (String rgName) {
return rgUpdates.labels(rgName).get();
protected static long getRgUpdatesCount (String rgName) {
return (long) rgUpdates.labels(rgName).get();
}

// Visibility for testing.
protected static double getRgTenantRegistersCount (String rgName) {
return rgTenantRegisters.labels(rgName).get();
protected static long getRgTenantRegistersCount (String rgName) {
return (long) rgTenantRegisters.labels(rgName).get();
}

// Visibility for testing.
protected static double getRgTenantUnRegistersCount (String rgName) {
return rgTenantUnRegisters.labels(rgName).get();
protected static long getRgTenantUnRegistersCount (String rgName) {
return (long) rgTenantUnRegisters.labels(rgName).get();
}

// Visibility for testing.
protected static double getRgNamespaceRegistersCount (String rgName) {
return rgNamespaceRegisters.labels(rgName).get();
protected static long getRgNamespaceRegistersCount (String rgName) {
return (long) rgNamespaceRegisters.labels(rgName).get();
}

// Visibility for testing.
protected static double getRgNamespaceUnRegistersCount (String rgName) {
return rgNamespaceUnRegisters.labels(rgName).get();
protected static long getRgNamespaceUnRegistersCount (String rgName) {
return (long) rgNamespaceUnRegisters.labels(rgName).get();
}

// Visibility for testing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public class BrokerService implements Closeable {
// Keep track of topics and partitions served by this broker for fast lookup.
@Getter
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<Integer>> owningTopics;
private int numberOfNamespaceBundles = 0;
private long numberOfNamespaceBundles = 0;

private final EventLoopGroup acceptorGroup;
private final EventLoopGroup workerGroup;
Expand Down Expand Up @@ -2309,7 +2309,7 @@ private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle,
topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, EventStage.SUCCESS);
}

public int getNumberOfNamespaceBundles() {
public long getNumberOfNamespaceBundles() {
this.numberOfNamespaceBundles = 0;
this.multiLayerTopicsMap.forEach((namespaceName, bundles) -> {
this.numberOfNamespaceBundles += bundles.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

public class BrokerStats extends NamespaceStats {

public int bundleCount;
public int topics;
public long bundleCount;
public long topics;
public BrokerStats(int ratePeriodInSeconds) {
super(ratePeriodInSeconds);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class NamespaceStats {
public int consumerCount;
public int producerCount;
public int replicatorCount;
public int subsCount;
public long subsCount;
public static final String BRK_ADD_ENTRY_LATENCY_PREFIX = "brk_AddEntryLatencyBuckets";
public long[] addLatencyBucket = new long[ENTRY_LATENCY_BUCKETS_USEC.length + 1];
public static final String[] ADD_LATENCY_BUCKET_KEYS = new String[ENTRY_LATENCY_BUCKETS_USEC.length + 1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,17 +682,11 @@ private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs,
for (ResourceGroupMonitoringClass mc : ResourceGroupMonitoringClass.values()) {
String mcName = mc.name();
int mcIndex = mc.ordinal();
double quotaBytes = ResourceGroupService.getRgQuotaByteCount(rgName, mcName);
totalQuotaBytes[mcIndex] += quotaBytes;
double quotaMesgs = ResourceGroupService.getRgQuotaMessageCount(rgName, mcName);
totalQuotaMessages[mcIndex] += quotaMesgs;
double usedBytes = ResourceGroupService.getRgLocalUsageByteCount(rgName, mcName);
totalUsedBytes[mcIndex] += usedBytes;
double usedMesgs = ResourceGroupService.getRgLocalUsageMessageCount(rgName, mcName);
totalUsedMessages[mcIndex] += usedMesgs;

double usageReportedCount = ResourceGroup.getRgUsageReportedCount(rgName, mcName);
totalUsageReportCounts[mcIndex] += usageReportedCount;
totalQuotaBytes[mcIndex] += ResourceGroupService.getRgQuotaByteCount(rgName, mcName);
totalQuotaMessages[mcIndex] += ResourceGroupService.getRgQuotaMessageCount(rgName, mcName);
totalUsedBytes[mcIndex] += ResourceGroupService.getRgLocalUsageByteCount(rgName, mcName);
totalUsedMessages[mcIndex] += ResourceGroupService.getRgLocalUsageMessageCount(rgName, mcName);
totalUsageReportCounts[mcIndex] += ResourceGroup.getRgUsageReportedCount(rgName, mcName);
}

totalTenantRegisters += ResourceGroupService.getRgTenantRegistersCount(rgName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public interface LoadManagerReport extends ServiceLookupData {

Map<String, NamespaceBundleStats> getBundleStats();

int getNumTopics();
long getNumTopics();

int getNumBundles();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class LoadReport implements LoadManagerReport {
private long timestamp;
private double msgRateIn;
private double msgRateOut;
private int numTopics;
private long numTopics;
private int numConsumers;
private int numProducers;
private int numBundles;
Expand Down Expand Up @@ -205,7 +205,7 @@ public String getLoadReportType() {
}

@Override
public int getNumTopics() {
public long getNumTopics() {
numTopics = 0;
if (this.bundleStats != null) {
this.bundleStats.forEach((bundle, stats) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ public void setLastStats(Map<String, NamespaceBundleStats> lastStats) {
}

@Override
public int getNumTopics() {
public long getNumTopics() {
return numTopics;
}

Expand Down
Loading