Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix the issue that the RU returned by the DataStreamExecutor was too large #6802

Merged
merged 9 commits into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions dbms/src/Flash/Executor/DataStreamExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.

#include <Common/FmtUtils.h>
#include <Common/TiFlashMetrics.h>
#include <Common/getNumberOfCPUCores.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Executor/DataStreamExecutor.h>
Expand All @@ -24,6 +26,8 @@ DataStreamExecutor::DataStreamExecutor(const BlockIO & block_io)
, data_stream(block_io.in)
{
assert(data_stream);
thread_cnt_before_execute = GET_METRIC(tiflash_thread_count, type_active_threads_of_thdpool).Value();
estimate_thread_cnt = std::max(data_stream->estimateNewThreadCount(), 1);
}

ExecutionResult DataStreamExecutor::execute(ResultHandler result_handler)
Expand Down Expand Up @@ -65,11 +69,44 @@ String DataStreamExecutor::toString() const

int DataStreamExecutor::estimateNewThreadCount()
{
return data_stream->estimateNewThreadCount();
return estimate_thread_cnt;
}

RU DataStreamExecutor::collectRequestUnit()
{
return toRU(data_stream->estimateCPUTimeNs());
// The cputime returned by BlockInputSrream is a count of the execution time of each thread.
// However, cputime here is imprecise and is affected by thread scheduling and condition_cv.wait.
// The following attempts to eliminate the effects of thread scheduling.

auto execute_time_ns = data_stream->estimateCPUTimeNs();
UInt64 total_thread_cnt = thread_cnt_before_execute + estimate_thread_cnt;
size_t logical_cpu_cores = getNumberOfLogicalCPUCores();
// When the number of threads is greater than the number of cpu cores,
// BlockInputStream's estimated cpu time will be much greater than the actual value.
if (execute_time_ns <= 0 || total_thread_cnt <= logical_cpu_cores)
return toRU(execute_time_ns);

// Here we use `execute_time_ns / thread_cnt` to get the average execute time of each thread.
// So we have `per_thread_execute_time_ns = execute_time_ns / estimate_thread_cnt`.
// Since a cpu core executes several threads, think of `execute_time_of_thread_A = cputime_of_thread_A + cputime_of_other_thread`.
// Assuming that the cputime of other threads is the same as thread A, then we have `execute_time_of_thread_A = total_thread_cnt * cputime_of_thread_A`.
// Assuming that threads is divided equally among all cpu cores, then `the number of threads allocated to cpu core A = total_thread_cnt / logical_cpu_cores`.
// So we have `per_thread_cputime_ns = per_thread_execute_time_ns / (total_thread_cnt / logical_cpu_cores)`.
// And the number of threads of `data_stream` executed by one cpu core is `estimate_thread_cnt / logical_cpu_cores`.
// So we have `per_core_cputime_ns = per_thread_cputime_ns * (estimate_thread_cnt / logical_cpu_cores)`
// So it can be assumed that
// per_core_cpu_time = per_thread_cputime_ns * (estimate_thread_cnt / logical_cpu_cores)
// = per_thread_execute_time_ns / (total_thread_cnt / logical_cpu_cores) * (estimate_thread_cnt / logical_cpu_cores)
// = per_thread_execute_time_ns / total_thread_cnt * estimate_thread_cnt
// So that we have
// cputime_ns = per_core_cpu_time * logical_cpu_cores
// = per_thread_execute_time_ns / total_thread_cnt * estimate_thread_cnt * logical_cpu_cores
// = execute_time_ns / total_thread_cnt / estimate_thread_cnt * estimate_thread_cnt * logical_cpu_cores
// = execute_time_ns / total_thread_cnt * logical_cpu_cores
auto cpu_time_ns = static_cast<double>(execute_time_ns) / total_thread_cnt * logical_cpu_cores;
// But there is still no way to eliminate the effect of `condition_cv.wait` here...
// We can assume `condition.wait` takes half of datastream execute time.
cpu_time_ns /= 2;
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
return toRU(ceil(cpu_time_ns));
}
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Executor/DataStreamExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,7 @@ class DataStreamExecutor : public QueryExecutor

protected:
BlockInputStreamPtr data_stream;
uint64_t thread_cnt_before_execute = 0;
uint64_t estimate_thread_cnt = 0;
};
} // namespace DB