diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java index 350dc958e7bf..19d5da36efb0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java @@ -34,6 +34,7 @@ public class CounterNames private static final String INPUT = "input"; private static final String OUTPUT = "output"; private static final String SHUFFLE = "shuffle"; + private static final String CPU = "cpu"; private static final String SORT_PROGRESS = "sortProgress"; private static final String SEGMENT_GENERATION_PROGRESS = "segmentGenerationProgress"; private static final String WARNINGS = "warnings"; @@ -68,6 +69,14 @@ public static String shuffleChannel() return SHUFFLE; } + /** + * Standard name for CPU counters created by {@link CounterTracker#cpu}. + */ + public static String cpu() + { + return CPU; + } + /** * Standard name for a sort progress counter created by {@link CounterTracker#sortProgress()}. */ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshots.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshots.java index d75eb1ce8cbe..b1595e9a0ae4 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshots.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshots.java @@ -27,6 +27,7 @@ /** * Named counter snapshots. Immutable. Often part of a {@link CounterSnapshotsTree}. + * Created by {@link CounterTracker#snapshot()}. */ public class CounterSnapshots { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java index c73ead63c119..599a4df92b07 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java @@ -19,7 +19,12 @@ package org.apache.druid.msq.counters; +import org.apache.druid.frame.processor.FrameProcessor; import org.apache.druid.frame.processor.SuperSorterProgressTracker; +import org.apache.druid.frame.processor.manager.ProcessorManager; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.QueryContext; +import org.apache.druid.utils.JvmUtils; import java.util.HashMap; import java.util.Map; @@ -37,11 +42,55 @@ public class CounterTracker { private final ConcurrentHashMap countersMap = new ConcurrentHashMap<>(); + /** + * See {@link MultiStageQueryContext#getIncludeAllCounters(QueryContext)}. + */ + private final boolean includeAllCounters; + + public CounterTracker(boolean includeAllCounters) + { + this.includeAllCounters = includeAllCounters; + } + public ChannelCounters channel(final String name) { return counter(name, ChannelCounters::new); } + /** + * Returns a {@link CpuCounter} that can be used to accumulate CPU time under a particular label. + */ + public CpuCounter cpu(final String name) + { + return counter(CounterNames.cpu(), CpuCounters::new).forName(name); + } + + /** + * Decorates a {@link FrameProcessor} such that it accumulates CPU time under a particular label. + */ + public FrameProcessor trackCpu(final FrameProcessor processor, final String name) + { + if (JvmUtils.isThreadCpuTimeEnabled()) { + final CpuCounter counter = counter(CounterNames.cpu(), CpuCounters::new).forName(name); + return new CpuTimeAccumulatingFrameProcessor<>(processor, counter); + } else { + return processor; + } + } + + /** + * Decorates a {@link ProcessorManager} such that it accumulates CPU time under a particular label. + */ + public ProcessorManager trackCpu(final ProcessorManager processorManager, final String name) + { + if (JvmUtils.isThreadCpuTimeEnabled()) { + final CpuCounter counter = counter(CounterNames.cpu(), CpuCounters::new).forName(name); + return new CpuTimeAccumulatingProcessorManager<>(processorManager, counter); + } else { + return processorManager; + } + } + public SuperSorterProgressTracker sortProgress() { return counter(CounterNames.sortProgress(), SuperSorterProgressTrackerCounter::new).tracker(); @@ -69,11 +118,23 @@ public CounterSnapshots snapshot() for (final Map.Entry entry : countersMap.entrySet()) { final QueryCounterSnapshot counterSnapshot = entry.getValue().snapshot(); - if (counterSnapshot != null) { + if (counterSnapshot != null && (includeAllCounters || isLegacyCounter(counterSnapshot))) { m.put(entry.getKey(), counterSnapshot); } } return new CounterSnapshots(m); } + + /** + * Returns whether a counter is a "legacy counter" that can be snapshotted regardless of the value of + * {@link MultiStageQueryContext#getIncludeAllCounters(QueryContext)}. + */ + private static boolean isLegacyCounter(final QueryCounterSnapshot counterSnapshot) + { + return counterSnapshot instanceof ChannelCounters.Snapshot + || counterSnapshot instanceof SuperSorterProgressTrackerCounter.Snapshot + || counterSnapshot instanceof WarningCounters.Snapshot + || counterSnapshot instanceof SegmentGenerationProgressCounter.Snapshot; + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuCounter.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuCounter.java new file mode 100644 index 000000000000..747d0e5aa3bd --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuCounter.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.counters; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.druid.utils.JvmUtils; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + +public class CpuCounter implements QueryCounter +{ + private final AtomicLong cpuTime = new AtomicLong(); + private final AtomicLong wallTime = new AtomicLong(); + + public void accumulate(final long cpu, final long wall) + { + cpuTime.addAndGet(cpu); + wallTime.addAndGet(wall); + } + + public void run(final Doer doer) throws E + { + final long startCpu = JvmUtils.getCurrentThreadCpuTime(); + final long startWall = System.nanoTime(); + + try { + doer.run(); + } + finally { + accumulate( + JvmUtils.getCurrentThreadCpuTime() - startCpu, + System.nanoTime() - startWall + ); + } + } + + public T run(final Returner returner) throws E + { + final long startCpu = JvmUtils.getCurrentThreadCpuTime(); + final long startWall = System.nanoTime(); + + try { + return returner.run(); + } + finally { + accumulate( + JvmUtils.getCurrentThreadCpuTime() - startCpu, + System.nanoTime() - startWall + ); + } + } + + @Override + public Snapshot snapshot() + { + return new Snapshot(cpuTime.get(), wallTime.get()); + } + + @JsonTypeName("cpu") + public static class Snapshot implements QueryCounterSnapshot + { + private final long cpuTime; + private final long wallTime; + + @JsonCreator + public Snapshot( + @JsonProperty("cpu") long cpuTime, + @JsonProperty("wall") long wallTime + ) + { + this.cpuTime = cpuTime; + this.wallTime = wallTime; + } + + @JsonProperty("cpu") + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + public long getCpuTime() + { + return cpuTime; + } + + @JsonProperty("wall") + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + public long getWallTime() + { + return wallTime; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Snapshot snapshot = (Snapshot) o; + return cpuTime == snapshot.cpuTime && wallTime == snapshot.wallTime; + } + + @Override + public int hashCode() + { + return Objects.hash(cpuTime, wallTime); + } + + @Override + public String toString() + { + return "CpuCounter.Snapshot{" + + "cpuTime=" + cpuTime + + ", wallTime=" + wallTime + + '}'; + } + } + + public interface Doer + { + void run() throws E; + } + + public interface Returner + { + T run() throws E; + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuCounters.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuCounters.java new file mode 100644 index 000000000000..8ab79b302c66 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuCounters.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.counters; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.common.base.Preconditions; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +public class CpuCounters implements QueryCounter +{ + public static final String LABEL_MAIN = "main"; + public static final String LABEL_KEY_STATISTICS = "collectKeyStatistics"; + public static final String LABEL_MERGE_INPUT = "mergeInput"; + public static final String LABEL_HASH_PARTITION = "hashPartitionOutput"; + public static final String LABEL_MIX = "mixOutput"; + public static final String LABEL_SORT = "sortOutput"; + + private final ConcurrentHashMap counters = new ConcurrentHashMap<>(); + + public CpuCounter forName(final String name) + { + return counters.computeIfAbsent(name, k -> new CpuCounter()); + } + + @Nullable + @Override + public CpuCounters.Snapshot snapshot() + { + final Map snapshotMap = new HashMap<>(); + for (Map.Entry entry : counters.entrySet()) { + snapshotMap.put(entry.getKey(), entry.getValue().snapshot()); + } + return new Snapshot(snapshotMap); + } + + @JsonTypeName("cpus") + public static class Snapshot implements QueryCounterSnapshot + { + // String keys, not enum, so deserialization is forwards-compatible + private final Map map; + + @JsonCreator + public Snapshot(Map map) + { + this.map = Preconditions.checkNotNull(map, "map"); + } + + @JsonValue + public Map getCountersMap() + { + return map; + } + + @Override + public boolean equals(Object o) + { + + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Snapshot snapshot = (Snapshot) o; + return Objects.equals(map, snapshot.map); + } + + @Override + public int hashCode() + { + return Objects.hash(map); + } + + @Override + public String toString() + { + return "CpuCounters.Snapshot{" + + "map=" + map + + '}'; + } + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuTimeAccumulatingFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuTimeAccumulatingFrameProcessor.java new file mode 100644 index 000000000000..5cb6866a5c43 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuTimeAccumulatingFrameProcessor.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.counters; + +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.ReturnOrAwait; +import org.apache.druid.utils.JvmUtils; + +import java.io.IOException; +import java.util.List; + +/** + * Wrapper around {@link FrameProcessor} that accumulates time taken into a {@link CpuCounter}. + */ +public class CpuTimeAccumulatingFrameProcessor implements FrameProcessor +{ + private final FrameProcessor delegate; + private final CpuCounter counter; + + public CpuTimeAccumulatingFrameProcessor(final FrameProcessor delegate, final CpuCounter counter) + { + this.delegate = delegate; + this.counter = counter; + } + + @Override + public List inputChannels() + { + return delegate.inputChannels(); + } + + @Override + public List outputChannels() + { + return delegate.outputChannels(); + } + + @Override + public ReturnOrAwait runIncrementally(IntSet readableInputs) throws InterruptedException, IOException + { + // Can't use counter.run, because it turns "throws InterruptedException, IOException" into "throws Exception". + final long startCpu = JvmUtils.getCurrentThreadCpuTime(); + final long startWall = System.nanoTime(); + + try { + return delegate.runIncrementally(readableInputs); + } + finally { + counter.accumulate( + JvmUtils.getCurrentThreadCpuTime() - startCpu, + System.nanoTime() - startWall + ); + } + } + + @Override + public void cleanup() throws IOException + { + final long startCpu = JvmUtils.getCurrentThreadCpuTime(); + final long startWall = System.nanoTime(); + + try { + delegate.cleanup(); + } + finally { + counter.accumulate( + JvmUtils.getCurrentThreadCpuTime() - startCpu, + System.nanoTime() - startWall + ); + } + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuTimeAccumulatingProcessorManager.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuTimeAccumulatingProcessorManager.java new file mode 100644 index 000000000000..6e04e1d3c18d --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuTimeAccumulatingProcessorManager.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.counters; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.frame.processor.manager.ProcessorAndCallback; +import org.apache.druid.frame.processor.manager.ProcessorManager; + +import java.util.Optional; + +/** + * Wrapper around {@link ProcessorManager} that accumulates time taken into a {@link CpuCounter}. + */ +public class CpuTimeAccumulatingProcessorManager implements ProcessorManager +{ + private final ProcessorManager delegate; + private final CpuCounter counter; + + public CpuTimeAccumulatingProcessorManager(ProcessorManager delegate, CpuCounter counter) + { + this.delegate = delegate; + this.counter = counter; + } + + @Override + public ListenableFuture>> next() + { + // Measure time taken by delegate.next() + final ListenableFuture>> delegateNext = counter.run(delegate::next); + + return FutureUtils.transform( + delegateNext, + + // Don't bother measuring time taken by opt.map, it's very quick. + opt -> opt.map( + pac -> new ProcessorAndCallback<>( + new CpuTimeAccumulatingFrameProcessor<>(pac.processor(), counter), + // Do measure time taken by onComplete(t), though. + t -> counter.run(() -> pac.onComplete(t)) + ) + ) + ); + } + + @Override + public R result() + { + return counter.run(delegate::result); + } + + @Override + public void close() + { + counter.run(delegate::close); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/NilQueryCounterSnapshot.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/NilQueryCounterSnapshot.java new file mode 100644 index 000000000000..eca3f6d62282 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/NilQueryCounterSnapshot.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.counters; + +import com.fasterxml.jackson.annotation.JsonTypeName; + +/** + * Represents an unknown counter type. This is the "defaultType" for {@link QueryCounterSnapshot}, so it is + * substituted at deserialization time if the type is unknown. This can happen when running mixed versions, where some + * servers support a newer counter type and some don't. + */ +@JsonTypeName("nil") +public class NilQueryCounterSnapshot implements QueryCounterSnapshot +{ + private NilQueryCounterSnapshot() + { + // Singleton + } + + @Override + public int hashCode() + { + return 0; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + return o != null && getClass() == o.getClass(); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/QueryCounterSnapshot.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/QueryCounterSnapshot.java index c065f7f8252a..0fe03088ae78 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/QueryCounterSnapshot.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/QueryCounterSnapshot.java @@ -24,8 +24,14 @@ /** * Marker interface for the results of {@link QueryCounter#snapshot()}. No methods, because the only purpose of these * snapshots is to pass things along from worker -> controller -> report. + * + * To support easy adding of new counters, implementations must use forward-compatible deserialization setups. + * In particular, implementations should avoid using enums where new values may be added in the future. + * + * The default impl is {@link NilQueryCounterSnapshot}. This means that readers will see {@link NilQueryCounterSnapshot} + * if they don't understand the particular counter type in play. */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = NilQueryCounterSnapshot.class) public interface QueryCounterSnapshot { } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 46a24611351f..77a0b7d48d69 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -2705,6 +2705,7 @@ private void startQueryResultsReader() () -> ArenaMemoryAllocator.createOnHeap(5_000_000), resultReaderExec, cancellationId, + null, MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context()) ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java index 27779f53251c..e48f1ef098a6 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java @@ -44,6 +44,7 @@ import org.apache.druid.frame.processor.FrameChannelHashPartitioner; import org.apache.druid.frame.processor.FrameChannelMixer; import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.FrameProcessorDecorator; import org.apache.druid.frame.processor.FrameProcessorExecutor; import org.apache.druid.frame.processor.OutputChannel; import org.apache.druid.frame.processor.OutputChannelFactory; @@ -62,6 +63,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.msq.counters.CounterNames; import org.apache.druid.msq.counters.CounterTracker; +import org.apache.druid.msq.counters.CpuCounters; import org.apache.druid.msq.indexing.CountingOutputChannelFactory; import org.apache.druid.msq.indexing.InputChannelFactory; import org.apache.druid.msq.indexing.InputChannelsImpl; @@ -243,6 +245,7 @@ private void makeInputSliceReader() () -> ArenaMemoryAllocator.createOnHeap(frameContext.memoryParameters().getStandardFrameSize()), exec, cancellationId, + counterTracker, removeNullBytes ); @@ -348,7 +351,7 @@ private workResultFuture = exec.runAllFully( - processorManager, + counterTracker.trackCpu(processorManager, CpuCounters.LABEL_MAIN), maxOutstandingProcessors, frameContext.processorBouncer(), cancellationId @@ -641,7 +644,7 @@ public void mix(final OutputChannelFactory outputChannelFactory) ); return new ResultAndChannels<>( - exec.runFully(mixer, cancellationId), + exec.runFully(counterTracker.trackCpu(mixer, CpuCounters.LABEL_MIX), cancellationId), OutputChannels.wrap(Collections.singletonList(outputChannel.readOnly())) ); } @@ -723,6 +726,14 @@ public void globalSort( stageDefinition.getSortKey(), partitionBoundariesFuture, exec, + new FrameProcessorDecorator() + { + @Override + public FrameProcessor decorate(FrameProcessor processor) + { + return counterTracker.trackCpu(processor, CpuCounters.LABEL_SORT); + } + }, outputChannelFactory, makeSuperSorterIntermediateOutputChannelFactory(sorterTmpDir), memoryParameters.getSuperSorterMaxActiveProcessors(), @@ -770,7 +781,11 @@ public void hashPartition(final OutputChannelFactory outputChannelFactory) ) ); - final ListenableFuture partitionerFuture = exec.runFully(partitioner, cancellationId); + final ListenableFuture partitionerFuture = + exec.runFully( + counterTracker.trackCpu(partitioner, CpuCounters.LABEL_HASH_PARTITION), + cancellationId + ); final ResultAndChannels retVal = new ResultAndChannels<>(partitionerFuture, OutputChannels.wrap(outputChannels)); @@ -844,6 +859,14 @@ public OutputChannel openNilChannel(int expectedZero) stageDefinition.getSortKey(), Futures.immediateFuture(ClusterByPartitions.oneUniversalPartition()), exec, + new FrameProcessorDecorator() + { + @Override + public FrameProcessor decorate(FrameProcessor processor) + { + return counterTracker.trackCpu(processor, CpuCounters.LABEL_SORT); + } + }, partitionOverrideOutputChannelFactory, makeSuperSorterIntermediateOutputChannelFactory(sorterTmpDir), 1, @@ -929,13 +952,16 @@ private ResultAndChannels gatherResultKeyStatistics(final OutputChannels chan final ListenableFuture clusterByStatisticsCollectorFuture = exec.runAllFully( - ProcessorManagers.of(processors) - .withAccumulation( - stageDefinition.createResultKeyStatisticsCollector( - frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes() + counterTracker.trackCpu( + ProcessorManagers.of(processors) + .withAccumulation( + stageDefinition.createResultKeyStatisticsCollector( + frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes() + ), + ClusterByStatisticsCollector::addAll ), - ClusterByStatisticsCollector::addAll - ), + CpuCounters.LABEL_KEY_STATISTICS + ), // Run all processors simultaneously. They are lightweight and this keeps things moving. processors.size(), Bouncer.unlimited(), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 90f018d07aec..92664feeabbb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -407,13 +407,14 @@ private void handleNewWorkOrder( kernel.startReading(); final QueryContext queryContext = task != null ? QueryContext.of(task.getContext()) : QueryContext.empty(); + final boolean includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(queryContext); final RunWorkOrder runWorkOrder = new RunWorkOrder( task.getControllerTaskId(), workOrder, inputChannelFactory, stageCounters.computeIfAbsent( IntObjectPair.of(workOrder.getWorkerNumber(), stageDefinition.getId()), - ignored -> new CounterTracker() + ignored -> new CounterTracker(includeAllCounters) ), workerExec, cancellationId, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java index 4e28edc3ac16..341496f7842f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java @@ -27,6 +27,9 @@ import org.apache.druid.initialization.DruidModule; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.counters.CounterSnapshotsSerializer; +import org.apache.druid.msq.counters.CpuCounter; +import org.apache.druid.msq.counters.CpuCounters; +import org.apache.druid.msq.counters.NilQueryCounterSnapshot; import org.apache.druid.msq.counters.SegmentGenerationProgressCounter; import org.apache.druid.msq.counters.SuperSorterProgressTrackerCounter; import org.apache.druid.msq.counters.WarningCounters; @@ -175,6 +178,9 @@ public List getJacksonModules() SuperSorterProgressTrackerCounter.Snapshot.class, WarningCounters.Snapshot.class, SegmentGenerationProgressCounter.Snapshot.class, + CpuCounters.Snapshot.class, + CpuCounter.Snapshot.class, + NilQueryCounterSnapshot.class, // InputSpec classes ExternalInputSpec.class, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 0fef9d32e6d4..1037aa6c2af0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -264,13 +264,15 @@ public static Map makeTaskContext( final ImmutableMap.Builder taskContextOverridesBuilder = ImmutableMap.builder(); final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(querySpec.getQuery().context()); final boolean removeNullBytes = MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context()); + final boolean includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(querySpec.getQuery().context()); taskContextOverridesBuilder .put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, queryKernelConfig.isDurableStorage()) .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions) .put(MultiStageQueryContext.CTX_IS_REINDEX, MSQControllerTask.isReplaceInputDataSourceTask(querySpec)) .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages()) - .put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes); + .put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes) + .put(MultiStageQueryContext.CTX_INCLUDE_ALL_COUNTERS, includeAllCounters); // Put the lookup loading info in the task context to facilitate selective loading of lookups. if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE) != null) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java index 63358467489b..0b3063ef48ba 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java @@ -80,6 +80,7 @@ public class IndexerWorkerContext implements WorkerContext private final ServiceClientFactory clientFactory; private final MemoryIntrospector memoryIntrospector; private final int maxConcurrentStages; + private final boolean includeAllCounters; @GuardedBy("this") private ServiceLocator controllerLocator; @@ -105,7 +106,10 @@ public IndexerWorkerContext( this.clientFactory = clientFactory; this.memoryIntrospector = memoryIntrospector; this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory; - this.maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStages(QueryContext.of(task.getContext())); + + final QueryContext queryContext = QueryContext.of(task.getContext()); + this.maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStages(queryContext); + this.includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(queryContext); } public static IndexerWorkerContext createProductionInstance( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/InputChannelsImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/InputChannelsImpl.java index 2c6539f59307..c1429625e993 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/InputChannelsImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/InputChannelsImpl.java @@ -30,6 +30,8 @@ import org.apache.druid.frame.processor.FrameProcessorExecutor; import org.apache.druid.frame.read.FrameReader; import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.msq.counters.CounterTracker; +import org.apache.druid.msq.counters.CpuCounters; import org.apache.druid.msq.input.stage.InputChannels; import org.apache.druid.msq.input.stage.ReadablePartition; import org.apache.druid.msq.input.stage.ReadablePartitions; @@ -38,6 +40,7 @@ import org.apache.druid.msq.kernel.StageId; import org.apache.druid.msq.kernel.StagePartition; +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -55,17 +58,23 @@ public class InputChannelsImpl implements InputChannels private final InputChannelFactory channelFactory; private final Supplier allocatorMaker; private final FrameProcessorExecutor exec; - private final String cancellationId; private final Map readablePartitionMap; private final boolean removeNullBytes; + @Nullable + private final String cancellationId; + + @Nullable + private final CounterTracker counterTracker; + public InputChannelsImpl( final QueryDefinition queryDefinition, final ReadablePartitions readablePartitions, final InputChannelFactory channelFactory, final Supplier allocatorMaker, final FrameProcessorExecutor exec, - final String cancellationId, + @Nullable final String cancellationId, + @Nullable final CounterTracker counterTracker, final boolean removeNullBytes ) { @@ -75,6 +84,7 @@ public InputChannelsImpl( this.allocatorMaker = allocatorMaker; this.exec = exec; this.cancellationId = cancellationId; + this.counterTracker = counterTracker; this.removeNullBytes = removeNullBytes; for (final ReadablePartition readablePartition : readablePartitions) { @@ -133,8 +143,6 @@ private ReadableFrameChannel openSorted( FrameWriters.makeRowBasedFrameWriterFactory( new SingleMemoryAllocatorFactory(allocatorMaker.get()), stageDefinition.getFrameReader().signature(), - - // No sortColumns, because FrameChannelMerger generates frames that are sorted all on its own Collections.emptyList(), removeNullBytes ), @@ -146,7 +154,10 @@ private ReadableFrameChannel openSorted( // Discard future, since there is no need to keep it. We aren't interested in its return value. If it fails, // downstream processors are notified through fail(e) on in-memory channels. If we need to cancel it, we use // the cancellationId. - exec.runFully(merger, cancellationId); + exec.runFully( + counterTracker == null ? merger : counterTracker.trackCpu(merger, CpuCounters.LABEL_MERGE_INPUT), + cancellationId + ); return queueChannel.readable(); } @@ -171,7 +182,10 @@ private ReadableFrameChannel openUnsorted( // Discard future, since there is no need to keep it. We aren't interested in its return value. If it fails, // downstream processors are notified through fail(e) on in-memory channels. If we need to cancel it, we use // the cancellationId. - exec.runFully(muxer, cancellationId); + exec.runFully( + counterTracker == null ? muxer : counterTracker.trackCpu(muxer, CpuCounters.LABEL_MERGE_INPUT), + cancellationId + ); return queueChannel.readable(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 9b715f8c8cf9..ed6a7c0e7b9b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -30,12 +30,14 @@ import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.msq.counters.NilQueryCounterSnapshot; import org.apache.druid.msq.exec.ClusterStatisticsMergeMode; import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.exec.SegmentSource; import org.apache.druid.msq.indexing.destination.MSQSelectDestination; import org.apache.druid.msq.indexing.error.MSQWarnings; import org.apache.druid.msq.kernel.WorkerAssignmentStrategy; +import org.apache.druid.msq.rpc.ControllerResource; import org.apache.druid.msq.sql.MSQMode; import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryContexts; @@ -168,6 +170,19 @@ public class MultiStageQueryContext public static final String CTX_ARRAY_INGEST_MODE = "arrayIngestMode"; public static final ArrayIngestMode DEFAULT_ARRAY_INGEST_MODE = ArrayIngestMode.ARRAY; + /** + * Whether new counters (anything other than channel, sortProgress, warnings, segmentGenerationProgress) should + * be included in reports. This parameter is necessary because prior to Druid 31, we lacked + * {@link NilQueryCounterSnapshot} as a default counter, which means that {@link SqlStatementResourceHelper} and + * {@link ControllerResource#httpPostCounters} would throw errors when encountering new counter types that they do + * not yet recognize. This causes problems during rolling updates. + * + * Once all servers are on Druid 31 or later, this can safely be flipped to "true". At that point, unknown counters + * are represented on the deserialization side using {@link NilQueryCounterSnapshot}. + */ + public static final String CTX_INCLUDE_ALL_COUNTERS = "includeAllCounters"; + public static final boolean DEFAULT_INCLUDE_ALL_COUNTERS = false; + public static final String CTX_FORCE_TIME_SORT = DimensionsSpec.PARAMETER_FORCE_TIME_SORT; private static final boolean DEFAULT_FORCE_TIME_SORT = DimensionsSpec.DEFAULT_FORCE_TIME_SORT; @@ -365,6 +380,14 @@ public static ArrayIngestMode getArrayIngestMode(final QueryContext queryContext return queryContext.getEnum(CTX_ARRAY_INGEST_MODE, ArrayIngestMode.class, DEFAULT_ARRAY_INGEST_MODE); } + /** + * See {@link #CTX_INCLUDE_ALL_COUNTERS}. + */ + public static boolean getIncludeAllCounters(final QueryContext queryContext) + { + return queryContext.getBoolean(CTX_INCLUDE_ALL_COUNTERS, DEFAULT_INCLUDE_ALL_COUNTERS); + } + public static boolean isForceSegmentSortByTime(final QueryContext queryContext) { return queryContext.getBoolean(CTX_FORCE_TIME_SORT, DEFAULT_FORCE_TIME_SORT); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CountersSnapshotTreeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CountersSnapshotTreeTest.java index a6b0e830a20d..f595ee643dd2 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CountersSnapshotTreeTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CountersSnapshotTreeTest.java @@ -19,13 +19,20 @@ package org.apache.druid.msq.counters; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import org.apache.druid.msq.guice.MSQIndexingModule; import org.apache.druid.segment.TestHelper; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Test; +import java.util.Objects; + public class CountersSnapshotTreeTest { @Test @@ -46,4 +53,93 @@ public void testSerde() throws Exception Assert.assertEquals(snapshotsTree.copyMap(), snapshotsTree2.copyMap()); } + + @Test + public void testSerdeUnknownCounter() throws Exception + { + final ObjectMapper serializationMapper = + TestHelper.makeJsonMapper().registerModules(new MSQIndexingModule().getJacksonModules()); + serializationMapper.registerSubtypes(TestCounterSnapshot.class); + + final ObjectMapper deserializationMapper = + TestHelper.makeJsonMapper().registerModules(new MSQIndexingModule().getJacksonModules()); + + final TestCounter testCounter = new TestCounter(10); + final CounterSnapshotsTree snapshotsTree = new CounterSnapshotsTree(); + snapshotsTree.put(1, 2, new CounterSnapshots(ImmutableMap.of("ctr", testCounter.snapshot()))); + + final String json = serializationMapper.writeValueAsString(snapshotsTree); + final CounterSnapshotsTree snapshotsTree2 = serializationMapper.readValue(json, CounterSnapshotsTree.class); + final CounterSnapshotsTree snapshotsTree3 = deserializationMapper.readValue(json, CounterSnapshotsTree.class); + + Assert.assertEquals(snapshotsTree.copyMap(), snapshotsTree2.copyMap()); + Assert.assertNotEquals(snapshotsTree.copyMap(), snapshotsTree3.copyMap()); + + // Confirm that deserializationMapper reads the TestCounterSnapshot as a NilQueryCounterSnapshot. + MatcherAssert.assertThat( + snapshotsTree3.copyMap().get(1).get(2).getMap().get("ctr"), + CoreMatchers.instanceOf(NilQueryCounterSnapshot.class) + ); + } + + private static class TestCounter implements QueryCounter + { + private final int n; + + public TestCounter(int n) + { + this.n = n; + } + + @Override + public QueryCounterSnapshot snapshot() + { + return new TestCounterSnapshot(n); + } + } + + @JsonTypeName("test") + private static class TestCounterSnapshot implements QueryCounterSnapshot + { + private final int n; + + @JsonCreator + public TestCounterSnapshot(@JsonProperty("n") int n) + { + this.n = n; + } + + @JsonProperty("n") + public int getN() + { + return n; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestCounterSnapshot that = (TestCounterSnapshot) o; + return n == that.n; + } + + @Override + public int hashCode() + { + return Objects.hashCode(n); + } + + @Override + public String toString() + { + return "TestCounterSnapshot{" + + "n=" + n + + '}'; + } + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CpuCountersTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CpuCountersTest.java new file mode 100644 index 000000000000..d9fee44ec2db --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CpuCountersTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.counters; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Assert; +import org.junit.Test; + +public class CpuCountersTest +{ + @Test + public void test_forName_runDoer() + { + final CpuCounters counters = new CpuCounters(); + final CpuCounter counter = counters.forName("xyz"); + counter.run(() -> { /* Nothing in particular */ }); + final CpuCounters.Snapshot snapshot = counters.snapshot(); + Assert.assertEquals(ImmutableSet.of("xyz"), snapshot.getCountersMap().keySet()); + } + + @Test + public void test_forName_runReturner() + { + final CpuCounters counters = new CpuCounters(); + final CpuCounter counter = counters.forName("xyz"); + Assert.assertEquals("boo", counter.run(() -> "boo")); + final CpuCounters.Snapshot snapshot = counters.snapshot(); + Assert.assertEquals(ImmutableSet.of("xyz"), snapshot.getCountersMap().keySet()); + } + + @Test + public void test_forName_accumulate() + { + final CpuCounters counters = new CpuCounters(); + final CpuCounter counter = counters.forName("xyz"); + counter.accumulate(1L, 1L); + final CpuCounters.Snapshot snapshot = counters.snapshot(); + Assert.assertEquals( + ImmutableMap.of("xyz", new CpuCounter.Snapshot(1L, 1L)), + snapshot.getCountersMap() + ); + } + + @Test + public void test_counter_snapshot_equals() + { + EqualsVerifier.forClass(CpuCounter.Snapshot.class) + .usingGetClass() + .verify(); + } + + @Test + public void test_counters_snapshot_equals() + { + EqualsVerifier.forClass(CpuCounters.Snapshot.class) + .usingGetClass() + .verify(); + } + + @Test + public void test_nil_equals() + { + EqualsVerifier.forClass(NilQueryCounterSnapshot.class) + .usingGetClass() + .verify(); + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorDecorator.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorDecorator.java new file mode 100644 index 000000000000..f72765dc19fa --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorDecorator.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor; + +/** + * Passed to {@link SuperSorter} to decorate the processors it launches. + */ +public interface FrameProcessorDecorator +{ + FrameProcessorDecorator NONE = new FrameProcessorDecorator() + { + @Override + public FrameProcessor decorate(FrameProcessor processor) + { + return processor; + } + }; + + FrameProcessor decorate(FrameProcessor processor); +} diff --git a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java index 5cb9c0830e49..d2b6934d2926 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java @@ -131,6 +131,7 @@ public class SuperSorter private final List sortKey; private final ListenableFuture outputPartitionsFuture; private final FrameProcessorExecutor exec; + private final FrameProcessorDecorator processorDecorator; private final OutputChannelFactory outputChannelFactory; private final OutputChannelFactory intermediateOutputChannelFactory; private final int maxChannelsPerMerger; @@ -224,6 +225,7 @@ public SuperSorter( final List sortKey, final ListenableFuture outputPartitionsFuture, final FrameProcessorExecutor exec, + final FrameProcessorDecorator processorDecorator, final OutputChannelFactory outputChannelFactory, final OutputChannelFactory intermediateOutputChannelFactory, final int maxActiveProcessors, @@ -239,6 +241,7 @@ public SuperSorter( this.sortKey = sortKey; this.outputPartitionsFuture = outputPartitionsFuture; this.exec = exec; + this.processorDecorator = processorDecorator; this.outputChannelFactory = outputChannelFactory; this.intermediateOutputChannelFactory = intermediateOutputChannelFactory; this.maxChannelsPerMerger = maxChannelsPerMerger; @@ -743,7 +746,7 @@ private void runMerger( private void runWorker(final FrameProcessor worker, final Consumer outConsumer) { Futures.addCallback( - exec.runFully(worker, cancellationId), + exec.runFully(processorDecorator.decorate(worker), cancellationId), new FutureCallback() { @Override diff --git a/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java b/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java index 2690d79c7a5c..36644f6d7715 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java @@ -126,6 +126,7 @@ public void testSingleEmptyInputChannel_fileStorage() throws Exception Collections.emptyList(), outputPartitionsFuture, exec, + FrameProcessorDecorator.NONE, new FileOutputChannelFactory(tempFolder, FRAME_SIZE, null), new FileOutputChannelFactory(tempFolder, FRAME_SIZE, null), 2, @@ -161,6 +162,7 @@ public void testSingleEmptyInputChannel_immediately_fileStorage() throws Excepti Collections.emptyList(), Futures.immediateFuture(ClusterByPartitions.oneUniversalPartition()), exec, + FrameProcessorDecorator.NONE, new FileOutputChannelFactory(tempFolder, FRAME_SIZE, null), new FileOutputChannelFactory(tempFolder, FRAME_SIZE, null), 2, @@ -345,6 +347,7 @@ private void verifySuperSorter( clusterBy.getColumns(), clusterByPartitionsFuture, exec, + FrameProcessorDecorator.NONE, new FileOutputChannelFactory(tempFolder, maxBytesPerFrame, null), outputChannelFactory, maxActiveProcessors,