diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/BrokerStats.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/BrokerStats.java index 10bb11c0e264d..c9341749953fa 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/BrokerStats.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/BrokerStats.java @@ -15,7 +15,6 @@ */ package com.yahoo.pulsar.broker.admin; -import java.io.IOException; import java.io.OutputStream; import java.util.Collection; import java.util.Map; @@ -33,13 +32,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import com.yahoo.pulsar.common.naming.NamespaceName; -import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport; -import com.yahoo.pulsar.common.stats.AllocatorStats; import com.yahoo.pulsar.broker.loadbalance.ResourceUnit; import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; import com.yahoo.pulsar.broker.stats.AllocatorStatsGenerator; @@ -47,9 +39,14 @@ import com.yahoo.pulsar.broker.stats.MBeanStatsGenerator; import com.yahoo.pulsar.broker.stats.Metrics; import com.yahoo.pulsar.broker.web.RestException; +import com.yahoo.pulsar.common.naming.NamespaceName; +import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport; +import com.yahoo.pulsar.common.stats.AllocatorStats; -import io.netty.buffer.ByteBuf; -import io.netty.util.ReferenceCountUtil; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; @Path("/broker-stats") @Api(value = "/broker-stats", description = "Stats for broker", tags = "broker-stats") @@ -99,19 +96,13 @@ public Collection getMBeans() throws Exception { public StreamingOutput getDestinations2() throws Exception { // Ensure super user access only validateSuperUserAccess(); - return new StreamingOutput() { - public void write(OutputStream output) throws IOException, WebApplicationException { - ByteBuf statsBuf = null; - try { - statsBuf = pulsar().getBrokerService().getDimensionMetrics(); - output.write(statsBuf.array(), statsBuf.arrayOffset(), statsBuf.readableBytes()); - } catch (Exception e) { - throw new WebApplicationException(e); - } finally { - ReferenceCountUtil.release(statsBuf); - } + return output -> pulsar().getBrokerService().getDimensionMetrics(statsBuf -> { + try { + output.write(statsBuf.array(), statsBuf.arrayOffset(), statsBuf.readableBytes()); + } catch (Exception e) { + throw new WebApplicationException(e); } - }; + }); } @GET diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java index 8df6784437046..263337ff47ea4 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java @@ -37,6 +37,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; @@ -591,8 +592,8 @@ public void updateRates() { } } - public ByteBuf getDimensionMetrics() { - return pulsarStats.getDimensionMetrics(); + public void getDimensionMetrics(Consumer consumer) { + pulsarStats.getDimensionMetrics(consumer); } public List getDestinationMetrics() { diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java index 350b1c7954ef7..fdaad39c55be7 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/PulsarStats.java @@ -18,6 +18,8 @@ import java.io.Closeable; import java.util.List; import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +53,8 @@ public class PulsarStats implements Closeable { private List metricsCollection; private final BrokerOperabilityMetrics brokerOperabilityMetrics; + private final ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock(); + public PulsarStats(PulsarService pulsar) { this.topicStatsBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(16 * 1024); this.tempTopicStatsBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(16 * 1024); @@ -148,21 +152,23 @@ public synchronized void updateStats( metricsCollection = tempMetricsCollection; tempMetricsCollection = tempRefMetrics; - ByteBuf tmp = topicStatsBuf; - topicStatsBuf = tempTopicStatsBuf; - tempTopicStatsBuf = tmp; + bufferLock.writeLock().lock(); + try { + ByteBuf tmp = topicStatsBuf; + topicStatsBuf = tempTopicStatsBuf; + tempTopicStatsBuf = tmp; + tempTopicStatsBuf.clear(); + } finally { + bufferLock.writeLock().unlock(); + } } - public ByteBuf getDimensionMetrics() { - while (true) { - ByteBuf topicStatsBuf = this.topicStatsBuf; - try { - topicStatsBuf.retain(); - return topicStatsBuf; - } catch (Exception e) { - // Re-fetch the buffer, since it have been swapped and release - continue; - } + public void getDimensionMetrics(Consumer consumer) { + bufferLock.readLock().lock(); + try { + consumer.accept(topicStatsBuf); + } finally { + bufferLock.readLock().unlock(); } }