Skip to content

Commit

Permalink
Fix collection get bug in ResourceGroupService (#12499)
Browse files Browse the repository at this point in the history
  • Loading branch information
hezhangjian authored Nov 4, 2021
1 parent 938dd54 commit 086906d
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,7 @@ public ResourceGroup getNamespaceResourceGroup(NamespaceName namespaceName) {
protected boolean incrementUsage(String tenantName, String nsName,
ResourceGroupMonitoringClass monClass,
BytesAndMessagesCount incStats) throws PulsarAdminException {
final String tenantAndNsString = tenantName + "/" + nsName;
final ResourceGroup nsRG = this.namespaceToRGsMap.get(tenantAndNsString);
final ResourceGroup nsRG = this.namespaceToRGsMap.get(NamespaceName.get(tenantName, nsName));
final ResourceGroup tenantRG = this.tenantToRGsMap.get(tenantName);
if (tenantRG == null && nsRG == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,11 +517,18 @@ private void testProduceConsumeUsageOnRG(String[] topicStrings) throws Exception
Assert.assertEquals(recvdNumMsgs, TotalExpectedMessagesToReceive);
Assert.assertEquals(numConsumerExceptions, 0);

boolean tenantRGEqualsNsRG = tenantRGEqualsNamespaceRG(topicStrings);
// If the tenant and NS are on different RGs, the bytes/messages get counted once on the
// tenant RG, and again on the namespace RG. This double-counting is avoided if tenant-RG == NS-RG.
// This is a known (and discussed) artifact in the implementation.
// 'ScaleFactor' is a way to incorporate that effect in the verification.
final int scaleFactor = tenantRGEqualsNsRG ? 1 : 2;

// Verify producer and consumer side stats.
this.verifyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, true, true);
this.verifyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, scaleFactor, true, true);

// Verify the metrics corresponding to the operations in this test.
this.verifyRGMetrics(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, true, true);
this.verifyRGMetrics(sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, scaleFactor, true, true);

unRegisterTenantsAndNamespaces(topicStrings);
// destroyTopics can be called after createTopics() is added back
Expand All @@ -535,9 +542,9 @@ private void testProduceConsumeUsageOnRG(String[] topicStrings) throws Exception
private void verifyRGProdConsStats(String[] topicStrings,
int sentNumBytes, int sentNumMsgs,
int recvdNumBytes, int recvdNumMsgs,
boolean checkProduce, boolean checkConsume) throws Exception {
int scaleFactor, boolean checkProduce,
boolean checkConsume) throws Exception {

boolean tenantRGEqualsNsRG = tenantRGEqualsNamespaceRG(topicStrings);
BrokerService bs = pulsar.getBrokerService();
Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();

Expand All @@ -557,12 +564,6 @@ private void verifyRGProdConsStats(String[] topicStrings,
BytesAndMessagesCount totalNsRGConsCounts = new BytesAndMessagesCount();
BytesAndMessagesCount prodCounts, consCounts;

// If the tenant and NS are on different RGs, the bytes/messages get counted once on the
// tenant RG, and again on the namespace RG. This double-counting is avoided if tenant-RG == NS-RG.
// This is a known (and discussed) artifact in the implementation.
// 'ScaleFactor' is a way to incorporate that effect in the verification.
final int scaleFactor = tenantRGEqualsNsRG ? 1 : 2;

// Since the following walk is on topics, keep track of the RGs for which we have already gathered stats,
// so that we do not double-accumulate stats if multiple topics refer to the same RG.
HashSet<String> RGsWithPublishStatsGathered = new HashSet<>();
Expand Down Expand Up @@ -643,24 +644,22 @@ private void verifyRGProdConsStats(String[] topicStrings,

if (checkProduce) {
prodCounts = ResourceGroup.accumulateBMCount(totalTenantRGProdCounts, totalNsRGProdCounts);
Assert.assertEquals(prodCounts.messages, sentNumMsgs);
Assert.assertEquals(prodCounts.messages, sentNumMsgs * scaleFactor);
Assert.assertTrue(prodCounts.bytes >= ExpectedNumBytesSent);
}

if (checkConsume) {
consCounts = ResourceGroup.accumulateBMCount(totalTenantRGConsCounts, totalNsRGConsCounts);
Assert.assertEquals(consCounts.messages, recvdNumMsgs);
Assert.assertEquals(consCounts.messages, recvdNumMsgs * scaleFactor);
Assert.assertTrue(consCounts.bytes >= ExpectedNumBytesReceived);
}
}

// Check the metrics for the RGs involved
private void verifyRGMetrics(String[] topicStrings,
int sentNumBytes, int sentNumMsgs,
private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs,
int recvdNumBytes, int recvdNumMsgs,
boolean checkProduce, boolean checkConsume) throws Exception {

tenantRGEqualsNamespaceRG(topicStrings);
int scaleFactor, boolean checkProduce,
boolean checkConsume) throws Exception {
final int ExpectedNumBytesSent = sentNumBytes + PER_MESSAGE_METADATA_OHEAD * sentNumMsgs;
final int ExpectedNumBytesReceived = recvdNumBytes + PER_MESSAGE_METADATA_OHEAD * recvdNumMsgs;
long totalTenantRegisters = 0;
Expand Down Expand Up @@ -729,12 +728,12 @@ private void verifyRGMetrics(String[] topicStrings,
// So, we take the residuals into account when comparing against the expected.
if (checkProduce && mc == ResourceGroupMonitoringClass.Publish) {
Assert.assertEquals(totalUsedMessages[mcIdx] - residualSentNumMessages,
sentNumMsgs);
sentNumMsgs * scaleFactor);
Assert.assertTrue(totalUsedBytes[mcIdx] - residualSentNumBytes
>= ExpectedNumBytesSent);
} else if (checkConsume && mc == ResourceGroupMonitoringClass.Dispatch) {
Assert.assertEquals(totalUsedMessages[mcIdx] - residualRecvdNumMessages,
recvdNumMsgs);
recvdNumMsgs * scaleFactor);
Assert.assertTrue(totalUsedBytes[mcIdx] - residualRecvdNumBytes
>= ExpectedNumBytesReceived);
}
Expand All @@ -745,9 +744,9 @@ private void verifyRGMetrics(String[] topicStrings,

// Update the residuals for next round of tests.
residualSentNumBytes += sentNumBytes;
residualSentNumMessages += sentNumMsgs;
residualSentNumMessages += sentNumMsgs * scaleFactor;
residualRecvdNumBytes += recvdNumBytes;
residualRecvdNumMessages += recvdNumMsgs;
residualRecvdNumMessages += recvdNumMsgs * scaleFactor;

Assert.assertEquals(totalUpdates, 0); // currently, we don't update the RGs in this UT

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class ResourceGroupRateLimiterTest extends BrokerTestBase {
new org.apache.pulsar.common.policies.data.ResourceGroup();
final String namespaceName = "prop/ns-abc";
final String persistentTopicString = "persistent://prop/ns-abc/test-topic";
final String nonPersistentTopicString = "non-persistent://prop/ns-abc/test-topic";
final int MESSAGE_SIZE = 10;

@BeforeClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,9 @@ public void measureOpsTime() throws PulsarAdminException {
rgs.unRegisterNameSpace(rgName, tenantAndNamespaceName);

// The overhead of a RG lookup
ResourceGroup retRG;
mSecsStart = System.currentTimeMillis();
for (int ix = 0; ix < numPerfTestIterations; ix++) {
retRG = rgs.resourceGroupGet(rg.resourceGroupName);
rgs.resourceGroupGet(rg.resourceGroupName);
}
mSecsEnd = System.currentTimeMillis();
diffMsecs = mSecsEnd - mSecsStart;
Expand Down Expand Up @@ -221,7 +220,6 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep
// maxUsageReportSuppressRounds iterations. So, if we run for maxUsageReportSuppressRounds iterations,
// we should see needToReportLocalUsage() return true at least once.
Set<Boolean> myBoolSet = new HashSet<>();
myBoolSet.clear();
for (int idx = 0; idx < ResourceGroupService.MaxUsageReportSuppressRounds; idx++) {
needToReport = retRG.setUsageInMonitoredEntity(monClass, nwUsage);
myBoolSet.add(needToReport);
Expand Down

0 comments on commit 086906d

Please sign in to comment.