Skip to content

Commit

Permalink
Add support for background cpu time stats (#6518)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #6518

This change contains two parts:
* Adds OperatorStats::backgroundTiming, a new metric for operators to report CPU time spent on background activities (activities that are not running on driver threads).
  * It's the responsibility of individual operators to report background CPU time at a reasonable time granularity, ideally live (i.e. while the operator is still running).
* Background CPU time reporting for the Exchange operator.
  * With this change, the Exchange operator now adds background CPU reporting to the existing logic for live reporting of metrics.
  * ExchangeClient is expected to report background CPU time as a runtime metric named "backgroundCpuTimeMs".

Support for background CPU time reporting for other operators will follow in separate changes.

Reviewed By: mbasmanova

Differential Revision: D49155712

fbshipit-source-id: ea63f085d2a4310e9875eb21726751053eadd996
  • Loading branch information
Patrick Stuedi authored and facebook-github-bot committed Sep 14, 2023
1 parent ae6ae03 commit f26912e
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 5 deletions.
13 changes: 13 additions & 0 deletions velox/exec/Exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,23 @@ void Exchange::recordExchangeClientStats() {
}

auto lockedStats = stats_.wlock();
const auto exchangeClientStats = exchangeClient_->stats();
for (const auto& [name, value] : exchangeClient_->stats()) {
lockedStats->runtimeStats.erase(name);
lockedStats->runtimeStats.insert({name, value});
}

auto backgroundCpuTimeMs =
exchangeClientStats.find(ExchangeClient::kBackgroundCpuTimeMs);
if (backgroundCpuTimeMs != exchangeClientStats.end()) {
const CpuWallTiming backgroundTiming{
static_cast<uint64_t>(backgroundCpuTimeMs->second.count),
0,
static_cast<uint64_t>(backgroundCpuTimeMs->second.sum) *
Timestamp::kNanosecondsInMillisecond};
lockedStats->backgroundTiming.clear();
lockedStats->backgroundTiming.add(backgroundTiming);
}
}

VectorSerde* Exchange::getSerde() {
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/ExchangeClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class ExchangeClient {
public:
static constexpr int32_t kDefaultMaxQueuedBytes = 32 << 20; // 32 MB.
static constexpr int32_t kDefaultMaxWaitSeconds = 2;
static inline const std::string kBackgroundCpuTimeMs = "backgroundCpuTimeMs";

ExchangeClient(
std::string taskId,
Expand Down Expand Up @@ -61,6 +62,8 @@ class ExchangeClient {
void close();

// Returns runtime statistics aggregated across all of the exchange sources.
// ExchangeClient is expected to report background CPU time by including a
// runtime metric named ExchangeClient::kBackgroundCpuTimeMs.
folly::F14FastMap<std::string, RuntimeMetric> stats() const;

std::shared_ptr<ExchangeQueue> queue() const {
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/ExchangeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ class ExchangeSource : public std::enable_shared_from_this<ExchangeSource> {
/// once it received enough data.
virtual void close() = 0;

/// Returns runtime statistics.
/// Returns runtime statistics. ExchangeSource is expected to report
/// background CPU time by including a runtime metric named
/// ExchangeClient::kBackgroundCpuTimeMs.
virtual folly::F14FastMap<std::string, int64_t> stats() const = 0;

virtual std::string toString() {
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@ void OperatorStats::add(const OperatorStats& other) {

finishTiming.add(other.finishTiming);

backgroundTiming.add(other.backgroundTiming);

memoryStats.add(other.memoryStats);

for (const auto& [name, stats] : other.runtimeStats) {
Expand Down Expand Up @@ -475,6 +477,8 @@ void OperatorStats::clear() {

finishTiming.clear();

backgroundTiming.clear();

memoryStats.clear();

runtimeStats.clear();
Expand Down
5 changes: 5 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ struct OperatorStats {

CpuWallTiming finishTiming;

// CPU time spent on background activities (activities that are not
// running on driver threads). Operators are responsible for reporting
// background CPU time at a reasonable time granularity.
CpuWallTiming backgroundTiming;

MemoryStats memoryStats;

// Total bytes in memory for spilling
Expand Down
16 changes: 12 additions & 4 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,20 @@ class MultiFragmentTest : public HiveConnectorTestBase {

// Verifies the exchange-related statistics reported by the first operator of
// the first pipeline of 'task'.
//
// Checks that the "localExchangeSource.numPages" runtime stat is present and
// that its count equals 'expectedNumPagesCount', and that both the
// ExchangeClient::kBackgroundCpuTimeMs runtime stat and the operator's
// backgroundTiming have a count equal to 'expectedBackgroundCpuCount'.
void verifyExchangeStats(
const std::shared_ptr<Task>& task,
int32_t expectedCount) const {
int32_t expectedNumPagesCount,
int32_t expectedBackgroundCpuCount) const {
// Runtime stats of operator 0 in pipeline 0.
auto exchangeStats =
task->taskStats().pipelineStats[0].operatorStats[0].runtimeStats;
// Check presence first so the at() lookup below does not fail on a missing
// key.
ASSERT_EQ(1, exchangeStats.count("localExchangeSource.numPages"));
ASSERT_EQ(
expectedCount, exchangeStats.at("localExchangeSource.numPages").count);
expectedNumPagesCount,
exchangeStats.at("localExchangeSource.numPages").count);
// Background CPU time as reported through the runtime stats map.
ASSERT_EQ(
expectedBackgroundCpuCount,
exchangeStats.at(ExchangeClient::kBackgroundCpuTimeMs).count);
// Despite its name, this local holds the operator's backgroundTiming object,
// not a millisecond value; only its count is checked here.
auto cpuBackgroundTimeMs =
task->taskStats().pipelineStats[0].operatorStats[0].backgroundTiming;
ASSERT_EQ(expectedBackgroundCpuCount, cpuBackgroundTimeMs.count);
}

RowTypePtr rowType_{
Expand Down Expand Up @@ -316,7 +324,7 @@ TEST_F(MultiFragmentTest, distributedTableScan) {
auto task =
assertQuery(op, {leafTaskId}, "SELECT c2, c1 % 2, c0 % 10 FROM tmp");

verifyExchangeStats(task, 1);
verifyExchangeStats(task, 1, 1);

ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId();
}
Expand Down Expand Up @@ -464,7 +472,7 @@ TEST_F(MultiFragmentTest, partitionedOutput) {
auto task =
assertQuery(op, intermediateTaskIds, "SELECT c3, c0, c2 FROM tmp");

verifyExchangeStats(task, kFanout);
verifyExchangeStats(task, kFanout, kFanout);

ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId();
}
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,7 @@ DEBUG_ONLY_TEST_F(TaskTest, liveStats) {
EXPECT_EQ(3 * i, operatorStats.outputPositions);
EXPECT_EQ(i, operatorStats.outputVectors);
EXPECT_EQ(0, operatorStats.finishTiming.count);
EXPECT_EQ(0, operatorStats.backgroundTiming.count);

EXPECT_EQ(1, liveStats[i].numTotalDrivers);
EXPECT_EQ(0, liveStats[i].numCompletedDrivers);
Expand All @@ -1094,6 +1095,8 @@ DEBUG_ONLY_TEST_F(TaskTest, liveStats) {
EXPECT_EQ(3 * numBatches, operatorStats.outputPositions);
EXPECT_EQ(numBatches, operatorStats.outputVectors);
EXPECT_EQ(1, operatorStats.finishTiming.count);
// No operators with background CPU time yet.
EXPECT_EQ(0, operatorStats.backgroundTiming.count);
}

TEST_F(TaskTest, outputBufferSize) {
Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/utils/LocalExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ class LocalExchangeSource : public exec::ExchangeSource {
return {
{"localExchangeSource.numPages", numPages_},
{"localExchangeSource.totalBytes", totalBytes_},
{ExchangeClient::kBackgroundCpuTimeMs, 123},
};
}

Expand Down

0 comments on commit f26912e

Please sign in to comment.