Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-192 Added --extensions option in BrokerMonitor #19654

Merged
merged 1 commit into from
Mar 1, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.testclient;

import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
Expand All @@ -31,7 +32,13 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
Expand All @@ -54,7 +61,7 @@ public class BrokerMonitor {
private static final String BROKER_ROOT = "/loadbalance/brokers";
private static final int ZOOKEEPER_TIMEOUT_MILLIS = 30000;
private static final int GLOBAL_STATS_PRINT_PERIOD_MILLIS = 60000;
private final ZooKeeper zkClient;
private ZooKeeper zkClient;
private static final Gson gson = new Gson();

// Fields common for message rows.
Expand All @@ -77,7 +84,7 @@ public class BrokerMonitor {
private static final Object[] ALLOC_MESSAGE_ROW = makeMessageRow("ALLOC MSG");
private static final Object[] GLOBAL_HEADER = { "BROKER", "BUNDLE", "MSG/S", "LONG/S", "KB/S", "MAX %" };

private final Map<String, Object> loadData;
private Map<String, Object> loadData;

private static final FixedColumnLengthTableMaker localTableMaker = new FixedColumnLengthTableMaker();
static {
Expand Down Expand Up @@ -434,8 +441,11 @@ private static class Arguments {
@Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
boolean help;

@Parameter(names = { "--connect-string" }, description = "Zookeeper connect string", required = true)
@Parameter(names = { "--connect-string" }, description = "Zookeeper or broker connect string", required = true)
public String connectString = null;

@Parameter(names = { "--extensions" }, description = "true to monitor Load Balance Extensions.")
boolean extensions = false;
}

/**
Expand Down Expand Up @@ -464,6 +474,71 @@ public void start() {
}
}

private TableView<BrokerLoadData> brokerLoadDataTableView;

private BrokerMonitor(String brokerServiceUrl) {
try {
PulsarClient client = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES)
.serviceUrl(brokerServiceUrl)
.connectionsPerBroker(4)
.ioThreads(Runtime.getRuntime().availableProcessors())
.statsInterval(0, TimeUnit.SECONDS)
.build();
this.brokerLoadDataTableView = client
.newTableView(Schema.JSON(BrokerLoadData.class))
.topic(BROKER_LOAD_DATA_STORE_TOPIC).create();
} catch (Throwable e) {
log.info("Failed to start BrokerMonitor", e);
throw new RuntimeException(e);
}
}

private synchronized void printBrokerLoadData(final String broker, final BrokerLoadData brokerLoadData) {

// Initialize the constant rows.
final Object[][] rows = new Object[6][];
rows[0] = SYSTEM_ROW;
rows[2] = COUNT_ROW;
rows[4] = LATEST_ROW;

// First column is a label, so start at the second column at index 1.
// System row.
rows[1] = new Object[SYSTEM_ROW.length];
initRow(rows[1], brokerLoadData.getCpu().percentUsage(), brokerLoadData.getMemory().percentUsage(),
brokerLoadData.getDirectMemory().percentUsage(), brokerLoadData.getBandwidthIn().percentUsage(),
brokerLoadData.getBandwidthOut().percentUsage(), brokerLoadData.getMaxResourceUsage() * 100);

// Count row.
rows[3] = new Object[COUNT_ROW.length];
initRow(rows[3], null, brokerLoadData.getBundleCount(),
null, null,
null, null);

// Latest message data row.
rows[5] = new Object[LATEST_ROW.length];
initMessageRow(rows[5], brokerLoadData.getMsgRateIn(), brokerLoadData.getMsgRateOut(),
brokerLoadData.getMsgThroughputIn(), brokerLoadData.getMsgThroughputOut());

final String table = localTableMaker.make(rows);
log.info("\nBroker Data for {}:\n{}\n", broker, table);
}

private synchronized void printBrokerLoadDataStore() {
brokerLoadDataTableView.forEach(this::printBrokerLoadData);
}

private void startBrokerLoadDataStoreMonitor() {
try {
while (true) {
Thread.sleep(GLOBAL_STATS_PRINT_PERIOD_MILLIS);
printBrokerLoadDataStore();
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

/**
* Run a monitor from command line arguments.
*
Expand All @@ -481,8 +556,15 @@ public static void main(String[] args) throws Exception {
jc.usage();
PerfClientUtils.exit(1);
}
final ZooKeeper zkClient = new ZooKeeper(arguments.connectString, ZOOKEEPER_TIMEOUT_MILLIS, null);
final BrokerMonitor monitor = new BrokerMonitor(zkClient);
monitor.start();


if (arguments.extensions) {
final BrokerMonitor monitor = new BrokerMonitor(arguments.connectString);
monitor.startBrokerLoadDataStoreMonitor();
} else {
final ZooKeeper zkClient = new ZooKeeper(arguments.connectString, ZOOKEEPER_TIMEOUT_MILLIS, null);
final BrokerMonitor monitor = new BrokerMonitor(zkClient);
monitor.start();
}
}
}