From 38f3d826d343009205c449387045895b20846794 Mon Sep 17 00:00:00 2001 From: kakachen Date: Fri, 1 Aug 2025 18:05:19 +0800 Subject: [PATCH] [enhancement](be_metrics) update scan bytes metric in file_scanner. --- be/src/apache-orc | 2 +- be/src/io/fs/tracing_file_reader.h | 58 +++++++++++++++ be/src/io/io_common.h | 8 +++ be/src/pipeline/exec/file_scan_operator.h | 1 + be/src/service/internal_service.cpp | 2 + .../exec/format/arrow/arrow_stream_reader.cpp | 15 +++- .../exec/format/arrow/arrow_stream_reader.h | 1 + be/src/vec/exec/format/csv/csv_reader.cpp | 6 +- .../new_plain_text_line_reader.cpp | 20 ++---- .../file_reader/new_plain_text_line_reader.h | 2 - be/src/vec/exec/format/generic_reader.h | 5 ++ .../vec/exec/format/json/new_json_reader.cpp | 15 ++-- be/src/vec/exec/format/json/new_json_reader.h | 2 - be/src/vec/exec/format/orc/vorc_reader.cpp | 40 +++++------ be/src/vec/exec/format/orc/vorc_reader.h | 37 ++++++---- .../exec/format/parquet/vparquet_reader.cpp | 51 +++++++------ .../vec/exec/format/parquet/vparquet_reader.h | 14 ++-- .../exec/format/table/table_format_reader.h | 2 + be/src/vec/exec/scan/new_olap_scanner.cpp | 9 +++ be/src/vec/exec/scan/vfile_scanner.cpp | 71 ++++++++++++++++++- be/src/vec/exec/scan/vfile_scanner.h | 6 ++ 21 files changed, 276 insertions(+), 91 deletions(-) create mode 100644 be/src/io/fs/tracing_file_reader.h diff --git a/be/src/apache-orc b/be/src/apache-orc index ef68c6ff736a84..70b673c7d29969 160000 --- a/be/src/apache-orc +++ b/be/src/apache-orc @@ -1 +1 @@ -Subproject commit ef68c6ff736a84c8c7185d4a08397c67eff53ad6 +Subproject commit 70b673c7d299690ced7c4c600fabaee9e1601198 diff --git a/be/src/io/fs/tracing_file_reader.h b/be/src/io/fs/tracing_file_reader.h new file mode 100644 index 00000000000000..84eb3dfc8fb569 --- /dev/null +++ b/be/src/io/fs/tracing_file_reader.h @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include "common/status.h" +#include "io/fs/file_reader.h" +#include "util/runtime_profile.h" + +namespace doris { + +namespace io { + +class TracingFileReader : public FileReader { +public: + TracingFileReader(doris::io::FileReaderSPtr inner, FileReaderStats* stats) + : _inner(std::move(inner)), _stats(stats) {} + + Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, + const IOContext* io_ctx) override { + SCOPED_RAW_TIMER(&_stats->read_time_ns); + Status st = _inner->read_at(offset, result, bytes_read, io_ctx); + _stats->read_calls++; + _stats->read_bytes += *bytes_read; + return st; + } + + Status close() override { return _inner->close(); } + const doris::io::Path& path() const override { return _inner->path(); } + size_t size() const override { return _inner->size(); } + bool closed() const override { return _inner->closed(); } + const std::string& get_data_dir_path() override { return _inner->get_data_dir_path(); } + + void _collect_profile_at_runtime() override { return _inner->collect_profile_at_runtime(); } + void _collect_profile_before_close() override { return _inner->collect_profile_before_close(); } + + FileReaderStats* stats() const { return _stats; } + +private: + doris::io::FileReaderSPtr _inner; + FileReaderStats* _stats; +}; + +} // namespace io +} // namespace doris diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h index 909941181d3bcb..6934aa6a75a519 100644 --- a/be/src/io/io_common.h +++ b/be/src/io/io_common.h @@ -35,6 +35,13 @@ enum class ReaderType : uint8_t { namespace io { +struct FileReaderStats { + size_t read_calls = 0; + size_t read_bytes = 0; + int64_t read_time_ns = 0; + size_t read_rows = 0; +}; + struct FileCacheStatistics { int64_t num_local_io_total = 0; int64_t num_remote_io_total = 0; @@ -73,6 +80,7 @@ struct IOContext { int64_t expiration_time = 0; const TUniqueId* query_id = nullptr; // Ref FileCacheStatistics* file_cache_stats = nullptr; // Ref + FileReaderStats* file_reader_stats = nullptr; // Ref bool is_inverted_index = false; // if is_dryrun, read IO will download data to cache but return no data to reader // useful to skip cache data read from local disk to accelarate warm up diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h index 25635dcdd62c28..f4a89d4bdec914 100644 --- a/be/src/pipeline/exec/file_scan_operator.h +++ b/be/src/pipeline/exec/file_scan_operator.h @@ -55,6 +55,7 @@ class FileScanLocalState final : public ScanLocalState { std::string name_suffix() const override; private: + friend class vectorized::VFileScanner; std::shared_ptr _split_source = nullptr; int _max_scanners; // A in memory cache to save some common components diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index d33a8240a95151..a1c5c98ce45cf3 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -825,6 +825,8 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr io::IOContext io_ctx; io::FileCacheStatistics file_cache_statis; io_ctx.file_cache_stats = &file_cache_statis; + io::FileReaderStats file_reader_stats; + io_ctx.file_reader_stats = &file_reader_stats; // file_slots is no use, but the lifetime should be longer than reader std::vector file_slots; switch (params.format_type) { diff --git a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp index efe8e36bf20368..f5888c88ce2480 100644 --- a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp +++ b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp @@ -25,6 +25,7 @@ #include "arrow_pip_input_stream.h" #include "common/logging.h" #include "io/fs/stream_load_pipe.h" +#include "io/fs/tracing_file_reader.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "vec/core/block.h" @@ -43,14 +44,22 @@ ArrowStreamReader::ArrowStreamReader(RuntimeState* state, RuntimeProfile* profil const TFileRangeDesc& range, const std::vector& file_slot_descs, io::IOContext* io_ctx) - : _state(state), _range(range), _file_slot_descs(file_slot_descs), _file_reader(nullptr) { + : _state(state), + _range(range), + _file_slot_descs(file_slot_descs), + _io_ctx(io_ctx), + _file_reader(nullptr) { TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz); } ArrowStreamReader::~ArrowStreamReader() = default; Status ArrowStreamReader::init_reader() { - RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, false)); + io::FileReaderSPtr file_reader; + RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &file_reader, _state, false)); + _file_reader = _io_ctx ? std::make_shared(std::move(file_reader), + _io_ctx->file_reader_stats) + : file_reader; _pip_stream = ArrowPipInputStream::create_unique(_file_reader); return Status::OK(); } @@ -121,4 +130,4 @@ Status ArrowStreamReader::get_columns(std::unordered_map& _file_slot_descs; + io::IOContext* _io_ctx; io::FileReaderSPtr _file_reader; std::unique_ptr _pip_stream; cctz::time_zone _ctzz; diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index a853137eeb0945..8eef40f7875b9b 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -39,6 +39,7 @@ #include "io/fs/buffered_reader.h" #include "io/fs/file_reader.h" #include "io/fs/s3_file_reader.h" +#include "io/fs/tracing_file_reader.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "util/string_util.h" @@ -541,10 +542,13 @@ Status CsvReader::_create_file_reader(bool need_schema) { _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state, _file_description); - _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( + auto file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( _profile, _system_properties, _file_description, reader_options, io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx, io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size))); + _file_reader = _io_ctx ? std::make_shared(std::move(file_reader), + _io_ctx->file_reader_stats) + : file_reader; } if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && _params.file_type != TFileType::FILE_BROKER) { diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp index e66c622a1e9aa3..a068a748b1127d 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp @@ -221,12 +221,8 @@ NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile, _more_input_bytes(0), _more_output_bytes(0), _current_offset(current_offset), - _bytes_read_counter(nullptr), - _read_timer(nullptr), _bytes_decompress_counter(nullptr), _decompress_timer(nullptr) { - _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); - _read_timer = ADD_TIMER(_profile, "FileReadTime"); _bytes_decompress_counter = ADD_COUNTER(_profile, "BytesDecompressed", TUnit::BYTES); _decompress_timer = ADD_TIMER(_profile, "DecompressTime"); } @@ -384,16 +380,12 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool } } - { - SCOPED_TIMER(_read_timer); - Slice file_slice(file_buf, buffer_len); - RETURN_IF_ERROR( - _file_reader->read_at(_current_offset, file_slice, &read_len, io_ctx)); - _current_offset += read_len; - if (read_len == 0) { - _file_eof = true; - } - COUNTER_UPDATE(_bytes_read_counter, read_len); + Slice file_slice(file_buf, buffer_len); + RETURN_IF_ERROR( + _file_reader->read_at(_current_offset, file_slice, &read_len, io_ctx)); + _current_offset += read_len; + if (read_len == 0) { + _file_eof = true; } if (_file_eof || read_len == 0) { if (!stream_end) { diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h index 3e8983214c0491..730dc2e9cd9224 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h @@ -332,8 +332,6 @@ class NewPlainTextLineReader : public LineReader { size_t _current_offset; // Profile counters - RuntimeProfile::Counter* _bytes_read_counter = nullptr; - RuntimeProfile::Counter* _read_timer = nullptr; RuntimeProfile::Counter* _bytes_decompress_counter = nullptr; RuntimeProfile::Counter* _decompress_timer = nullptr; }; diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index 2dfe906bf77ea8..c3efc321e2fba6 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -74,6 +74,11 @@ class GenericReader : public ProfileCollector { virtual Status close() { return Status::OK(); } + /// The reader is responsible for counting the number of rows read, + /// because some readers, such as parquet/orc, + /// can skip some pages/rowgroups through indexes. + virtual bool count_read_rows() { return false; } + protected: const size_t _MIN_BATCH_SIZE = 4064; // 4094 - 32(padding) diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index d9956e99333860..fae64caa119d96 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -46,6 +46,7 @@ #include "io/fs/buffered_reader.h" #include "io/fs/file_reader.h" #include "io/fs/stream_load_pipe.h" +#include "io/fs/tracing_file_reader.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" @@ -95,9 +96,7 @@ NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, Scann _scanner_eof(scanner_eof), _current_offset(0), _io_ctx(io_ctx) { - _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); _read_timer = ADD_TIMER(_profile, "ReadTime"); - _file_read_timer = ADD_TIMER(_profile, "FileReadTime"); if (_range.__isset.compress_type) { // for compatibility _file_compress_type = _range.compress_type; @@ -438,10 +437,13 @@ Status NewJsonReader::_open_file_reader(bool need_schema) { _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state, _file_description); - _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( + auto file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( _profile, _system_properties, _file_description, reader_options, io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx, io::PrefetchRange(_range.start_offset, _range.size))); + _file_reader = _io_ctx ? std::make_shared(std::move(file_reader), + _io_ctx->file_reader_stats) + : file_reader; } return Status::OK(); } @@ -662,7 +664,7 @@ Status NewJsonReader::_parse_json(bool* is_empty_row, bool* eof) { // return Status::OK() if parse succeed or reach EOF. Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) { // read a whole message - SCOPED_TIMER(_file_read_timer); + SCOPED_TIMER(_read_timer); const uint8_t* json_str = nullptr; std::unique_ptr json_str_ptr; if (_line_reader != nullptr) { @@ -675,7 +677,6 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) { } } - _bytes_read_counter += *size; if (*eof) { return Status::OK(); } @@ -1931,7 +1932,7 @@ Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::st Status NewJsonReader::_simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof, simdjson::error_code* error) { - SCOPED_TIMER(_file_read_timer); + SCOPED_TIMER(_read_timer); // step1: read buf from pipe. if (_line_reader != nullptr) { RETURN_IF_ERROR(_line_reader->read_line(&_json_str, size, eof, _io_ctx)); @@ -1992,7 +1993,7 @@ Status NewJsonReader::_judge_empty_row(size_t size, bool eof, bool* is_empty_row Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_code* error, bool* is_empty_row) { - SCOPED_TIMER(_file_read_timer); + SCOPED_TIMER(_read_timer); auto return_quality_error = [&](fmt::memory_buffer& error_msg, const std::string& doc_info) -> Status { _counter->num_rows_filtered++; diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h index 7805286bc845c7..9f0a39a98b7502 100644 --- a/be/src/vec/exec/format/json/new_json_reader.h +++ b/be/src/vec/exec/format/json/new_json_reader.h @@ -261,9 +261,7 @@ class NewJsonReader : public GenericReader { io::IOContext* _io_ctx = nullptr; - RuntimeProfile::Counter* _bytes_read_counter = nullptr; RuntimeProfile::Counter* _read_timer = nullptr; - RuntimeProfile::Counter* _file_read_timer = nullptr; // ======SIMD JSON====== // name mapping diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index a35155199eb15c..b8952c7a6957c4 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -113,9 +113,6 @@ static constexpr int decimal_scale_for_hive11 = 10; M(TypeIndex::Float64, Float64, orc::DoubleVectorBatch) void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) { - _statistics->fs_read_calls++; - _statistics->fs_read_bytes += length; - SCOPED_RAW_TIMER(&_statistics->fs_read_time); uint64_t has_read = 0; char* out = reinterpret_cast(buf); while (has_read < length) { @@ -124,7 +121,7 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) { } size_t loop_read; Slice result(out + has_read, length - has_read); - Status st = _file_reader->read_at(offset + has_read, result, &loop_read, _io_ctx); + Status st = _tracing_file_reader->read_at(offset + has_read, result, &loop_read, _io_ctx); if (!st.ok()) { throw orc::ParseError( strings::Substitute("Failed to read $0: $1", _file_name, st.to_string())); @@ -141,9 +138,6 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) { } void StripeStreamInputStream::read(void* buf, uint64_t length, uint64_t offset) { - _statistics->fs_read_calls++; - _statistics->fs_read_bytes += length; - SCOPED_RAW_TIMER(&_statistics->fs_read_time); uint64_t has_read = 0; char* out = reinterpret_cast(buf); while (has_read < length) { @@ -217,9 +211,6 @@ OrcReader::~OrcReader() { void OrcReader::_collect_profile_before_close() { if (_profile != nullptr) { - COUNTER_UPDATE(_orc_profile.read_time, _statistics.fs_read_time); - COUNTER_UPDATE(_orc_profile.read_calls, _statistics.fs_read_calls); - COUNTER_UPDATE(_orc_profile.read_bytes, _statistics.fs_read_bytes); COUNTER_UPDATE(_orc_profile.column_read_time, _statistics.column_read_time); COUNTER_UPDATE(_orc_profile.get_batch_time, _statistics.get_batch_time); COUNTER_UPDATE(_orc_profile.create_reader_time, _statistics.create_reader_time); @@ -245,10 +236,6 @@ void OrcReader::_init_profile() { if (_profile != nullptr) { static const char* orc_profile = "OrcReader"; ADD_TIMER_WITH_LEVEL(_profile, orc_profile, 1); - _orc_profile.read_time = ADD_TIMER_WITH_LEVEL(_profile, "FileReadTime", 1); - _orc_profile.read_calls = ADD_COUNTER_WITH_LEVEL(_profile, "FileReadCalls", TUnit::UNIT, 1); - _orc_profile.read_bytes = - ADD_COUNTER_WITH_LEVEL(_profile, "FileReadBytes", TUnit::BYTES, 1); _orc_profile.column_read_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ColumnReadTime", orc_profile, 1); _orc_profile.get_batch_time = @@ -290,7 +277,7 @@ Status OrcReader::_create_file_reader() { _profile, _system_properties, _file_description, reader_options, io::DelegateReader::AccessMode::RANDOM, _io_ctx)); _file_input_stream = std::make_unique( - _scan_range.path, std::move(inner_reader), &_statistics, _io_ctx, _profile, + _scan_range.path, std::move(inner_reader), _io_ctx, _profile, _orc_once_max_read_bytes, _orc_max_merge_distance_bytes); } if (_file_input_stream->getLength() == 0) { @@ -1840,6 +1827,9 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { _reader_metrics.SelectedRowGroupCount); COUNTER_UPDATE(_orc_profile.evaluated_row_group_count, _reader_metrics.EvaluatedRowGroupCount); + if (_io_ctx) { + _io_ctx->file_reader_stats->read_rows += _reader_metrics.ReadRowCount; + } } if (_orc_filter) { RETURN_IF_ERROR(_orc_filter->get_status()); @@ -2757,6 +2747,14 @@ void ORCFileInputStream::_build_small_ranges_input_stripe_streams( auto merge_range_file_reader = std::make_shared(_profile, _file_reader, merged_range); + std::shared_ptr tracing_file_reader; + if (_io_ctx) { + tracing_file_reader = std::make_shared( + std::move(merge_range_file_reader), _io_ctx->file_reader_stats); + } else { + tracing_file_reader = std::move(merge_range_file_reader); + } + // Use binary search to find the starting point in sorted_ranges auto it = std::lower_bound(sorted_ranges.begin(), sorted_ranges.end(), @@ -2769,7 +2767,7 @@ void ORCFileInputStream::_build_small_ranges_input_stripe_streams( ++it) { if (it->second.end_offset <= merged_range.end_offset) { auto stripe_stream_input_stream = std::make_shared( - getName(), merge_range_file_reader, _statistics, _io_ctx, _profile); + getName(), tracing_file_reader, _io_ctx, _profile); streams.emplace(it->first, stripe_stream_input_stream); _stripe_streams.emplace_back(stripe_stream_input_stream); } @@ -2782,10 +2780,12 @@ void ORCFileInputStream::_build_large_ranges_input_stripe_streams( std::unordered_map>& streams) { for (const auto& range : ranges) { auto stripe_stream_input_stream = std::make_shared( - getName(), _file_reader, _statistics, _io_ctx, _profile); - streams.emplace(range.first, - std::make_shared(getName(), _file_reader, - _statistics, _io_ctx, _profile)); + getName(), + _io_ctx ? std::make_shared(_file_reader, + _io_ctx->file_reader_stats) + : _file_reader, + _io_ctx, _profile); + streams.emplace(range.first, stripe_stream_input_stream); _stripe_streams.emplace_back(stripe_stream_input_stream); } } diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index cc33034b2cf1fe..febb72028572f2 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -38,6 +38,7 @@ #include "io/fs/buffered_reader.h" #include "io/fs/file_reader.h" #include "io/fs/file_reader_writer_fwd.h" +#include "io/fs/tracing_file_reader.h" #include "olap/olap_common.h" #include "orc/Reader.hh" #include "orc/Type.hh" @@ -121,9 +122,6 @@ class OrcReader : public GenericReader { } struct Statistics { - int64_t fs_read_time = 0; - int64_t fs_read_calls = 0; - int64_t fs_read_bytes = 0; int64_t column_read_time = 0; int64_t get_batch_time = 0; int64_t create_reader_time = 0; @@ -220,6 +218,8 @@ class OrcReader : public GenericReader { } static const orc::Type& remove_acid(const orc::Type& type); + bool count_read_rows() override { return true; } + protected: void _collect_profile_before_close() override; @@ -686,11 +686,9 @@ class OrcReader : public GenericReader { class StripeStreamInputStream : public orc::InputStream, public ProfileCollector { public: StripeStreamInputStream(const std::string& file_name, io::FileReaderSPtr inner_reader, - OrcReader::Statistics* statistics, const io::IOContext* io_ctx, - RuntimeProfile* profile) + const io::IOContext* io_ctx, RuntimeProfile* profile) : _file_name(file_name), _inner_reader(inner_reader), - _statistics(statistics), _io_ctx(io_ctx), _profile(profile) {} @@ -727,7 +725,6 @@ class StripeStreamInputStream : public orc::InputStream, public ProfileCollector const std::string& _file_name; io::FileReaderSPtr _inner_reader; // Owned by OrcReader - OrcReader::Statistics* _statistics = nullptr; const io::IOContext* _io_ctx = nullptr; RuntimeProfile* _profile = nullptr; }; @@ -735,21 +732,22 @@ class StripeStreamInputStream : public orc::InputStream, public ProfileCollector class ORCFileInputStream : public orc::InputStream, public ProfileCollector { public: ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr inner_reader, - OrcReader::Statistics* statistics, const io::IOContext* io_ctx, - RuntimeProfile* profile, int64_t orc_once_max_read_bytes, - int64_t orc_max_merge_distance_bytes) + const io::IOContext* io_ctx, RuntimeProfile* profile, + int64_t orc_once_max_read_bytes, int64_t orc_max_merge_distance_bytes) : _file_name(file_name), _inner_reader(inner_reader), _file_reader(inner_reader), + _tracing_file_reader(io_ctx ? std::make_shared( + _file_reader, io_ctx->file_reader_stats) + : _file_reader), _orc_once_max_read_bytes(orc_once_max_read_bytes), _orc_max_merge_distance_bytes(orc_max_merge_distance_bytes), - _statistics(statistics), _io_ctx(io_ctx), _profile(profile) {} ~ORCFileInputStream() override { - if (_file_reader != nullptr) { - _file_reader->collect_profile_before_close(); + if (_tracing_file_reader != nullptr) { + _tracing_file_reader->collect_profile_before_close(); } for (const auto& stripe_stream : _stripe_streams) { if (stripe_stream != nullptr) { @@ -759,7 +757,7 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector { _stripe_streams.clear(); } - uint64_t getLength() const override { return _file_reader->size(); } + uint64_t getLength() const override { return _tracing_file_reader->size(); } uint64_t getNaturalReadSize() const override { return config::orc_natural_read_size_mb << 20; } @@ -778,6 +776,8 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector { io::FileReaderSPtr& get_inner_reader() { return _inner_reader; } + io::FileReaderSPtr& get_tracing_file_reader() { return _tracing_file_reader; } + protected: void _collect_profile_at_runtime() override {}; void _collect_profile_before_close() override; @@ -796,8 +796,16 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector { std::unordered_map>& streams); const std::string& _file_name; + + // _inner_reader is original file reader. + // _file_reader == RangeCacheFileReader used by tiny stripe case, if not tiny stripe case, + // _file_reader == _inner_reader. + // _tracing_file_reader is tracing file reader with io context. + // If io_ctx is null, _tracing_file_reader will be the same as _file_reader. io::FileReaderSPtr _inner_reader; io::FileReaderSPtr _file_reader; + io::FileReaderSPtr _tracing_file_reader; + bool _is_all_tiny_stripes = false; int64_t _orc_once_max_read_bytes; int64_t _orc_max_merge_distance_bytes; @@ -805,7 +813,6 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector { std::vector> _stripe_streams; // Owned by OrcReader - OrcReader::Statistics* _statistics = nullptr; const io::IOContext* _io_ctx = nullptr; RuntimeProfile* _profile = nullptr; }; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 316b19701c747f..331e96962509c4 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -32,6 +32,7 @@ #include "io/fs/buffered_reader.h" #include "io/fs/file_reader.h" #include "io/fs/file_reader_writer_fwd.h" +#include "io/fs/tracing_file_reader.h" #include "parquet_pred_cmp.h" #include "parquet_thrift_util.h" #include "runtime/define_primitive_type.h" @@ -118,6 +119,12 @@ ParquetReader::~ParquetReader() { _close_internal(); } +// for unit test +void ParquetReader::set_file_reader(io::FileReaderSPtr file_reader) { + _file_reader = file_reader; + _tracing_file_reader = file_reader; +} + void ParquetReader::_init_profile() { if (_profile != nullptr) { static const char* parquet_profile = "ParquetReader"; @@ -157,14 +164,8 @@ void ParquetReader::_init_profile() { ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageIndexParseTime", parquet_profile, 1); _parquet_profile.row_group_filter_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "RowGroupFilterTime", parquet_profile, 1); - - _parquet_profile.file_read_time = ADD_TIMER_WITH_LEVEL(_profile, "FileReadTime", 1); - _parquet_profile.file_read_calls = - ADD_COUNTER_WITH_LEVEL(_profile, "FileReadCalls", TUnit::UNIT, 1); _parquet_profile.file_meta_read_calls = ADD_COUNTER_WITH_LEVEL(_profile, "FileMetaReadCalls", TUnit::UNIT, 1); - _parquet_profile.file_read_bytes = - ADD_COUNTER_WITH_LEVEL(_profile, "FileReadBytes", TUnit::BYTES, 1); _parquet_profile.decompress_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecompressTime", parquet_profile, 1); _parquet_profile.decompress_cnt = ADD_CHILD_COUNTER_WITH_LEVEL( @@ -215,18 +216,22 @@ Status ParquetReader::_open_file() { _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( _profile, _system_properties, _file_description, reader_options, io::DelegateReader::AccessMode::RANDOM, _io_ctx)); + _tracing_file_reader = _io_ctx ? std::make_shared( + _file_reader, _io_ctx->file_reader_stats) + : _file_reader; } if (_file_metadata == nullptr) { SCOPED_RAW_TIMER(&_statistics.parse_footer_time); - if (_file_reader->size() <= sizeof(PARQUET_VERSION_NUMBER)) { + if (_tracing_file_reader->size() <= sizeof(PARQUET_VERSION_NUMBER)) { // Some system may generate parquet file with only 4 bytes: PAR1 // Should consider it as empty file. return Status::EndOfFile("open file failed, empty parquet file {} with size: {}", - _scan_range.path, _file_reader->size()); + _scan_range.path, _tracing_file_reader->size()); } size_t meta_size = 0; if (_meta_cache == nullptr) { - auto st = parse_thrift_footer(_file_reader, &_file_metadata, &meta_size, _io_ctx); + auto st = + parse_thrift_footer(_tracing_file_reader, &_file_metadata, &meta_size, _io_ctx); // wrap it with unique ptr, so that it can be released finally. _file_metadata_ptr.reset(_file_metadata); RETURN_IF_ERROR(st); @@ -235,7 +240,7 @@ Status ParquetReader::_open_file() { // parse magic number & parse meta data _column_statistics.meta_read_calls += 1; } else { - RETURN_IF_ERROR(_meta_cache->get_parquet_footer(_file_reader, _io_ctx, + RETURN_IF_ERROR(_meta_cache->get_parquet_footer(_tracing_file_reader, _io_ctx, _file_description.mtime, &meta_size, &_meta_cache_handle)); _column_statistics.read_bytes += meta_size; @@ -608,9 +613,12 @@ Status ParquetReader::_next_row_group_reader() { _profile, _file_reader, io_ranges) : _file_reader; } - _current_group_reader.reset(new RowGroupReader( - group_file_reader, _read_table_columns, row_group_index.row_group_id, row_group, _ctz, - _io_ctx, position_delete_ctx, _lazy_read_ctx, _state)); + _current_group_reader.reset( + new RowGroupReader(_io_ctx ? std::make_shared( + group_file_reader, _io_ctx->file_reader_stats) + : group_file_reader, + _read_table_columns, row_group_index.row_group_id, row_group, _ctz, + _io_ctx, position_delete_ctx, _lazy_read_ctx, _state)); _row_group_eof = false; _current_group_reader->_table_info_node_ptr = _table_info_node_ptr; @@ -748,6 +756,9 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, std::function read_whole_row_group = [&]() { candidate_row_ranges.emplace_back(0, row_group.num_rows); _statistics.read_rows += row_group.num_rows; + if (_io_ctx) { + _io_ctx->file_reader_stats->read_rows += row_group.num_rows; + } }; if ((!_enable_filter_by_min_max) || _lazy_read_ctx.has_complex_type || @@ -766,8 +777,8 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, Slice result(col_index_buff.data(), page_index._column_index_size); { SCOPED_RAW_TIMER(&_statistics.read_page_index_time); - RETURN_IF_ERROR(_file_reader->read_at(page_index._column_index_start, result, &bytes_read, - _io_ctx)); + RETURN_IF_ERROR(_tracing_file_reader->read_at(page_index._column_index_start, result, + &bytes_read, _io_ctx)); } _column_statistics.read_bytes += bytes_read; auto& schema_desc = _file_metadata->schema(); @@ -776,8 +787,8 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, Slice res(off_index_buff.data(), page_index._offset_index_size); { SCOPED_RAW_TIMER(&_statistics.read_page_index_time); - RETURN_IF_ERROR( - _file_reader->read_at(page_index._offset_index_start, res, &bytes_read, _io_ctx)); + RETURN_IF_ERROR(_tracing_file_reader->read_at(page_index._offset_index_start, res, + &bytes_read, _io_ctx)); } _column_statistics.read_bytes += bytes_read; // read twice: parse column index & parse offset index @@ -856,6 +867,9 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, read_rows += row_group.num_rows - skip_end; } _statistics.read_rows += read_rows; + if (_io_ctx) { + _io_ctx->file_reader_stats->read_rows += read_rows; + } _statistics.filtered_page_rows += row_group.num_rows - read_rows; return Status::OK(); } @@ -1006,10 +1020,7 @@ void ParquetReader::_collect_profile() { _column_statistics.parse_page_header_num); COUNTER_UPDATE(_parquet_profile.predicate_filter_time, _statistics.predicate_filter_time); COUNTER_UPDATE(_parquet_profile.dict_filter_rewrite_time, _statistics.dict_filter_rewrite_time); - COUNTER_UPDATE(_parquet_profile.file_read_time, _column_statistics.read_time); - COUNTER_UPDATE(_parquet_profile.file_read_calls, _column_statistics.read_calls); COUNTER_UPDATE(_parquet_profile.file_meta_read_calls, _column_statistics.meta_read_calls); - COUNTER_UPDATE(_parquet_profile.file_read_bytes, _column_statistics.read_bytes); COUNTER_UPDATE(_parquet_profile.decompress_time, _column_statistics.decompress_time); COUNTER_UPDATE(_parquet_profile.decompress_cnt, _column_statistics.decompress_cnt); COUNTER_UPDATE(_parquet_profile.decode_header_time, _column_statistics.decode_header_time); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 814964833db38a..c560b1c3800ed5 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -105,8 +105,8 @@ class ParquetReader : public GenericReader { io::IOContext* io_ctx, RuntimeState* state, bool enable_lazy_mat = true); ~ParquetReader() override; - // for test - void set_file_reader(io::FileReaderSPtr file_reader) { _file_reader = file_reader; } + // for unit test + void set_file_reader(io::FileReaderSPtr file_reader); Status init_reader( const std::vector& all_column_names, @@ -153,6 +153,8 @@ class ParquetReader : public GenericReader { Status get_file_metadata_schema(const FieldDescriptor** ptr); + bool count_read_rows() override { return true; } + protected: void _collect_profile_before_close() override; @@ -176,10 +178,7 @@ class ParquetReader : public GenericReader { RuntimeProfile::Counter* read_page_index_time = nullptr; RuntimeProfile::Counter* parse_page_index_time = nullptr; - RuntimeProfile::Counter* file_read_time = nullptr; - RuntimeProfile::Counter* file_read_calls = nullptr; RuntimeProfile::Counter* file_meta_read_calls = nullptr; - RuntimeProfile::Counter* file_read_bytes = nullptr; RuntimeProfile::Counter* decompress_time = nullptr; RuntimeProfile::Counter* decompress_cnt = nullptr; RuntimeProfile::Counter* decode_header_time = nullptr; @@ -244,7 +243,12 @@ class ParquetReader : public GenericReader { FileMetaData* _file_metadata = nullptr; const tparquet::FileMetaData* _t_metadata = nullptr; + // _tracing_file_reader wraps _file_reader. + // _file_reader is original file reader. + // _tracing_file_reader is tracing file reader with io context. + // If io_ctx is null, _tracing_file_reader will be the same as file_reader. io::FileReaderSPtr _file_reader = nullptr; + io::FileReaderSPtr _tracing_file_reader = nullptr; std::unique_ptr _current_group_reader; // read to the end of current reader bool _row_group_eof = true; diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h index fbba6a51033530..1f79de54ed4002 100644 --- a/be/src/vec/exec/format/table/table_format_reader.h +++ b/be/src/vec/exec/format/table/table_format_reader.h @@ -104,6 +104,8 @@ class TableFormatReader : public GenericReader { virtual Status init_row_filters() = 0; + bool count_read_rows() override { return _file_format_reader->count_read_rows(); } + protected: std::string _table_format; // hudi, iceberg, paimon std::unique_ptr _file_format_reader; // parquet, orc diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index be3b5d3d61bac1..e65a761a1827d7 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -556,9 +556,18 @@ void NewOlapScanner::update_realtime_counters() { // In case of no cache, we still need to update the IO stats. uncompressed bytes read == local + remote if (stats.file_cache_stats.bytes_read_from_local == 0 && stats.file_cache_stats.bytes_read_from_remote == 0) { + if (_query_statistics) { + _query_statistics->add_scan_bytes_from_local_storage(stats.compressed_bytes_read); + } DorisMetrics::instance()->query_scan_bytes_from_local->increment( stats.compressed_bytes_read); } else { + if (_query_statistics) { + _query_statistics->add_scan_bytes_from_local_storage( + stats.file_cache_stats.bytes_read_from_local); + _query_statistics->add_scan_bytes_from_remote_storage( + stats.file_cache_stats.bytes_read_from_remote); + } DorisMetrics::instance()->query_scan_bytes_from_local->increment( stats.file_cache_stats.bytes_read_from_local); DorisMetrics::instance()->query_scan_bytes_from_remote->increment( diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 0cba6b8a0c28be..b744ae0d174c29 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -38,6 +38,7 @@ #include "common/logging.h" #include "common/status.h" #include "io/cache/block_file_cache_profile.h" +#include "io/fs/tracing_file_reader.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "runtime/types.h" @@ -146,14 +147,24 @@ Status VFileScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju "NotFoundFileNum", TUnit::UNIT, 1); _file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1); + + _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), + "FileReadBytes", TUnit::BYTES, 1); + _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), + "FileReadCalls", TUnit::UNIT, 1); + _file_read_time_counter = + ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileReadTime", 1); + _runtime_filter_partition_pruned_range_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1); _file_cache_statistics.reset(new io::FileCacheStatistics()); + _file_reader_stats.reset(new io::FileReaderStats()); + _io_ctx.reset(new io::IOContext()); _io_ctx->file_cache_stats = _file_cache_statistics.get(); - _io_ctx->query_id = &_state->query_id(); + _io_ctx->file_reader_stats = _file_reader_stats.get(); if (_is_load) { _src_row_desc.reset(new RowDescriptor(_state->desc_tbl(), @@ -435,6 +446,9 @@ Status VFileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* // use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr // may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result. if (read_rows > 0) { + if ((!_cur_reader->count_read_rows()) && _io_ctx) { + _io_ctx->file_reader_stats->read_rows += read_rows; + } // If the push_down_agg_type is COUNT, no need to do the rest, // because we only save a number in block. if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) { @@ -1478,6 +1492,49 @@ void VFileScanner::try_stop() { } } +void VFileScanner::update_realtime_counters() { + pipeline::FileScanLocalState* local_state = + static_cast(_local_state); + + COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes); + COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows); + + if (_query_statistics) { + _query_statistics->add_scan_rows(_file_reader_stats->read_rows); + _query_statistics->add_scan_bytes(_file_reader_stats->read_bytes); + } + + if (_file_cache_statistics->bytes_read_from_local == 0 && + _file_cache_statistics->bytes_read_from_remote == 0) { + if (_query_statistics) { + _query_statistics->add_scan_bytes_from_remote_storage(_file_reader_stats->read_bytes); + } + DorisMetrics::instance()->query_scan_bytes_from_local->increment( + _file_reader_stats->read_bytes); + } else { + if (_query_statistics) { + _query_statistics->add_scan_bytes_from_local_storage( + _file_cache_statistics->bytes_read_from_local); + _query_statistics->add_scan_bytes_from_remote_storage( + _file_cache_statistics->bytes_read_from_remote); + } + DorisMetrics::instance()->query_scan_bytes_from_local->increment( + _file_cache_statistics->bytes_read_from_local); + DorisMetrics::instance()->query_scan_bytes_from_remote->increment( + _file_cache_statistics->bytes_read_from_remote); + } + + COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes); + + DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes); + DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows); + + _file_reader_stats->read_bytes = 0; + _file_reader_stats->read_rows = 0; + _file_cache_statistics->bytes_read_from_local = 0; + _file_cache_statistics->bytes_read_from_remote = 0; +} + void VFileScanner::_collect_profile_before_close() { VScanner::_collect_profile_before_close(); if (config::enable_file_cache && _state->query_options().enable_file_cache && @@ -1489,6 +1546,18 @@ void VFileScanner::_collect_profile_before_close() { if (_cur_reader != nullptr) { _cur_reader->collect_profile_before_close(); } + + pipeline::FileScanLocalState* local_state = + static_cast(_local_state); + COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes); + COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows); + + COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes); + COUNTER_UPDATE(_file_read_calls_counter, _file_reader_stats->read_calls); + COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns); + + DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes); + DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index d2e6a220a15826..705f01962dcda3 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -81,6 +81,8 @@ class VFileScanner : public VScanner { std::string get_current_scan_range_name() override { return _current_range_path; } + void update_realtime_counters() override; + protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; @@ -165,6 +167,7 @@ class VFileScanner : public VScanner { Block _runtime_filter_partition_prune_block; std::unique_ptr _file_cache_statistics; + std::unique_ptr _file_reader_stats; std::unique_ptr _io_ctx; std::unordered_map> @@ -186,6 +189,9 @@ class VFileScanner : public VScanner { RuntimeProfile::Counter* _empty_file_counter = nullptr; RuntimeProfile::Counter* _not_found_file_counter = nullptr; RuntimeProfile::Counter* _file_counter = nullptr; + RuntimeProfile::Counter* _file_read_bytes_counter = nullptr; + RuntimeProfile::Counter* _file_read_calls_counter = nullptr; + RuntimeProfile::Counter* _file_read_time_counter = nullptr; RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr; const std::unordered_map* _col_name_to_slot_id = nullptr;