Skip to content

Commit

Permalink
[refactor](profilev2) add BlocksProduced RowsProduced counter apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored and superdiaodiao committed Nov 21, 2023
1 parent 9b7496f commit 76cba47
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 1 deletion.
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_split_block_distribute_by_channel_timer =
ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
_rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
_overall_throughput = _profile->add_derived_counter(
"OverallThroughput", TUnit::BYTES_PER_SECOND,
std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
Expand Down Expand Up @@ -309,6 +310,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
SourceState source_state) {
auto& local_state = get_local_state(state);
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
SCOPED_TIMER(local_state.exec_time_counter());
local_state._peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
bool all_receiver_eof = true;
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {

RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; }
RuntimeProfile::Counter* rows_sent_counter() { return _rows_sent_counter; }
RuntimeProfile::Counter* local_send_timer() { return _local_send_timer; }
RuntimeProfile::Counter* local_bytes_send_counter() { return _local_bytes_send_counter; }
RuntimeProfile::Counter* local_sent_rows() { return _local_sent_rows; }
Expand Down Expand Up @@ -192,6 +193,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
RuntimeProfile::Counter* _split_block_hash_compute_timer = nullptr;
RuntimeProfile::Counter* _split_block_distribute_by_channel_timer = nullptr;
RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
RuntimeProfile::Counter* _rows_sent_counter = nullptr;
// Throughput per total time spent in sender
RuntimeProfile::Counter* _overall_throughput = nullptr;
// Used to counter send bytes under local data exchange
Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
state->execution_timeout()));
_result_sink_dependency =
ResultSinkDependency::create_shared(_parent->operator_id(), _parent->node_id());

_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
_rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
((PipBufferControlBlock*)_sender.get())->set_dependency(_result_sink_dependency);
return Status::OK();
}
Expand Down Expand Up @@ -131,6 +132,8 @@ Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block,
SourceState source_state) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
COUNTER_UPDATE(local_state.blocks_sent_counter(), 1);
if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/result_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class ResultSinkLocalState final : public PipelineXSinkLocalState<> {
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
WriteDependency* dependency() override { return _result_sink_dependency.get(); }
RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; }
RuntimeProfile::Counter* rows_sent_counter() { return _rows_sent_counter; }

private:
friend class ResultSinkOperatorX;
Expand All @@ -71,6 +73,8 @@ class ResultSinkLocalState final : public PipelineXSinkLocalState<> {
std::shared_ptr<BufferControlBlock> _sender;
std::shared_ptr<ResultWriter> _writer;
std::shared_ptr<ResultSinkDependency> _result_sink_dependency;
RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
RuntimeProfile::Counter* _rows_sent_counter = nullptr;
};

class ResultSinkOperatorX final : public DataSinkOperatorX<ResultSinkLocalState> {
Expand Down

0 comments on commit 76cba47

Please sign in to comment.