-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
MSQ: Add CPU and thread usage counters. (#16914)
* MSQ: Add CPU and thread usage counters. The main change adds "cpu" and "wall" counters. The "cpu" counter measures CPU time (using JvmUtils.getCurrentThreadCpuTime) taken up by processors in processing threads. The "wall" counter measures the amount of wall time taken up by processors in those same processing threads. Both counters are broken down by type of processor. This patch also includes changes to support adding new counters. Due to an oversight in the original design, older deserializers are not forwards-compatible; they throw errors when encountering an unknown counter type. To manage this, the following changes are made: 1) The defaultImpl NilQueryCounterSnapshot is added to QueryCounterSnapshot's deserialization configuration. This means that any unrecognized counter types will be read as "nil" by deserializers. Going forward, once all servers are on the latest code, this is enough to enable easily adding new counters. 2) A new context parameter "includeAllCounters" is added, which defaults to "false". When this parameter is set "false", only legacy counters are included. When set to "true", all counters are included. This is currently undocumented. In a future version, we should set the default to "true", and at that time, include a release note that people updating from versions prior to Druid 31 should set this to "false" until their upgrade is complete. * Style, coverage. * Fix.
- Loading branch information
Showing
22 changed files
with
871 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
148 changes: 148 additions & 0 deletions
148
...nsions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuCounter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.msq.counters; | ||
|
||
import com.fasterxml.jackson.annotation.JsonCreator; | ||
import com.fasterxml.jackson.annotation.JsonInclude; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.fasterxml.jackson.annotation.JsonTypeName; | ||
import org.apache.druid.utils.JvmUtils; | ||
|
||
import java.util.Objects; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
public class CpuCounter implements QueryCounter | ||
{ | ||
private final AtomicLong cpuTime = new AtomicLong(); | ||
private final AtomicLong wallTime = new AtomicLong(); | ||
|
||
public void accumulate(final long cpu, final long wall) | ||
{ | ||
cpuTime.addAndGet(cpu); | ||
wallTime.addAndGet(wall); | ||
} | ||
|
||
public <E extends Throwable> void run(final Doer<E> doer) throws E | ||
{ | ||
final long startCpu = JvmUtils.getCurrentThreadCpuTime(); | ||
final long startWall = System.nanoTime(); | ||
|
||
try { | ||
doer.run(); | ||
} | ||
finally { | ||
accumulate( | ||
JvmUtils.getCurrentThreadCpuTime() - startCpu, | ||
System.nanoTime() - startWall | ||
); | ||
} | ||
} | ||
|
||
public <T, E extends Throwable> T run(final Returner<T, E> returner) throws E | ||
{ | ||
final long startCpu = JvmUtils.getCurrentThreadCpuTime(); | ||
final long startWall = System.nanoTime(); | ||
|
||
try { | ||
return returner.run(); | ||
} | ||
finally { | ||
accumulate( | ||
JvmUtils.getCurrentThreadCpuTime() - startCpu, | ||
System.nanoTime() - startWall | ||
); | ||
} | ||
} | ||
|
||
@Override | ||
public Snapshot snapshot() | ||
{ | ||
return new Snapshot(cpuTime.get(), wallTime.get()); | ||
} | ||
|
||
@JsonTypeName("cpu") | ||
public static class Snapshot implements QueryCounterSnapshot | ||
{ | ||
private final long cpuTime; | ||
private final long wallTime; | ||
|
||
@JsonCreator | ||
public Snapshot( | ||
@JsonProperty("cpu") long cpuTime, | ||
@JsonProperty("wall") long wallTime | ||
) | ||
{ | ||
this.cpuTime = cpuTime; | ||
this.wallTime = wallTime; | ||
} | ||
|
||
@JsonProperty("cpu") | ||
@JsonInclude(JsonInclude.Include.NON_DEFAULT) | ||
public long getCpuTime() | ||
{ | ||
return cpuTime; | ||
} | ||
|
||
@JsonProperty("wall") | ||
@JsonInclude(JsonInclude.Include.NON_DEFAULT) | ||
public long getWallTime() | ||
{ | ||
return wallTime; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) | ||
{ | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
Snapshot snapshot = (Snapshot) o; | ||
return cpuTime == snapshot.cpuTime && wallTime == snapshot.wallTime; | ||
} | ||
|
||
@Override | ||
public int hashCode() | ||
{ | ||
return Objects.hash(cpuTime, wallTime); | ||
} | ||
|
||
@Override | ||
public String toString() | ||
{ | ||
return "CpuCounter.Snapshot{" + | ||
"cpuTime=" + cpuTime + | ||
", wallTime=" + wallTime + | ||
'}'; | ||
} | ||
} | ||
|
||
public interface Doer<E extends Throwable> | ||
{ | ||
void run() throws E; | ||
} | ||
|
||
public interface Returner<T, E extends Throwable> | ||
{ | ||
T run() throws E; | ||
} | ||
} |
106 changes: 106 additions & 0 deletions
106
...sions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuCounters.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.msq.counters; | ||
|
||
import com.fasterxml.jackson.annotation.JsonCreator; | ||
import com.fasterxml.jackson.annotation.JsonTypeName; | ||
import com.fasterxml.jackson.annotation.JsonValue; | ||
import com.google.common.base.Preconditions; | ||
|
||
import javax.annotation.Nullable; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
public class CpuCounters implements QueryCounter | ||
{ | ||
public static final String LABEL_MAIN = "main"; | ||
public static final String LABEL_KEY_STATISTICS = "collectKeyStatistics"; | ||
public static final String LABEL_MERGE_INPUT = "mergeInput"; | ||
public static final String LABEL_HASH_PARTITION = "hashPartitionOutput"; | ||
public static final String LABEL_MIX = "mixOutput"; | ||
public static final String LABEL_SORT = "sortOutput"; | ||
|
||
private final ConcurrentHashMap<String, CpuCounter> counters = new ConcurrentHashMap<>(); | ||
|
||
public CpuCounter forName(final String name) | ||
{ | ||
return counters.computeIfAbsent(name, k -> new CpuCounter()); | ||
} | ||
|
||
@Nullable | ||
@Override | ||
public CpuCounters.Snapshot snapshot() | ||
{ | ||
final Map<String, CpuCounter.Snapshot> snapshotMap = new HashMap<>(); | ||
for (Map.Entry<String, CpuCounter> entry : counters.entrySet()) { | ||
snapshotMap.put(entry.getKey(), entry.getValue().snapshot()); | ||
} | ||
return new Snapshot(snapshotMap); | ||
} | ||
|
||
@JsonTypeName("cpus") | ||
public static class Snapshot implements QueryCounterSnapshot | ||
{ | ||
// String keys, not enum, so deserialization is forwards-compatible | ||
private final Map<String, CpuCounter.Snapshot> map; | ||
|
||
@JsonCreator | ||
public Snapshot(Map<String, CpuCounter.Snapshot> map) | ||
{ | ||
this.map = Preconditions.checkNotNull(map, "map"); | ||
} | ||
|
||
@JsonValue | ||
public Map<String, CpuCounter.Snapshot> getCountersMap() | ||
{ | ||
return map; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) | ||
{ | ||
|
||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
Snapshot snapshot = (Snapshot) o; | ||
return Objects.equals(map, snapshot.map); | ||
} | ||
|
||
@Override | ||
public int hashCode() | ||
{ | ||
return Objects.hash(map); | ||
} | ||
|
||
@Override | ||
public String toString() | ||
{ | ||
return "CpuCounters.Snapshot{" + | ||
"map=" + map + | ||
'}'; | ||
} | ||
} | ||
} |
Oops, something went wrong.