Skip to content

Commit

Permalink
[improve][broker] Optimise msgOutCounter and bytesOutCounter (#16214)
Browse files Browse the repository at this point in the history
### Motivation

AbstractTopic#getBytesOutCounter and AbstractTopic#getMsgOutCounter
both generate full stats only to pick the single required counter for
the getters. These can be optimised to perform only the necessary work
to calculate the counters.

### Modification

Provided a scoped implementation of the above methods for the single
counter required for each getter.
  • Loading branch information
Dave Maughan committed Jun 29, 2022
1 parent f230d15 commit 0a1afed
Show file tree
Hide file tree
Showing 10 changed files with 259 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.ToLongFunction;

public abstract class AbstractSubscription implements Subscription {
protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
protected final LongAdder msgOutFromRemovedConsumer = new LongAdder();

public long getMsgOutCounter() {
return msgOutFromRemovedConsumer.longValue() + sumConsumers(Consumer::getMsgOutCounter);
}

public long getBytesOutCounter() {
return bytesOutFromRemovedConsumers.longValue() + sumConsumers(Consumer::getBytesOutCounter);
}

private long sumConsumers(ToLongFunction<Consumer> toCounter) {
return Optional.ofNullable(getDispatcher())
.map(dispatcher -> dispatcher.getConsumers().stream().mapToLong(toCounter).sum())
.orElse(0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.ToLongFunction;
import lombok.Getter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.collections4.CollectionUtils;
Expand All @@ -51,6 +52,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
Expand Down Expand Up @@ -141,6 +143,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP

private Map<String/*subscription*/, SubscriptionPolicies> subscriptionPolicies = Collections.emptyMap();

protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
protected final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();

public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
Expand Down Expand Up @@ -1133,11 +1138,18 @@ public long getBytesInCounter() {
}

public long getMsgOutCounter() {
return getStats(false, false, false).msgOutCounter;
return msgOutFromRemovedSubscriptions.longValue() + sumSubscriptions(AbstractSubscription::getMsgOutCounter);
}

public long getBytesOutCounter() {
return getStats(false, false, false).bytesOutCounter;
return bytesOutFromRemovedSubscriptions.longValue() + sumSubscriptions(AbstractSubscription::getBytesOutCounter);
}

private long sumSubscriptions(ToLongFunction<AbstractSubscription> toCounter) {
return getSubscriptions().values().stream()
.map(AbstractSubscription.class::cast)
.mapToLong(toCounter)
.sum();
}

public boolean isDeleteWhileInactive() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,14 @@ public ConsumerStatsImpl getStats() {
return stats;
}

public long getMsgOutCounter() {
return msgOutCounter.longValue();
}

public long getBytesOutCounter() {
return bytesOutCounter.longValue();
}

public int getUnackedMessages() {
return unackedMessages;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
Expand All @@ -48,7 +48,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NonPersistentSubscription implements Subscription {
public class NonPersistentSubscription extends AbstractSubscription implements Subscription {
private final NonPersistentTopic topic;
private volatile NonPersistentDispatcher dispatcher;
private final String topicName;
Expand All @@ -66,8 +66,6 @@ public class NonPersistentSubscription implements Subscription {
// Timestamp of when this subscription was last seen active
private volatile long lastActive;

private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
private volatile Map<String, String> subscriptionProperties;

// If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -107,9 +106,6 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
AtomicLongFieldUpdater.newUpdater(NonPersistentTopic.class, "entriesAddedCounter");
private volatile long entriesAddedCounter = 0;

private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();

private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() {
@Override
protected TopicStats initialValue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
Expand All @@ -52,6 +51,7 @@
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
Expand Down Expand Up @@ -82,7 +82,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentSubscription implements Subscription {
public class PersistentSubscription extends AbstractSubscription implements Subscription {
protected final PersistentTopic topic;
protected final ManagedCursor cursor;
protected volatile Dispatcher dispatcher;
Expand Down Expand Up @@ -116,9 +116,6 @@ public class PersistentSubscription implements Subscription {
private final PendingAckHandle pendingAckHandle;
private volatile Map<String, String> subscriptionProperties;

private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
private final LongAdder msgOutFromRemovedConsumer = new LongAdder();

static {
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,6 @@ protected TopicStatsHelper initialValue() {
@Getter
protected final TransactionBuffer transactionBuffer;

private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();

// Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic
private long lastDataMessagePublishedTimestamp = 0;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import java.util.List;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class AbstractSubscriptionTest {
private Consumer consumer;
private AbstractSubscription subscription;

@BeforeMethod
public void beforeMethod() {
Dispatcher dispatcher = mock(Dispatcher.class);
consumer = mock(Consumer.class);
subscription = spy(AbstractSubscription.class);

when(subscription.getDispatcher()).thenReturn(dispatcher);
when(dispatcher.getConsumers()).thenReturn(List.of(consumer));
}

@Test
public void testGetMsgOutCounter() {
subscription.msgOutFromRemovedConsumer.add(1L);
when(consumer.getMsgOutCounter()).thenReturn(2L);
assertEquals(subscription.getMsgOutCounter(), 3L);
}

@Test
public void testGetBytesOutCounter() {
subscription.bytesOutFromRemovedConsumers.add(1L);
when(consumer.getBytesOutCounter()).thenReturn(2L);
assertEquals(subscription.getBytesOutCounter(), 3L);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.apache.pulsar.broker.service;

import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
import static org.testng.Assert.assertEquals;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class AbstractTopicTest {
private AbstractSubscription subscription;
private AbstractTopic topic;

@BeforeMethod
public void beforeMethod() {
BrokerService brokerService = mock(BrokerService.class);
PulsarService pulsarService = mock(PulsarService.class);
ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
BacklogQuotaManager backlogQuotaManager = mock(BacklogQuotaManager.class);
subscription = mock(AbstractSubscription.class);

when(brokerService.pulsar()).thenReturn(pulsarService);
when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration);
when(brokerService.getBacklogQuotaManager()).thenReturn(backlogQuotaManager);

topic = mock(AbstractTopic.class, withSettings()
.useConstructor("topic", brokerService)
.defaultAnswer(CALLS_REAL_METHODS));

ConcurrentOpenHashMap<String, Subscription> subscriptions =
ConcurrentOpenHashMap.<String, Subscription>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
subscriptions.put("subscription", subscription);
when(topic.getSubscriptions()).thenAnswer(invocation -> subscriptions);
}

@Test
public void testGetMsgOutCounter() {
topic.msgOutFromRemovedSubscriptions.add(1L);
when(subscription.getMsgOutCounter()).thenReturn(2L);
assertEquals(topic.getMsgOutCounter(), 3L);
}

@Test
public void testGetBytesOutCounter() {
topic.bytesOutFromRemovedSubscriptions.add(1L);
when(subscription.getBytesOutCounter()).thenReturn(2L);
assertEquals(topic.getBytesOutCounter(), 3L);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static java.util.Collections.emptyMap;
import static org.apache.pulsar.client.api.MessageId.latest;
import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Exclusive;
import static org.apache.pulsar.common.api.proto.KeySharedMode.AUTO_SPLIT;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import java.net.SocketAddress;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class ConsumerTest {
private Consumer consumer;
private final ConsumerStatsImpl stats = new ConsumerStatsImpl();

@BeforeMethod
public void beforeMethod() {
Subscription subscription = mock(Subscription.class);
ServerCnx cnx = mock(ServerCnx.class);
SocketAddress address = mock(SocketAddress.class);
Topic topic = mock(Topic.class);
BrokerService brokerService = mock(BrokerService.class);
PulsarService pulsarService = mock(PulsarService.class);
ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);

when(cnx.clientAddress()).thenReturn(address);
when(subscription.getTopic()).thenReturn(topic);
when(topic.getBrokerService()).thenReturn(brokerService);
when(brokerService.getPulsar()).thenReturn(pulsarService);
when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration);

consumer =
new Consumer(subscription, Exclusive, "topic", 1, 0, "Cons1", true, cnx, "myrole-1", emptyMap(), false,
new KeySharedMeta().setKeySharedMode(AUTO_SPLIT), latest, DEFAULT_CONSUMER_EPOCH);
}

@Test
public void testGetMsgOutCounter() {
stats.msgOutCounter = 1L;
consumer.updateStats(stats);
assertEquals(consumer.getMsgOutCounter(), 1L);
}

@Test
public void testGetBytesOutCounter() {
stats.bytesOutCounter = 1L;
consumer.updateStats(stats);
assertEquals(consumer.getBytesOutCounter(), 1L);
}
}

0 comments on commit 0a1afed

Please sign in to comment.