Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The total bytes are not equal to sum of all producers output #110

Merged
merged 2 commits into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions app/src/main/java/org/astraea/performance/Metrics.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package org.astraea.performance;

import java.math.BigDecimal;
import java.math.RoundingMode;

/** Used to record statistics. This is thread safe. */
public class Metrics {
private final long startTime = System.currentTimeMillis();
private double avgLatency;
private long num;
private long max;
Expand Down Expand Up @@ -54,15 +50,6 @@ public synchronized double avgLatency() {
return avgLatency;
}

/** @return the average bytes (in second) */
public synchronized double avgBytes() {
var value = new BigDecimal(bytes);
var scale = new BigDecimal(1024 * 1024);
var time = BigDecimal.valueOf((double) (System.currentTimeMillis() - startTime) / 1000);
if (time.doubleValue() == 0.0) return 0.0;
return value.divide(scale, 3, RoundingMode.UP).divide(time, 3, RoundingMode.UP).doubleValue();
}

/** @return total send/received bytes */
public synchronized long bytes() {
return bytes;
Expand Down
118 changes: 81 additions & 37 deletions app/src/main/java/org/astraea/performance/Tracker.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.astraea.performance;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.astraea.concurrent.ThreadPool;

Expand All @@ -25,23 +27,22 @@ public State execute() throws InterruptedException {
return State.RUNNING;
}

private boolean logProducers() {
var completed = 0L;
var bytes = 0D;
var max = 0L;
var min = Long.MAX_VALUE;
private Duration duration() {
if (start == 0L) start = System.currentTimeMillis();
return Duration.ofMillis(System.currentTimeMillis() - start);
}

for (Metrics data : producerData) {
completed += data.num();
bytes += data.avgBytes();
max = Math.max(max, data.max());
min = Math.min(min, data.min());
}
private static double avg(Duration duration, long value) {
return duration.toSeconds() <= 0
? 0
: ((double) (value / duration.toSeconds())) / 1024D / 1024D;
}

if (completed == 0) return false;
private boolean logProducers() {
var result = result(producerData);
if (result.completedRecords == 0) return false;

if (start == 0L) start = System.currentTimeMillis();
var duration = Duration.ofMillis(System.currentTimeMillis() - start);
var duration = duration();
System.out.println(
"Time: "
+ duration.toHoursPart()
Expand All @@ -50,47 +51,90 @@ private boolean logProducers() {
+ "min "
+ duration.toSecondsPart()
+ "sec");
System.out.printf("producers完成度: %.2f%%%n", ((double) completed * 100.0 / (double) records));
System.out.printf(" 輸出%.3fMB/second%n", bytes);
System.out.println(" 發送max latency: " + max + "ms");
System.out.println(" 發送mim latency: " + min + "ms");
for (int i = 0; i < producerData.size(); ++i) {
System.out.printf(
"producers完成度: %.2f%%%n", ((double) result.completedRecords * 100.0 / (double) records));
System.out.printf(" 輸出%.3fMB/second%n", result.averageBytes(duration));
System.out.println(" 發送max latency: " + result.maxLatency + "ms");
System.out.println(" 發送mim latency: " + result.minLatency + "ms");
for (int i = 0; i < result.bytes.size(); ++i) {
System.out.printf(
" producer[%d]的發送average bytes: %.3fMB%n", i, producerData.get(i).avgBytes());
" producer[%d]的發送average bytes: %.3fMB%n", i, avg(duration, result.bytes.get(i)));
System.out.printf(
" producer[%d]的發送average latency: %.3fms%n", i, producerData.get(i).avgLatency());
" producer[%d]的發送average latency: %.3fms%n", i, result.averageLatencies.get(i));
}
System.out.println("\n");
return completed >= records;
return result.completedRecords >= records;
}

private boolean logConsumers() {
// there is no consumer, so we just complete this log.
if (consumerData.isEmpty()) return true;
var result = result(consumerData);
if (result.completedRecords == 0) return false;
var duration = duration();

System.out.printf(
"consumer完成度: %.2f%%%n", ((double) result.completedRecords * 100.0 / (double) records));
System.out.printf(" 輸入%.3fMB/second%n", result.averageBytes(duration));
System.out.println(" 端到端max latency: " + result.maxLatency + "ms");
System.out.println(" 端到端mim latency: " + result.minLatency + "ms");
for (int i = 0; i < result.bytes.size(); ++i) {
System.out.printf(
" consumer[%d]的端到端average bytes: %.3fMB%n", i, avg(duration, result.bytes.get(i)));
System.out.printf(
" consumer[%d]的端到端average latency: %.3fms%n", i, result.averageLatencies.get(i));
}
System.out.println("\n");
return result.completedRecords >= records;
}

private static Result result(List<Metrics> metrics) {
var completed = 0;
var bytes = 0D;
var bytes = new ArrayList<Long>();
var averageLatencies = new ArrayList<Double>();
var max = 0L;
var min = Long.MAX_VALUE;
for (Metrics data : consumerData) {
for (Metrics data : metrics) {
completed += data.num();
bytes += data.avgBytes();
bytes.add(data.bytes());
averageLatencies.add(data.avgLatency());
max = Math.max(max, data.max());
min = Math.min(min, data.min());
}
return new Result(
completed,
Collections.unmodifiableList(bytes),
Collections.unmodifiableList(averageLatencies),
min,
max);
}

if (completed == 0) return false;
private static class Result {
public final long completedRecords;
public final List<Long> bytes;
public final List<Double> averageLatencies;
public final long minLatency;
public final long maxLatency;

System.out.printf("consumer完成度: %.2f%%%n", ((double) completed * 100.0 / (double) records));
System.out.printf(" 輸入%.3fMB/second%n", bytes);
System.out.println(" 端到端max latency: " + max + "ms");
System.out.println(" 端到端mim latency: " + min + "ms");
for (int i = 0; i < consumerData.size(); ++i) {
System.out.printf(
" consumer[%d]的端到端average bytes: %.3fMB%n", i, consumerData.get(i).avgBytes());
System.out.printf(
" consumer[%d]的端到端average latency: %.3fms%n", i, consumerData.get(i).avgLatency());
Result(
long completedRecords,
List<Long> bytes,
List<Double> averageLatencies,
long minLatency,
long maxLatency) {
this.completedRecords = completedRecords;
this.bytes = bytes;
this.averageLatencies = averageLatencies;
this.minLatency = minLatency;
this.maxLatency = maxLatency;
}

double averageBytes(Duration duration) {
return avg(duration, totalBytes());
}

long totalBytes() {
return bytes.stream().mapToLong(i -> i).sum();
}
System.out.println("\n");
return completed >= records;
}
}