Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add V2 protocol and warmupMessages support for benchMark #3856

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ public static void main(String[] args)
options.addOption("skipwarmup", false, "Skip warm up, default false");
options.addOption("sendlimit", true, "Max number of entries to send. Default 20000000");
options.addOption("latencyFile", true, "File to dump latencies. Default is latencyDump.dat");
options.addOption("useV2", false, "Whether use V2 protocol to send requests to the bookie server");
options.addOption("warmupMessages", true, "Number of messages to warm up. Default 10000");
options.addOption("help", false, "This message");

CommandLineParser parser = new PosixParser();
Expand All @@ -281,6 +283,7 @@ public static void main(String[] args)
}
int throttle = Integer.parseInt(cmd.getOptionValue("throttle", "10000"));
int sendLimit = Integer.parseInt(cmd.getOptionValue("sendlimit", "20000000"));
int warmupMessages = Integer.parseInt(cmd.getOptionValue("warmupMessages", "10000"));

final int sockTimeout = Integer.parseInt(cmd.getOptionValue("sockettimeout", "5"));

Expand Down Expand Up @@ -321,11 +324,15 @@ public void run() {
ClientConfiguration conf = new ClientConfiguration();
conf.setThrottleValue(throttle).setReadTimeout(sockTimeout).setZkServers(servers);

if (cmd.hasOption("useV2")) {
conf.setUseV2WireProtocol(true);
}

if (!cmd.hasOption("skipwarmup")) {
long throughput;
LOG.info("Starting warmup");

throughput = warmUp(data, ledgers, ensemble, quorum, passwd, conf);
throughput = warmUp(data, ledgers, ensemble, quorum, passwd, warmupMessages, conf);
LOG.info("Warmup tp: " + throughput);
LOG.info("Warmup phase finished");
}
Expand Down Expand Up @@ -438,7 +445,7 @@ private static double percentile(long[] latency, int percentile) {
* <p>TODO: update benchmark to use metadata service uri {@link https://github.com/apache/bookkeeper/issues/1331}
*/
private static long warmUp(byte[] data, int ledgers, int ensemble, int qSize,
byte[] passwd, ClientConfiguration conf)
byte[] passwd, int warmupMessages, ClientConfiguration conf)
throws KeeperException, IOException, InterruptedException, BKException {
final CountDownLatch connectLatch = new CountDownLatch(1);
final int bookies;
Expand All @@ -465,7 +472,7 @@ public void process(WatchedEvent event) {
}

BenchThroughputLatency warmup = new BenchThroughputLatency(bookies, bookies, bookies, passwd,
ledgers, 10000, conf);
ledgers, warmupMessages, conf);
warmup.setEntryData(data);
Thread thread = new Thread(warmup);
thread.start();
Expand Down