diff --git a/velox/exec/Exchange.cpp b/velox/exec/Exchange.cpp
index f41a229a16e0..7599f115b1b1 100644
--- a/velox/exec/Exchange.cpp
+++ b/velox/exec/Exchange.cpp
@@ -141,10 +141,27 @@ void Exchange::recordExchangeClientStats() {
   }
 
   auto lockedStats = stats_.wlock();
-  for (const auto& [name, value] : exchangeClient_->stats()) {
+  // Take a single snapshot so that the runtime stats and the background timing
+  // recorded below are derived from the same values.
+  const auto exchangeClientStats = exchangeClient_->stats();
+  for (const auto& [name, value] : exchangeClientStats) {
     lockedStats->runtimeStats.erase(name);
     lockedStats->runtimeStats.insert({name, value});
   }
+
+  auto backgroundCpuTimeMs =
+      exchangeClientStats.find(ExchangeClient::kBackgroundCpuTimeMs);
+  if (backgroundCpuTimeMs != exchangeClientStats.end()) {
+    const CpuWallTiming backgroundTiming{
+        static_cast<uint64_t>(backgroundCpuTimeMs->second.count),
+        0,
+        static_cast<uint64_t>(backgroundCpuTimeMs->second.sum) *
+            Timestamp::kNanosecondsInMillisecond};
+    // clear() + add() overwrites the previously recorded timing instead of
+    // accumulating it across calls.
+    lockedStats->backgroundTiming.clear();
+    lockedStats->backgroundTiming.add(backgroundTiming);
+  }
 }
 
 VectorSerde* Exchange::getSerde() {
diff --git a/velox/exec/ExchangeClient.h b/velox/exec/ExchangeClient.h
index c17d787cb811..2f714e028570 100644
--- a/velox/exec/ExchangeClient.h
+++ b/velox/exec/ExchangeClient.h
@@ -27,6 +27,7 @@ class ExchangeClient {
  public:
   static constexpr int32_t kDefaultMaxQueuedBytes = 32 << 20; // 32 MB.
   static constexpr int32_t kDefaultMaxWaitSeconds = 2;
+  static inline const std::string kBackgroundCpuTimeMs = "backgroundCpuTimeMs";
 
   ExchangeClient(
       std::string taskId,
@@ -61,6 +62,8 @@ class ExchangeClient {
   void close();
 
   // Returns runtime statistics aggregated across all of the exchange sources.
+  // ExchangeClient is expected to report background CPU time by including a
+  // runtime metric named ExchangeClient::kBackgroundCpuTimeMs.
   folly::F14FastMap<std::string, RuntimeMetric> stats() const;
 
   std::shared_ptr<ExchangeQueue> queue() const {
diff --git a/velox/exec/ExchangeSource.h b/velox/exec/ExchangeSource.h
index 635ce620f5b6..4fd34d02d6fa 100644
--- a/velox/exec/ExchangeSource.h
+++ b/velox/exec/ExchangeSource.h
@@ -91,7 +91,9 @@ class ExchangeSource : public std::enable_shared_from_this<ExchangeSource> {
   /// once it received enough data.
   virtual void close() = 0;
 
-  /// Returns runtime statistics.
+  /// Returns runtime statistics. ExchangeSource is expected to report
+  /// background CPU time by including a runtime metric named
+  /// ExchangeClient::kBackgroundCpuTimeMs.
   virtual folly::F14FastMap<std::string, RuntimeMetric> stats() const = 0;
 
   virtual std::string toString() {
diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp
index 823cb93d5419..24b939bca0d4 100644
--- a/velox/exec/Operator.cpp
+++ b/velox/exec/Operator.cpp
@@ -438,6 +438,8 @@ void OperatorStats::add(const OperatorStats& other) {
 
   finishTiming.add(other.finishTiming);
 
+  backgroundTiming.add(other.backgroundTiming);
+
   memoryStats.add(other.memoryStats);
 
   for (const auto& [name, stats] : other.runtimeStats) {
@@ -475,6 +477,8 @@ void OperatorStats::clear() {
 
   finishTiming.clear();
 
+  backgroundTiming.clear();
+
   memoryStats.clear();
 
   runtimeStats.clear();
diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h
index ef2d66d6737d..fad3883b005d 100644
--- a/velox/exec/Operator.h
+++ b/velox/exec/Operator.h
@@ -124,6 +124,11 @@ struct OperatorStats {
 
   CpuWallTiming finishTiming;
 
+  // CPU time spent on background activities (activities that are not
+  // running on driver threads). Operators are responsible for reporting
+  // background CPU time at a reasonable time granularity.
+  CpuWallTiming backgroundTiming;
+
   MemoryStats memoryStats;
 
   // Total bytes in memory for spilling
diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp
index 778158913803..8053081e4d60 100644
--- a/velox/exec/tests/MultiFragmentTest.cpp
+++ b/velox/exec/tests/MultiFragmentTest.cpp
@@ -146,12 +146,23 @@ class MultiFragmentTest : public HiveConnectorTestBase {
 
   void verifyExchangeStats(
       const std::shared_ptr<exec::Task>& task,
-      int32_t expectedCount) const {
+      int32_t expectedNumPagesCount,
+      int32_t expectedBackgroundCpuCount) const {
     auto exchangeStats =
         task->taskStats().pipelineStats[0].operatorStats[0].runtimeStats;
     ASSERT_EQ(1, exchangeStats.count("localExchangeSource.numPages"));
     ASSERT_EQ(
-        expectedCount, exchangeStats.at("localExchangeSource.numPages").count);
+        expectedNumPagesCount,
+        exchangeStats.at("localExchangeSource.numPages").count);
+    // Check presence first so that a missing metric fails the assertion
+    // instead of throwing from at().
+    ASSERT_EQ(1, exchangeStats.count(ExchangeClient::kBackgroundCpuTimeMs));
+    ASSERT_EQ(
+        expectedBackgroundCpuCount,
+        exchangeStats.at(ExchangeClient::kBackgroundCpuTimeMs).count);
+    const auto backgroundTiming =
+        task->taskStats().pipelineStats[0].operatorStats[0].backgroundTiming;
+    ASSERT_EQ(expectedBackgroundCpuCount, backgroundTiming.count);
   }
 
   RowTypePtr rowType_{
@@ -316,7 +327,7 @@ TEST_F(MultiFragmentTest, distributedTableScan) {
     auto task =
         assertQuery(op, {leafTaskId}, "SELECT c2, c1 % 2, c0 % 10 FROM tmp");
 
-    verifyExchangeStats(task, 1);
+    verifyExchangeStats(task, 1, 1);
 
     ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId();
   }
@@ -464,7 +475,7 @@ TEST_F(MultiFragmentTest, partitionedOutput) {
     auto task =
         assertQuery(op, intermediateTaskIds, "SELECT c3, c0, c2 FROM tmp");
 
-    verifyExchangeStats(task, kFanout);
+    verifyExchangeStats(task, kFanout, kFanout);
 
     ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId();
   }
diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp
index 06dd1706a179..5051da08946d 100644
--- a/velox/exec/tests/TaskTest.cpp
+++ b/velox/exec/tests/TaskTest.cpp
@@ -1074,6 +1074,7 @@ DEBUG_ONLY_TEST_F(TaskTest, liveStats) {
     EXPECT_EQ(3 * i, operatorStats.outputPositions);
     EXPECT_EQ(i, operatorStats.outputVectors);
     EXPECT_EQ(0, operatorStats.finishTiming.count);
+    EXPECT_EQ(0, operatorStats.backgroundTiming.count);
 
     EXPECT_EQ(1, liveStats[i].numTotalDrivers);
     EXPECT_EQ(0, liveStats[i].numCompletedDrivers);
@@ -1094,6 +1095,8 @@ DEBUG_ONLY_TEST_F(TaskTest, liveStats) {
   EXPECT_EQ(3 * numBatches, operatorStats.outputPositions);
   EXPECT_EQ(numBatches, operatorStats.outputVectors);
   EXPECT_EQ(1, operatorStats.finishTiming.count);
+  // No operators with background CPU time yet.
+  EXPECT_EQ(0, operatorStats.backgroundTiming.count);
 }
 
 TEST_F(TaskTest, outputBufferSize) {
diff --git a/velox/exec/tests/utils/LocalExchangeSource.cpp b/velox/exec/tests/utils/LocalExchangeSource.cpp
index a7f6d934e23a..5cff372652f0 100644
--- a/velox/exec/tests/utils/LocalExchangeSource.cpp
+++ b/velox/exec/tests/utils/LocalExchangeSource.cpp
@@ -158,6 +158,8 @@ class LocalExchangeSource : public exec::ExchangeSource {
     return {
         {"localExchangeSource.numPages", numPages_},
         {"localExchangeSource.totalBytes", totalBytes_},
+        // Arbitrary placeholder value for tests.
+        {ExchangeClient::kBackgroundCpuTimeMs, 123},
     };
   }
 