diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java index c209c34a3d76e..3f8969860163d 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.testclient; +import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; @@ -31,7 +32,13 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SizeUnit; +import org.apache.pulsar.client.api.TableView; import org.apache.pulsar.policies.data.loadbalancer.LoadReport; import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; @@ -54,7 +61,7 @@ public class BrokerMonitor { private static final String BROKER_ROOT = "/loadbalance/brokers"; private static final int ZOOKEEPER_TIMEOUT_MILLIS = 30000; private static final int GLOBAL_STATS_PRINT_PERIOD_MILLIS = 60000; - private final ZooKeeper zkClient; + private ZooKeeper zkClient; private static final Gson gson = new Gson(); // Fields common for message rows. @@ -77,7 +84,7 @@ public class BrokerMonitor { private static final Object[] ALLOC_MESSAGE_ROW = makeMessageRow("ALLOC MSG"); private static final Object[] GLOBAL_HEADER = { "BROKER", "BUNDLE", "MSG/S", "LONG/S", "KB/S", "MAX %" }; - private final Map loadData; + private Map loadData; private static final FixedColumnLengthTableMaker localTableMaker = new FixedColumnLengthTableMaker(); static { @@ -434,8 +441,11 @@ private static class Arguments { @Parameter(names = { "-h", "--help" }, description = "Help message", help = true) boolean help; - @Parameter(names = { "--connect-string" }, description = "Zookeeper connect string", required = true) + @Parameter(names = { "--connect-string" }, description = "Zookeeper or broker connect string", required = true) public String connectString = null; + + @Parameter(names = { "--extensions" }, description = "true to monitor Load Balance Extensions.") + boolean extensions = false; } /** @@ -464,6 +474,71 @@ public void start() { } } + private TableView brokerLoadDataTableView; + + private BrokerMonitor(String brokerServiceUrl) { + try { + PulsarClient client = PulsarClient.builder() + .memoryLimit(0, SizeUnit.BYTES) + .serviceUrl(brokerServiceUrl) + .connectionsPerBroker(4) + .ioThreads(Runtime.getRuntime().availableProcessors()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + this.brokerLoadDataTableView = client + .newTableView(Schema.JSON(BrokerLoadData.class)) + .topic(BROKER_LOAD_DATA_STORE_TOPIC).create(); + } catch (Throwable e) { + log.info("Failed to start BrokerMonitor", e); + throw new RuntimeException(e); + } + } + + private synchronized void printBrokerLoadData(final String broker, final BrokerLoadData brokerLoadData) { + + // Initialize the constant rows. + final Object[][] rows = new Object[6][]; + rows[0] = SYSTEM_ROW; + rows[2] = COUNT_ROW; + rows[4] = LATEST_ROW; + + // First column is a label, so start at the second column at index 1. + // System row. + rows[1] = new Object[SYSTEM_ROW.length]; + initRow(rows[1], brokerLoadData.getCpu().percentUsage(), brokerLoadData.getMemory().percentUsage(), + brokerLoadData.getDirectMemory().percentUsage(), brokerLoadData.getBandwidthIn().percentUsage(), + brokerLoadData.getBandwidthOut().percentUsage(), brokerLoadData.getMaxResourceUsage() * 100); + + // Count row. + rows[3] = new Object[COUNT_ROW.length]; + initRow(rows[3], null, brokerLoadData.getBundleCount(), + null, null, + null, null); + + // Latest message data row. + rows[5] = new Object[LATEST_ROW.length]; + initMessageRow(rows[5], brokerLoadData.getMsgRateIn(), brokerLoadData.getMsgRateOut(), + brokerLoadData.getMsgThroughputIn(), brokerLoadData.getMsgThroughputOut()); + + final String table = localTableMaker.make(rows); + log.info("\nBroker Data for {}:\n{}\n", broker, table); + } + + private synchronized void printBrokerLoadDataStore() { + brokerLoadDataTableView.forEach(this::printBrokerLoadData); + } + + private void startBrokerLoadDataStoreMonitor() { + try { + while (true) { + Thread.sleep(GLOBAL_STATS_PRINT_PERIOD_MILLIS); + printBrokerLoadDataStore(); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + /** * Run a monitor from command line arguments. * @@ -481,8 +556,15 @@ public static void main(String[] args) throws Exception { jc.usage(); PerfClientUtils.exit(1); } - final ZooKeeper zkClient = new ZooKeeper(arguments.connectString, ZOOKEEPER_TIMEOUT_MILLIS, null); - final BrokerMonitor monitor = new BrokerMonitor(zkClient); - monitor.start(); + + + if (arguments.extensions) { + final BrokerMonitor monitor = new BrokerMonitor(arguments.connectString); + monitor.startBrokerLoadDataStoreMonitor(); + } else { + final ZooKeeper zkClient = new ZooKeeper(arguments.connectString, ZOOKEEPER_TIMEOUT_MILLIS, null); + final BrokerMonitor monitor = new BrokerMonitor(zkClient); + monitor.start(); + } } }