Skip to content

Commit

Permalink
The total bytes are not equal to sum of all producers output (#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored Nov 16, 2021
1 parent cc17536 commit e8e2f9e
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 50 deletions.
13 changes: 0 additions & 13 deletions app/src/main/java/org/astraea/performance/Metrics.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package org.astraea.performance;

import java.math.BigDecimal;
import java.math.RoundingMode;

/** Used to record statistics. This is thread safe. */
public class Metrics {
private final long startTime = System.currentTimeMillis();
private double avgLatency;
private long num;
private long max;
Expand Down Expand Up @@ -54,15 +50,6 @@ public synchronized double avgLatency() {
return avgLatency;
}

/** @return the average bytes (in second) */
public synchronized double avgBytes() {
var value = new BigDecimal(bytes);
var scale = new BigDecimal(1024 * 1024);
var time = BigDecimal.valueOf((double) (System.currentTimeMillis() - startTime) / 1000);
if (time.doubleValue() == 0.0) return 0.0;
return value.divide(scale, 3, RoundingMode.UP).divide(time, 3, RoundingMode.UP).doubleValue();
}

/** @return total send/received bytes */
public synchronized long bytes() {
return bytes;
Expand Down
118 changes: 81 additions & 37 deletions app/src/main/java/org/astraea/performance/Tracker.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.astraea.performance;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.astraea.concurrent.ThreadPool;

Expand All @@ -25,23 +27,22 @@ public State execute() throws InterruptedException {
return State.RUNNING;
}

private boolean logProducers() {
var completed = 0L;
var bytes = 0D;
var max = 0L;
var min = Long.MAX_VALUE;
private Duration duration() {
if (start == 0L) start = System.currentTimeMillis();
return Duration.ofMillis(System.currentTimeMillis() - start);
}

for (Metrics data : producerData) {
completed += data.num();
bytes += data.avgBytes();
max = Math.max(max, data.max());
min = Math.min(min, data.min());
}
private static double avg(Duration duration, long value) {
return duration.toSeconds() <= 0
? 0
: ((double) (value / duration.toSeconds())) / 1024D / 1024D;
}

if (completed == 0) return false;
private boolean logProducers() {
var result = result(producerData);
if (result.completedRecords == 0) return false;

if (start == 0L) start = System.currentTimeMillis();
var duration = Duration.ofMillis(System.currentTimeMillis() - start);
var duration = duration();
System.out.println(
"Time: "
+ duration.toHoursPart()
Expand All @@ -50,47 +51,90 @@ private boolean logProducers() {
+ "min "
+ duration.toSecondsPart()
+ "sec");
System.out.printf("producers完成度: %.2f%%%n", ((double) completed * 100.0 / (double) records));
System.out.printf(" 輸出%.3fMB/second%n", bytes);
System.out.println(" 發送max latency: " + max + "ms");
System.out.println(" 發送mim latency: " + min + "ms");
for (int i = 0; i < producerData.size(); ++i) {
System.out.printf(
"producers完成度: %.2f%%%n", ((double) result.completedRecords * 100.0 / (double) records));
System.out.printf(" 輸出%.3fMB/second%n", result.averageBytes(duration));
System.out.println(" 發送max latency: " + result.maxLatency + "ms");
System.out.println(" 發送mim latency: " + result.minLatency + "ms");
for (int i = 0; i < result.bytes.size(); ++i) {
System.out.printf(
" producer[%d]的發送average bytes: %.3fMB%n", i, producerData.get(i).avgBytes());
" producer[%d]的發送average bytes: %.3fMB%n", i, avg(duration, result.bytes.get(i)));
System.out.printf(
" producer[%d]的發送average latency: %.3fms%n", i, producerData.get(i).avgLatency());
" producer[%d]的發送average latency: %.3fms%n", i, result.averageLatencies.get(i));
}
System.out.println("\n");
return completed >= records;
return result.completedRecords >= records;
}

private boolean logConsumers() {
// there is no consumer, so we just complete this log.
if (consumerData.isEmpty()) return true;
var result = result(consumerData);
if (result.completedRecords == 0) return false;
var duration = duration();

System.out.printf(
"consumer完成度: %.2f%%%n", ((double) result.completedRecords * 100.0 / (double) records));
System.out.printf(" 輸入%.3fMB/second%n", result.averageBytes(duration));
System.out.println(" 端到端max latency: " + result.maxLatency + "ms");
System.out.println(" 端到端mim latency: " + result.minLatency + "ms");
for (int i = 0; i < result.bytes.size(); ++i) {
System.out.printf(
" consumer[%d]的端到端average bytes: %.3fMB%n", i, avg(duration, result.bytes.get(i)));
System.out.printf(
" consumer[%d]的端到端average latency: %.3fms%n", i, result.averageLatencies.get(i));
}
System.out.println("\n");
return result.completedRecords >= records;
}

private static Result result(List<Metrics> metrics) {
var completed = 0;
var bytes = 0D;
var bytes = new ArrayList<Long>();
var averageLatencies = new ArrayList<Double>();
var max = 0L;
var min = Long.MAX_VALUE;
for (Metrics data : consumerData) {
for (Metrics data : metrics) {
completed += data.num();
bytes += data.avgBytes();
bytes.add(data.bytes());
averageLatencies.add(data.avgLatency());
max = Math.max(max, data.max());
min = Math.min(min, data.min());
}
return new Result(
completed,
Collections.unmodifiableList(bytes),
Collections.unmodifiableList(averageLatencies),
min,
max);
}

if (completed == 0) return false;
private static class Result {
public final long completedRecords;
public final List<Long> bytes;
public final List<Double> averageLatencies;
public final long minLatency;
public final long maxLatency;

System.out.printf("consumer完成度: %.2f%%%n", ((double) completed * 100.0 / (double) records));
System.out.printf(" 輸入%.3fMB/second%n", bytes);
System.out.println(" 端到端max latency: " + max + "ms");
System.out.println(" 端到端mim latency: " + min + "ms");
for (int i = 0; i < consumerData.size(); ++i) {
System.out.printf(
" consumer[%d]的端到端average bytes: %.3fMB%n", i, consumerData.get(i).avgBytes());
System.out.printf(
" consumer[%d]的端到端average latency: %.3fms%n", i, consumerData.get(i).avgLatency());
Result(
long completedRecords,
List<Long> bytes,
List<Double> averageLatencies,
long minLatency,
long maxLatency) {
this.completedRecords = completedRecords;
this.bytes = bytes;
this.averageLatencies = averageLatencies;
this.minLatency = minLatency;
this.maxLatency = maxLatency;
}

double averageBytes(Duration duration) {
return avg(duration, totalBytes());
}

long totalBytes() {
return bytes.stream().mapToLong(i -> i).sum();
}
System.out.println("\n");
return completed >= records;
}
}

0 comments on commit e8e2f9e

Please sign in to comment.