Skip to content

Commit

Permalink
QueryPerformanceRecorder: Group Batched Operations as a Single Query (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind authored Nov 18, 2023
1 parent a4e6c5b commit 97b8407
Show file tree
Hide file tree
Showing 48 changed files with 2,789 additions and 1,851 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1268,11 +1268,8 @@ void handleUncaughtException(Exception throwable) {
final BasePerformanceEntry basePerformanceEntry =
initialFilterExecution.getBasePerformanceEntry();
if (basePerformanceEntry != null) {
final QueryPerformanceNugget outerNugget =
QueryPerformanceRecorder.getInstance().getOuterNugget();
if (outerNugget != null) {
outerNugget.addBaseEntry(basePerformanceEntry);
}
QueryPerformanceRecorder.getInstance().getEnclosingNugget()
.accumulate(basePerformanceEntry);
}
}
currentMapping.initializePreviousValue();
Expand Down Expand Up @@ -1516,11 +1513,7 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc
} finally {
final BasePerformanceEntry baseEntry = jobScheduler.getAccumulatedPerformance();
if (baseEntry != null) {
final QueryPerformanceNugget outerNugget =
QueryPerformanceRecorder.getInstance().getOuterNugget();
if (outerNugget != null) {
outerNugget.addBaseEntry(baseEntry);
}
QueryPerformanceRecorder.getInstance().getEnclosingNugget().accumulate(baseEntry);
}
}
}
Expand Down Expand Up @@ -3572,12 +3565,9 @@ public static void checkInitiateBinaryOperation(@NotNull final Table first, @Not
}

private <R> R applyInternal(@NotNull final Function<Table, R> function) {
final QueryPerformanceNugget nugget =
QueryPerformanceRecorder.getInstance().getNugget("apply(" + function + ")");
try {
try (final SafeCloseable ignored =
QueryPerformanceRecorder.getInstance().getNugget("apply(" + function + ")")) {
return function.apply(this);
} finally {
nugget.done();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,9 @@ private void completionRoutine(TableUpdate upstream, JobScheduler jobScheduler,
getUpdateGraph().addNotification(new TerminalNotification() {
@Override
public void run() {
synchronized (accumulated) {
final PerformanceEntry entry = getEntry();
if (entry != null) {
entry.accumulate(accumulated);
}
final PerformanceEntry entry = getEntry();
if (entry != null) {
entry.accumulate(accumulated);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.base.verify.Assert;
import io.deephaven.util.profiling.ThreadProfiler;
import org.jetbrains.annotations.NotNull;

import static io.deephaven.engine.table.impl.lang.QueryLanguageFunctionUtils.minus;
import static io.deephaven.engine.table.impl.lang.QueryLanguageFunctionUtils.plus;
Expand All @@ -15,13 +16,13 @@
* A smaller entry that simply records usage data, meant for aggregating into the larger entry.
*/
public class BasePerformanceEntry implements LogOutputAppendable {
private long intervalUsageNanos;
private long usageNanos;

private long intervalCpuNanos;
private long intervalUserCpuNanos;
private long cpuNanos;
private long userCpuNanos;

private long intervalAllocatedBytes;
private long intervalPoolAllocatedBytes;
private long allocatedBytes;
private long poolAllocatedBytes;

private long startTimeNanos;

Expand All @@ -31,26 +32,26 @@ public class BasePerformanceEntry implements LogOutputAppendable {
private long startAllocatedBytes;
private long startPoolAllocatedBytes;

public void onBaseEntryStart() {
public synchronized void onBaseEntryStart() {
startAllocatedBytes = ThreadProfiler.DEFAULT.getCurrentThreadAllocatedBytes();
startPoolAllocatedBytes = QueryPerformanceRecorder.getPoolAllocatedBytesForCurrentThread();
startPoolAllocatedBytes = QueryPerformanceRecorderState.getPoolAllocatedBytesForCurrentThread();

startUserCpuNanos = ThreadProfiler.DEFAULT.getCurrentThreadUserTime();
startCpuNanos = ThreadProfiler.DEFAULT.getCurrentThreadCpuTime();
startTimeNanos = System.nanoTime();
}

public void onBaseEntryEnd() {
intervalUserCpuNanos = plus(intervalUserCpuNanos,
public synchronized void onBaseEntryEnd() {
userCpuNanos = plus(userCpuNanos,
minus(ThreadProfiler.DEFAULT.getCurrentThreadUserTime(), startUserCpuNanos));
intervalCpuNanos =
plus(intervalCpuNanos, minus(ThreadProfiler.DEFAULT.getCurrentThreadCpuTime(), startCpuNanos));
cpuNanos =
plus(cpuNanos, minus(ThreadProfiler.DEFAULT.getCurrentThreadCpuTime(), startCpuNanos));

intervalUsageNanos += System.nanoTime() - startTimeNanos;
usageNanos += System.nanoTime() - startTimeNanos;

intervalPoolAllocatedBytes = plus(intervalPoolAllocatedBytes,
minus(QueryPerformanceRecorder.getPoolAllocatedBytesForCurrentThread(), startPoolAllocatedBytes));
intervalAllocatedBytes = plus(intervalAllocatedBytes,
poolAllocatedBytes = plus(poolAllocatedBytes,
minus(QueryPerformanceRecorderState.getPoolAllocatedBytesForCurrentThread(), startPoolAllocatedBytes));
allocatedBytes = plus(allocatedBytes,
minus(ThreadProfiler.DEFAULT.getCurrentThreadAllocatedBytes(), startAllocatedBytes));

startAllocatedBytes = 0;
Expand All @@ -61,46 +62,76 @@ public void onBaseEntryEnd() {
startTimeNanos = 0;
}

void baseEntryReset() {
synchronized void baseEntryReset() {
Assert.eqZero(startTimeNanos, "startTimeNanos");

intervalUsageNanos = 0;
usageNanos = 0;

intervalCpuNanos = 0;
intervalUserCpuNanos = 0;
cpuNanos = 0;
userCpuNanos = 0;

intervalAllocatedBytes = 0;
intervalPoolAllocatedBytes = 0;
allocatedBytes = 0;
poolAllocatedBytes = 0;
}

public long getIntervalUsageNanos() {
return intervalUsageNanos;
/**
* Get the aggregate usage in nanoseconds. This getter should be called by exclusive owners of the entry, and never
* concurrently with mutators.
*
* @return total wall clock time in nanos
*/
public long getUsageNanos() {
return usageNanos;
}

public long getIntervalCpuNanos() {
return intervalCpuNanos;
/**
* Get the aggregate cpu time in nanoseconds. This getter should be called by exclusive owners of the entry, and
* never concurrently with mutators.
*
* @return total cpu time in nanos
*/
public long getCpuNanos() {
return cpuNanos;
}

public long getIntervalUserCpuNanos() {
return intervalUserCpuNanos;
/**
* Get the aggregate cpu user time in nanoseconds. This getter should be called by exclusive owners of the entry,
* and never concurrently with mutators.
*
* @return total cpu user time in nanos
*/
public long getUserCpuNanos() {
return userCpuNanos;
}

public long getIntervalAllocatedBytes() {
return intervalAllocatedBytes;
/**
* Get the aggregate allocated memory in bytes. This getter should be called by exclusive owners of the entry, and
* never concurrently with mutators.
*
* @return The bytes of allocated memory attributed to the instrumented operation.
*/
public long getAllocatedBytes() {
return allocatedBytes;
}

public long getIntervalPoolAllocatedBytes() {
return intervalPoolAllocatedBytes;
/**
* Get allocated pooled/reusable memory attributed to the instrumented operation in bytes. This getter should be
* called by exclusive owners of the entry, and never concurrently with mutators.
*
* @return total pool allocated memory in bytes
*/
public long getPoolAllocatedBytes() {
return poolAllocatedBytes;
}

@Override
public LogOutput append(LogOutput logOutput) {
public LogOutput append(@NotNull final LogOutput logOutput) {
final LogOutput currentValues = logOutput.append("BasePerformanceEntry{")
.append(", intervalUsageNanos=").append(intervalUsageNanos)
.append(", intervalCpuNanos=").append(intervalCpuNanos)
.append(", intervalUserCpuNanos=").append(intervalUserCpuNanos)
.append(", intervalAllocatedBytes=").append(intervalAllocatedBytes)
.append(", intervalPoolAllocatedBytes=").append(intervalPoolAllocatedBytes);
.append(", intervalUsageNanos=").append(usageNanos)
.append(", intervalCpuNanos=").append(cpuNanos)
.append(", intervalUserCpuNanos=").append(userCpuNanos)
.append(", intervalAllocatedBytes=").append(allocatedBytes)
.append(", intervalPoolAllocatedBytes=").append(poolAllocatedBytes);
return appendStart(currentValues)
.append('}');
}
Expand All @@ -114,12 +145,17 @@ LogOutput appendStart(LogOutput logOutput) {
.append(", startPoolAllocatedBytes=").append(startPoolAllocatedBytes);
}

public void accumulate(BasePerformanceEntry entry) {
this.intervalUsageNanos += entry.intervalUsageNanos;
this.intervalCpuNanos = plus(this.intervalCpuNanos, entry.intervalCpuNanos);
this.intervalUserCpuNanos = plus(this.intervalUserCpuNanos, entry.intervalUserCpuNanos);

this.intervalAllocatedBytes = plus(this.intervalAllocatedBytes, entry.intervalAllocatedBytes);
this.intervalPoolAllocatedBytes = plus(this.intervalPoolAllocatedBytes, entry.intervalPoolAllocatedBytes);
/**
* Accumulate the values from another entry into this one. The provided entry will not be mutated.
*
* @param entry the entry to accumulate
*/
public synchronized void accumulate(@NotNull final BasePerformanceEntry entry) {
this.usageNanos += entry.usageNanos;
this.cpuNanos = plus(this.cpuNanos, entry.cpuNanos);
this.userCpuNanos = plus(this.userCpuNanos, entry.userCpuNanos);

this.allocatedBytes = plus(this.allocatedBytes, entry.allocatedBytes);
this.poolAllocatedBytes = plus(this.poolAllocatedBytes, entry.poolAllocatedBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
import io.deephaven.io.log.impl.LogOutputStringImpl;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.QueryConstants;
import org.jetbrains.annotations.NotNull;

/**
* Entry class for tracking the performance characteristics of a single recurring update event.
*/
public class PerformanceEntry extends BasePerformanceEntry implements TableListener.Entry {
private final int id;
private final int evaluationNumber;
private final long id;
private final long evaluationNumber;
private final int operationNumber;
private final String description;
private final String callerLine;
Expand All @@ -42,7 +43,7 @@ public class PerformanceEntry extends BasePerformanceEntry implements TableListe
private final RuntimeMemory.Sample startSample;
private final RuntimeMemory.Sample endSample;

PerformanceEntry(final int id, final int evaluationNumber, final int operationNumber,
PerformanceEntry(final long id, final long evaluationNumber, final int operationNumber,
final String description, final String callerLine, final String updateGraphName) {
this.id = id;
this.evaluationNumber = evaluationNumber;
Expand Down Expand Up @@ -114,24 +115,24 @@ public String toString() {
}

@Override
public LogOutput append(final LogOutput logOutput) {
public LogOutput append(@NotNull final LogOutput logOutput) {
final LogOutput beginning = logOutput.append("PerformanceEntry{")
.append(", id=").append(id)
.append(", evaluationNumber=").append(evaluationNumber)
.append(", operationNumber=").append(operationNumber)
.append(", description='").append(description).append('\'')
.append(", callerLine='").append(callerLine).append('\'')
.append(", authContext=").append(authContext)
.append(", intervalUsageNanos=").append(getIntervalUsageNanos())
.append(", intervalCpuNanos=").append(getIntervalCpuNanos())
.append(", intervalUserCpuNanos=").append(getIntervalUserCpuNanos())
.append(", intervalUsageNanos=").append(getUsageNanos())
.append(", intervalCpuNanos=").append(getCpuNanos())
.append(", intervalUserCpuNanos=").append(getUserCpuNanos())
.append(", intervalInvocationCount=").append(intervalInvocationCount)
.append(", intervalAdded=").append(intervalAdded)
.append(", intervalRemoved=").append(intervalRemoved)
.append(", intervalModified=").append(intervalModified)
.append(", intervalShifted=").append(intervalShifted)
.append(", intervalAllocatedBytes=").append(getIntervalAllocatedBytes())
.append(", intervalPoolAllocatedBytes=").append(getIntervalPoolAllocatedBytes())
.append(", intervalAllocatedBytes=").append(getAllocatedBytes())
.append(", intervalPoolAllocatedBytes=").append(getPoolAllocatedBytes())
.append(", maxTotalMemory=").append(maxTotalMemory)
.append(", minFreeMemory=").append(minFreeMemory)
.append(", collections=").append(collections)
Expand All @@ -140,11 +141,11 @@ public LogOutput append(final LogOutput logOutput) {
.append('}');
}

public int getId() {
public long getId() {
return id;
}

public int getEvaluationNumber() {
public long getEvaluationNumber() {
return evaluationNumber;
}

Expand Down Expand Up @@ -217,7 +218,7 @@ public long getIntervalInvocationCount() {
*/
boolean shouldLogEntryInterval() {
return intervalInvocationCount > 0 &&
UpdatePerformanceTracker.LOG_THRESHOLD.shouldLog(getIntervalUsageNanos());
UpdatePerformanceTracker.LOG_THRESHOLD.shouldLog(getUsageNanos());
}

public void accumulate(PerformanceEntry entry) {
Expand Down
Loading

0 comments on commit 97b8407

Please sign in to comment.