diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index cae08d284179c4..6277416055550c 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -819,12 +819,10 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
     int64_t buf_remaining = _buf_end_offset - _buf_start_offset;
     int64_t to_read = std::min(_buf_size - buf_remaining, _file_end_offset - _buf_end_offset);
     int64_t has_read = 0;
-    SCOPED_RAW_TIMER(&_statistics.read_time);
     while (has_read < to_read) {
         size_t loop_read = 0;
         Slice result(_buf.get() + buf_remaining + has_read, to_read - has_read);
         RETURN_IF_ERROR(_file->read_at(_buf_end_offset + has_read, result, &loop_read, io_ctx));
-        _statistics.read_calls++;
         if (loop_read == 0) {
             break;
         }
@@ -833,7 +831,6 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
     if (has_read != to_read) {
         return Status::Corruption("Try to read {} bytes, but received {} bytes", to_read, has_read);
     }
-    _statistics.read_bytes += to_read;
     _buf_end_offset += to_read;
     *buf = _buf.get();
     return Status::OK();
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 6bcf634aef35ea..5fe071762351d8 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -225,7 +225,6 @@ class MergeRangeFileReader : public io::FileReader {
         int64_t merged_io = 0;
         int64_t request_bytes = 0;
         int64_t merged_bytes = 0;
-        int64_t apply_bytes = 0;
     };

     struct RangeCachedData {
@@ -299,9 +298,6 @@ class MergeRangeFileReader : public io::FileReader {
             _merged_read_slice_size = READ_SLICE_SIZE;
         }

-        for (const PrefetchRange& range : _random_access_ranges) {
-            _statistics.apply_bytes += range.end_offset - range.start_offset;
-        }
         if (_profile != nullptr) {
             const char* random_profile = "MergedSmallIO";
             ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
@@ -315,8 +311,6 @@ class MergeRangeFileReader : public io::FileReader {
                                                           random_profile, 1);
             _merged_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedBytes", TUnit::BYTES,
                                                          random_profile, 1);
-            _apply_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "ApplyBytes", TUnit::BYTES,
-                                                        random_profile, 1);
         }
     }
@@ -359,7 +353,6 @@ class MergeRangeFileReader : public io::FileReader {
         COUNTER_UPDATE(_merged_io, _statistics.merged_io);
         COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
         COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
-        COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
         if (_reader != nullptr) {
             _reader->collect_profile_before_close();
         }
@@ -373,7 +366,6 @@ class MergeRangeFileReader : public io::FileReader {
     RuntimeProfile::Counter* _merged_io = nullptr;
     RuntimeProfile::Counter* _request_bytes = nullptr;
    RuntimeProfile::Counter* _merged_bytes = nullptr;
-    RuntimeProfile::Counter* _apply_bytes = nullptr;

     int _search_read_range(size_t start_offset, size_t end_offset);
     void _clean_cached_data(RangeCachedData& cached_data);
@@ -619,12 +611,6 @@ class InMemoryFileReader final : public io::FileReader {
  */
 class BufferedStreamReader {
 public:
-    struct Statistics {
-        int64_t read_time = 0;
-        int64_t read_calls = 0;
-        int64_t read_bytes = 0;
-    };
-
     /**
      * Return the address of underlying buffer that locates the start of data between [offset, offset + bytes_to_read)
      * @param buf the buffer address to save the start address of data
@@ -637,13 +623,9 @@ class BufferedStreamReader {
      * Save the data address to slice.data, and the slice.size is the bytes to read.
      */
     virtual Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) = 0;
-    Statistics& statistics() { return _statistics; }
     virtual ~BufferedStreamReader() = default;
     // return the file path
     virtual std::string path() = 0;
-
-protected:
-    Statistics _statistics;
 };

 class BufferedFileStreamReader : public BufferedStreamReader, public ProfileCollector {
diff --git a/be/src/io/fs/tracing_file_reader.h b/be/src/io/fs/tracing_file_reader.h
index 84eb3dfc8fb569..39b70dfbb63bef 100644
--- a/be/src/io/fs/tracing_file_reader.h
+++ b/be/src/io/fs/tracing_file_reader.h
@@ -48,6 +48,7 @@ class TracingFileReader : public FileReader {
     void _collect_profile_before_close() override { return _inner->collect_profile_before_close(); }

     FileReaderStats* stats() const { return _stats; }
+    doris::io::FileReaderSPtr inner_reader() { return _inner; }

 private:
     doris::io::FileReaderSPtr _inner;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 74398e6da4aa5b..621d8a2c505021 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -189,8 +189,8 @@ Status ColumnChunkReader::load_page_data() {
         // check decompressed buffer size
         _reserve_decompress_buf(uncompressed_size);
         _page_data = Slice(_decompress_buf.get(), uncompressed_size);
-        SCOPED_RAW_TIMER(&_statistics.decompress_time);
-        _statistics.decompress_cnt++;
+        SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time);
+        _chunk_statistics.decompress_cnt++;
         RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data));
     } else {
         // Don't need decompress
@@ -205,7 +205,7 @@ Status ColumnChunkReader::load_page_data() {
     // Initialize repetition level and definition level. Skip when level = 0, which means required field.
     if (_max_rep_level > 0) {
-        SCOPED_RAW_TIMER(&_statistics.decode_level_time);
+        SCOPED_RAW_TIMER(&_chunk_statistics.decode_level_time);
         if (header->__isset.data_page_header_v2) {
             RETURN_IF_ERROR(_rep_level_decoder.init_v2(_v2_rep_levels, _max_rep_level,
                                                        _remaining_rep_nums));
@@ -216,7 +216,7 @@ Status ColumnChunkReader::load_page_data() {
         }
     }
     if (_max_def_level > 0) {
-        SCOPED_RAW_TIMER(&_statistics.decode_level_time);
+        SCOPED_RAW_TIMER(&_chunk_statistics.decode_level_time);
         if (header->__isset.data_page_header_v2) {
             RETURN_IF_ERROR(_def_level_decoder.init_v2(_v2_def_levels, _max_def_level,
                                                        _remaining_def_nums));
@@ -256,7 +256,7 @@ Status ColumnChunkReader::_decode_dict_page() {
     const tparquet::PageHeader* header = nullptr;
     RETURN_IF_ERROR(_page_reader->get_page_header(header));
     DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header->type);
-    SCOPED_RAW_TIMER(&_statistics.decode_dict_time);
+    SCOPED_RAW_TIMER(&_chunk_statistics.decode_dict_time);

     // Using the PLAIN_DICTIONARY enum value is deprecated in the Parquet 2.0 specification.
     // Prefer using RLE_DICTIONARY in a data page and PLAIN in a dictionary page for Parquet 2.0+ files.
@@ -315,7 +315,7 @@ Status ColumnChunkReader::skip_values(size_t num_va
     }
     _remaining_num_values -= num_values;
     if (skip_data) {
-        SCOPED_RAW_TIMER(&_statistics.decode_value_time);
+        SCOPED_RAW_TIMER(&_chunk_statistics.decode_value_time);
         return _page_decoder->skip_values(num_values);
     } else {
         return Status::OK();
@@ -329,7 +329,7 @@ Status ColumnChunkReader::decode_values(
     if (select_vector.num_values() == 0) {
         return Status::OK();
     }
-    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
+    SCOPED_RAW_TIMER(&_chunk_statistics.decode_value_time);
     if (UNLIKELY((doris_column->is_column_dictionary() || is_dict_filter) && !_has_dict)) {
         return Status::IOError("Not dictionary coded");
     }
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 4022eac702b1bd..1270e5e37fcd1e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -60,6 +60,7 @@ struct ColumnChunkReaderStatistics {
     int64_t decode_level_time = 0;
     int64_t skip_page_header_num = 0;
     int64_t parse_page_header_num = 0;
+    int64_t read_page_header_time = 0;
 };

 /**
@@ -146,11 +147,15 @@ class ColumnChunkReader {
     // Get page decoder
     Decoder* get_page_decoder() { return _page_decoder; }

-    ColumnChunkReaderStatistics& statistics() {
-        _statistics.decode_header_time = _page_reader->statistics().decode_header_time;
-        _statistics.skip_page_header_num = _page_reader->statistics().skip_page_header_num;
-        _statistics.parse_page_header_num = _page_reader->statistics().parse_page_header_num;
-        return _statistics;
+    ColumnChunkReaderStatistics& chunk_statistics() {
+        _chunk_statistics.decode_header_time = _page_reader->page_statistics().decode_header_time;
+        _chunk_statistics.skip_page_header_num =
+                _page_reader->page_statistics().skip_page_header_num;
+        _chunk_statistics.parse_page_header_num =
+                _page_reader->page_statistics().parse_page_header_num;
+        _chunk_statistics.read_page_header_time =
+                _page_reader->page_statistics().read_page_header_time;
+        return _chunk_statistics;
     }

     Status read_dict_values_to_column(MutableColumnPtr& doris_column) {
@@ -238,7 +243,7 @@ class ColumnChunkReader {
     // Map: encoding -> Decoder
     // Plain or Dictionary encoding. If the dictionary grows too big, the encoding will fall back to the plain encoding
     std::unordered_map<tparquet::Encoding::type, std::unique_ptr<Decoder>> _decoders;
-    ColumnChunkReaderStatistics _statistics;
+    ColumnChunkReaderStatistics _chunk_statistics;
 };

 #include "common/compile_check_end.h"
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 07b07255359201..e24ec85b2e1811 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -25,6 +25,7 @@
 #include
 #include

+#include "io/fs/tracing_file_reader.h"
 #include "runtime/define_primitive_type.h"
 #include "schema_desc.h"
 #include "util/runtime_profile.h"
@@ -248,7 +249,10 @@ Status ScalarColumnReader::init(io::FileReaderSPtr
                                       : chunk_meta.data_page_offset;
     size_t chunk_len = chunk_meta.total_compressed_size;
     size_t prefetch_buffer_size = std::min(chunk_len, max_buf_size);
-    if (typeid_cast<io::MergeRangeFileReader*>(file.get())) {
+    if ((typeid_cast<io::TracingFileReader*>(file.get()) &&
+         typeid_cast<io::MergeRangeFileReader*>(
+                 ((doris::io::TracingFileReader*)(file.get()))->inner_reader().get())) ||
+        typeid_cast<io::MergeRangeFileReader*>(file.get())) {
         // turn off prefetch data when using MergeRangeFileReader
         prefetch_buffer_size = 0;
     }
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 0e11f6f55fa139..62ae4eb5fbeb96 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -53,12 +53,9 @@ using ColumnString = ColumnStr;

 class ParquetColumnReader {
 public:
-    struct Statistics {
-        Statistics()
-                : read_time(0),
-                  read_calls(0),
-                  page_index_read_calls(0),
-                  read_bytes(0),
+    struct ColumnStatistics {
+        ColumnStatistics()
+                : page_index_read_calls(0),
                   decompress_time(0),
                   decompress_cnt(0),
                   decode_header_time(0),
@@ -67,14 +64,11 @@ class ParquetColumnReader {
                   decode_level_time(0),
                   decode_null_map_time(0),
                   skip_page_header_num(0),
-                  parse_page_header_num(0) {}
-
-        Statistics(io::BufferedStreamReader::Statistics& fs, ColumnChunkReaderStatistics& cs,
-                   int64_t null_map_time)
-                : read_time(fs.read_time),
-                  read_calls(fs.read_calls),
-                  page_index_read_calls(0),
-                  read_bytes(fs.read_bytes),
+                  parse_page_header_num(0),
+                  read_page_header_time(0) {}
+
+        ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t null_map_time)
+                : page_index_read_calls(0),
                   decompress_time(cs.decompress_time),
                   decompress_cnt(cs.decompress_cnt),
                   decode_header_time(cs.decode_header_time),
@@ -83,12 +77,10 @@ class ParquetColumnReader {
                   decode_level_time(cs.decode_level_time),
                   decode_null_map_time(null_map_time),
                   skip_page_header_num(cs.skip_page_header_num),
-                  parse_page_header_num(cs.parse_page_header_num) {}
+                  parse_page_header_num(cs.parse_page_header_num),
+                  read_page_header_time(cs.read_page_header_time) {}

-        int64_t read_time;
-        int64_t read_calls;
         int64_t page_index_read_calls;
-        int64_t read_bytes;
         int64_t decompress_time;
         int64_t decompress_cnt;
         int64_t decode_header_time;
@@ -98,21 +90,20 @@ class ParquetColumnReader {
         int64_t decode_null_map_time;
         int64_t skip_page_header_num;
         int64_t parse_page_header_num;
-
-        void merge(Statistics& statistics) {
-            read_time += statistics.read_time;
-            read_calls += statistics.read_calls;
-            read_bytes += statistics.read_bytes;
-            page_index_read_calls += statistics.page_index_read_calls;
-            decompress_time += statistics.decompress_time;
-            decompress_cnt += statistics.decompress_cnt;
-            decode_header_time += statistics.decode_header_time;
-            decode_value_time += statistics.decode_value_time;
-            decode_dict_time += statistics.decode_dict_time;
-            decode_level_time += statistics.decode_level_time;
-            decode_null_map_time += statistics.decode_null_map_time;
-            skip_page_header_num += statistics.skip_page_header_num;
-            parse_page_header_num += statistics.parse_page_header_num;
+        int64_t read_page_header_time;
+
+        void merge(ColumnStatistics& col_statistics) {
+            page_index_read_calls += col_statistics.page_index_read_calls;
+            decompress_time += col_statistics.decompress_time;
+            decompress_cnt += col_statistics.decompress_cnt;
+            decode_header_time += col_statistics.decode_header_time;
+            decode_value_time += col_statistics.decode_value_time;
+            decode_dict_time += col_statistics.decode_dict_time;
+            decode_level_time += col_statistics.decode_level_time;
+            decode_null_map_time += col_statistics.decode_null_map_time;
+            skip_page_header_num += col_statistics.skip_page_header_num;
+            parse_page_header_num += col_statistics.parse_page_header_num;
+            read_page_header_time += col_statistics.read_page_header_time;
         }
     };

@@ -145,7 +136,7 @@ class ParquetColumnReader {
                                 const std::set& filter_column_ids = {});
     virtual const std::vector& get_rep_level() const = 0;
     virtual const std::vector& get_def_level() const = 0;
-    virtual Statistics statistics() = 0;
+    virtual ColumnStatistics column_statistics() = 0;
     virtual void close() = 0;

     virtual void reset_filter_map_index() = 0;
@@ -188,9 +179,8 @@ class ScalarColumnReader : public ParquetColumnReader {
     MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override;
     const std::vector& get_rep_level() const override { return _rep_levels; }
     const std::vector& get_def_level() const override { return _def_levels; }
-    Statistics statistics() override {
-        return Statistics(_stream_reader->statistics(), _chunk_reader->statistics(),
-                          _decode_null_map_time);
+    ColumnStatistics column_statistics() override {
+        return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time);
     }

     void close() override {}
@@ -304,7 +294,7 @@ class ArrayColumnReader : public ParquetColumnReader {
     const std::vector& get_def_level() const override {
         return _element_reader->get_def_level();
     }
-    Statistics statistics() override { return _element_reader->statistics(); }
+    ColumnStatistics column_statistics() override { return _element_reader->column_statistics(); }
     void close() override {}

     void reset_filter_map_index() override { _element_reader->reset_filter_map_index(); }
@@ -335,9 +325,9 @@ class MapColumnReader : public ParquetColumnReader {
         return _key_reader->get_def_level();
     }

-    Statistics statistics() override {
-        Statistics kst = _key_reader->statistics();
-        Statistics vst = _value_reader->statistics();
+    ColumnStatistics column_statistics() override {
+        ColumnStatistics kst = _key_reader->column_statistics();
+        ColumnStatistics vst = _value_reader->column_statistics();
         kst.merge(vst);
         return kst;
     }
@@ -392,12 +382,12 @@ class StructColumnReader : public ParquetColumnReader {
         return _child_readers.begin()->second->get_def_level();
     }

-    Statistics statistics() override {
-        Statistics st;
+    ColumnStatistics column_statistics() override {
+        ColumnStatistics st;
         for (const auto& column_name : _read_column_names) {
             auto reader = _child_readers.find(column_name);
             if (reader != _child_readers.end()) {
-                Statistics cst = reader->second->statistics();
+                ColumnStatistics cst = reader->second->column_statistics();
                 st.merge(cst);
             }
         }
@@ -490,8 +480,8 @@ class SkipReadingReader : public ParquetColumnReader {
     }

     // Implement required pure virtual methods from base class
-    Statistics statistics() override {
-        return Statistics(); // Return empty statistics
+    ColumnStatistics column_statistics() override {
+        return ColumnStatistics(); // Return empty statistics
     }

     void close() override {
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index cd815e0091b9a0..b9f5e983aad6c5 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -1125,10 +1125,10 @@ void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
     }
 }

-ParquetColumnReader::Statistics RowGroupReader::statistics() {
-    ParquetColumnReader::Statistics st;
+ParquetColumnReader::ColumnStatistics RowGroupReader::merged_column_statistics() {
+    ParquetColumnReader::ColumnStatistics st;
     for (auto& reader : _column_readers) {
-        auto ost = reader.second->statistics();
+        auto ost = reader.second->column_statistics();
         st.merge(ost);
     }
     return st;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index b3b1123f82e4a3..8f81e633146aef 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -173,7 +173,7 @@ class RowGroupReader : public ProfileCollector {
     int64_t predicate_filter_time() const { return _predicate_filter_time; }
     int64_t dict_filter_rewrite_time() const { return _dict_filter_rewrite_time; }

-    ParquetColumnReader::Statistics statistics();
+    ParquetColumnReader::ColumnStatistics merged_column_statistics();

     void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
     int64_t get_remaining_rows() { return _remaining_rows; }
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index 3734dc217f5657..3b6d7fdcb9bbb2 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -87,9 +87,12 @@ Status PageReader::parse_page_header() {
         return Status::EndOfFile("stop");
     }
     header_size = std::min(header_size, max_size);
-    RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
+    {
+        SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time);
+        RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
+    }
     real_header_size = cast_set(header_size);
-    SCOPED_RAW_TIMER(&_statistics.decode_header_time);
+    SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
     auto st =
             deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
     if (st.ok()) {
@@ -112,7 +115,7 @@ Status PageReader::parse_page_header() {
         }
     }

-    _statistics.parse_page_header_num++;
+    _page_statistics.parse_page_header_num++;
     _offset += real_header_size;
     _next_header_offset = _offset + _cur_page_header.compressed_page_size;
     _state = HEADER_PARSED;
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
index c33a7ca8cdbfa6..9246819d59c399 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
@@ -53,10 +53,11 @@ namespace doris::vectorized {
 template <bool OFFSET_INDEX>
 class PageReader {
 public:
-    struct Statistics {
+    struct PageStatistics {
         int64_t decode_header_time = 0;
         int64_t skip_page_header_num = 0;
         int64_t parse_page_header_num = 0;
+        int64_t read_page_header_time = 0;
     };

     PageReader(io::BufferedStreamReader* reader, io::IOContext* io_ctx, uint64_t offset,
@@ -82,7 +83,7 @@ class PageReader {
     Status parse_page_header();

     Status next_page() {
-        _statistics.skip_page_header_num += _state == INITIALIZED;
+        _page_statistics.skip_page_header_num += _state == INITIALIZED;
         if constexpr (OFFSET_INDEX) {
             _page_index++;
             _start_row = _offset_index->page_locations[_page_index].first_row_index;
@@ -132,7 +133,7 @@ class PageReader {

     Status get_page_data(Slice& slice);

-    Statistics& statistics() { return _statistics; }
+    PageStatistics& page_statistics() { return _page_statistics; }

     bool is_header_v2() { return _cur_page_header.__isset.data_page_header_v2; }

@@ -143,7 +144,7 @@ class PageReader {
 private:
     enum PageReaderState { INITIALIZED, HEADER_PARSED, DATA_LOADED };
     PageReaderState _state = INITIALIZED;
-    Statistics _statistics;
+    PageStatistics _page_statistics;

     io::BufferedStreamReader* _reader = nullptr;
     io::IOContext* _io_ctx = nullptr;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 3eb04608f73676..0b49a7efce3049 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -147,13 +147,15 @@ void ParquetReader::_init_profile() {
         ADD_TIMER_WITH_LEVEL(_profile, parquet_profile, 1);
         _parquet_profile.filtered_row_groups = ADD_CHILD_COUNTER_WITH_LEVEL(
-                _profile, "FilteredGroups", TUnit::UNIT, parquet_profile, 1);
+                _profile, "RowGroupsFiltered", TUnit::UNIT, parquet_profile, 1);
         _parquet_profile.filtered_row_groups_by_min_max = ADD_CHILD_COUNTER_WITH_LEVEL(
-                _profile, "FilteredGroupsByMinMax", TUnit::UNIT, parquet_profile, 1);
+                _profile, "RowGroupsFilteredByMinMax", TUnit::UNIT, parquet_profile, 1);
         _parquet_profile.filtered_row_groups_by_bloom_filter = ADD_CHILD_COUNTER_WITH_LEVEL(
-                _profile, "FilteredGroupsByBloomFilter", TUnit::UNIT, parquet_profile, 1);
+                _profile, "RowGroupsFilteredByBloomFilter", TUnit::UNIT, parquet_profile, 1);
         _parquet_profile.to_read_row_groups = ADD_CHILD_COUNTER_WITH_LEVEL(
-                _profile, "ReadGroups", TUnit::UNIT, parquet_profile, 1);
+                _profile, "RowGroupsReadNum", TUnit::UNIT, parquet_profile, 1);
+        _parquet_profile.total_row_groups = ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "RowGroupsTotalNum", TUnit::UNIT, parquet_profile, 1);
         _parquet_profile.filtered_group_rows = ADD_CHILD_COUNTER_WITH_LEVEL(
                 _profile, "FilteredRowsByGroup", TUnit::UNIT, parquet_profile, 1);
         _parquet_profile.filtered_page_rows = ADD_CHILD_COUNTER_WITH_LEVEL(
@@ -164,16 +166,14 @@ void ParquetReader::_init_profile() {
                 _profile, "FilteredBytes", TUnit::BYTES, parquet_profile, 1);
         _parquet_profile.raw_rows_read = ADD_CHILD_COUNTER_WITH_LEVEL(
                 _profile, "RawRowsRead", TUnit::UNIT, parquet_profile, 1);
-        _parquet_profile.to_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(
-                _profile, "ReadBytes", TUnit::BYTES, parquet_profile, 1);
         _parquet_profile.column_read_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ColumnReadTime", parquet_profile, 1);
         _parquet_profile.parse_meta_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ParseMetaTime", parquet_profile, 1);
         _parquet_profile.parse_footer_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ParseFooterTime", parquet_profile, 1);
-        _parquet_profile.open_file_time =
-                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "FileOpenTime", parquet_profile, 1);
+        _parquet_profile.file_reader_create_time =
+                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "FileReaderCreateTime", parquet_profile, 1);
         _parquet_profile.open_file_num =
                 ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "FileNum", TUnit::UNIT, parquet_profile, 1);
         _parquet_profile.page_index_read_calls =
@@ -195,7 +195,9 @@ void ParquetReader::_init_profile() {
         _parquet_profile.decompress_cnt = ADD_CHILD_COUNTER_WITH_LEVEL(
                 _profile, "DecompressCount", TUnit::UNIT, parquet_profile, 1);
         _parquet_profile.decode_header_time =
-                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeHeaderTime", parquet_profile, 1);
+                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageHeaderDecodeTime", parquet_profile, 1);
+        _parquet_profile.read_page_header_time =
+                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageHeaderReadTime", parquet_profile, 1);
         _parquet_profile.decode_value_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeValueTime", parquet_profile, 1);
         _parquet_profile.decode_dict_time =
@@ -233,8 +235,8 @@ Status ParquetReader::_open_file() {
         return Status::EndOfFile("stop");
     }
     if (_file_reader == nullptr) {
-        SCOPED_RAW_TIMER(&_statistics.open_file_time);
-        ++_statistics.open_file_num;
+        SCOPED_RAW_TIMER(&_reader_statistics.file_reader_create_time);
+        ++_reader_statistics.open_file_num;
         _file_description.mtime =
                 _scan_range.__isset.modification_time ? _scan_range.modification_time : 0;
         io::FileReaderOptions reader_options =
@@ -248,7 +250,7 @@ Status ParquetReader::_open_file() {
     }

     if (_file_metadata == nullptr) {
-        SCOPED_RAW_TIMER(&_statistics.parse_footer_time);
+        SCOPED_RAW_TIMER(&_reader_statistics.parse_footer_time);
         if (_tracing_file_reader->size() <= sizeof(PARQUET_VERSION_NUMBER)) {
             // Some system may generate parquet file with only 4 bytes: PAR1
             // Should consider it as empty file.
@@ -264,9 +266,8 @@ Status ParquetReader::_open_file() {
             RETURN_IF_ERROR(parse_thrift_footer(_tracing_file_reader, &_file_metadata_ptr,
                                                 &meta_size, _io_ctx, enable_mapping_varbinary));
             _file_metadata = _file_metadata_ptr.get();
-            _column_statistics.read_bytes += meta_size;
             // parse magic number & parse meta data
-            _statistics.file_footer_read_calls += 1;
+            _reader_statistics.file_footer_read_calls += 1;
         } else {
             const auto& file_meta_cache_key =
                     FileMetaCache::get_key(_tracing_file_reader, _file_description);
@@ -277,10 +278,9 @@ Status ParquetReader::_open_file() {
                 _meta_cache->insert(file_meta_cache_key, _file_metadata_ptr.release(),
                                     &_meta_cache_handle);
                 _file_metadata = _meta_cache_handle.data();
-                _column_statistics.read_bytes += meta_size;
-                _statistics.file_footer_read_calls += 1;
+                _reader_statistics.file_footer_read_calls += 1;
             } else {
-                _statistics.file_footer_hit_cache++;
+                _reader_statistics.file_footer_hit_cache++;
             }
             _file_metadata = _meta_cache_handle.data();
         }
@@ -289,9 +289,6 @@ Status ParquetReader::_open_file() {
             return Status::InternalError("failed to get file meta data: {}",
                                          _file_description.path);
         }
-        _column_statistics.read_bytes += meta_size;
-        // parse magic number & parse meta data
-        _column_statistics.read_calls += 1;
     }
     return Status::OK();
 }
@@ -353,7 +350,7 @@ Status ParquetReader::init_reader(
         return Status::InternalError("failed to init parquet reader, please open reader first");
     }

-    SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
+    SCOPED_RAW_TIMER(&_reader_statistics.parse_meta_time);
     _total_groups = _t_metadata->row_groups.size();
     if (_total_groups == 0) {
         return Status::EndOfFile("init reader failed, empty parquet file: " + _scan_range.path);
@@ -651,7 +648,7 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
         return Status::OK();
     }

-    SCOPED_RAW_TIMER(&_statistics.column_read_time);
+    SCOPED_RAW_TIMER(&_reader_statistics.column_read_time);
     Status batch_st =
             _current_group_reader->next_batch(block, _batch_size, read_rows, &_row_group_eof);
     if (batch_st.is()) {
@@ -668,11 +665,13 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
     }

     if (_row_group_eof) {
-        auto column_st = _current_group_reader->statistics();
+        auto column_st = _current_group_reader->merged_column_statistics();
         _column_statistics.merge(column_st);
-        _statistics.lazy_read_filtered_rows += _current_group_reader->lazy_read_filtered_rows();
-        _statistics.predicate_filter_time += _current_group_reader->predicate_filter_time();
-        _statistics.dict_filter_rewrite_time += _current_group_reader->dict_filter_rewrite_time();
+        _reader_statistics.lazy_read_filtered_rows +=
+                _current_group_reader->lazy_read_filtered_rows();
+        _reader_statistics.predicate_filter_time += _current_group_reader->predicate_filter_time();
+        _reader_statistics.dict_filter_rewrite_time +=
+                _current_group_reader->dict_filter_rewrite_time();
         if (_current_row_group_index.row_group_id + 1 == _total_groups) {
             *eof = true;
         } else {
@@ -764,23 +763,22 @@ Status ParquetReader::_next_row_group_reader() {
             group_size += column_compressed_size(field);
         }

-        _statistics.read_rows += candidate_row_ranges.count();
+        _reader_statistics.read_rows += candidate_row_ranges.count();
         if (_io_ctx) {
             _io_ctx->file_reader_stats->read_rows += candidate_row_ranges.count();
         }

         if (candidate_row_ranges.count() != 0) {
             // need read this row group.
-            _statistics.read_row_groups++;
-            _statistics.read_bytes += group_size;
-
-            _statistics.filtered_page_rows += row_group.num_rows - candidate_row_ranges.count();
+            _reader_statistics.read_row_groups++;
+            _reader_statistics.filtered_page_rows +=
+                    row_group.num_rows - candidate_row_ranges.count();
             break;
         } else {
             // this row group be filtered.
-            _statistics.filtered_row_groups++;
-            _statistics.filtered_bytes += group_size;
-            _statistics.filtered_group_rows += row_group.num_rows;
+            _reader_statistics.filtered_row_groups++;
+            _reader_statistics.filtered_bytes += group_size;
+            _reader_statistics.filtered_group_rows += row_group.num_rows;
         }
     }
@@ -947,11 +945,10 @@ Status ParquetReader::_process_page_index_filter(
     Slice res(off_index_buff.data(), page_index._offset_index_size);
     size_t bytes_read = 0;
     {
-        SCOPED_RAW_TIMER(&_statistics.read_page_index_time);
+        SCOPED_RAW_TIMER(&_reader_statistics.read_page_index_time);
         RETURN_IF_ERROR(_tracing_file_reader->read_at(page_index._offset_index_start, res,
                                                       &bytes_read, _io_ctx));
     }
-    _column_statistics.read_bytes += bytes_read;
     _column_statistics.page_index_read_calls++;

     _col_offsets.clear();
@@ -961,7 +958,7 @@ Status ParquetReader::_process_page_index_filter(
             continue;
         }
         tparquet::OffsetIndex offset_index;
-        SCOPED_RAW_TIMER(&_statistics.parse_page_index_time);
+        SCOPED_RAW_TIMER(&_reader_statistics.parse_page_index_time);
         RETURN_IF_ERROR(
                 page_index.parse_offset_index(chunk, off_index_buff.data(), &offset_index));
         _col_offsets[parquet_col_id] = offset_index;
@@ -983,14 +980,13 @@ Status ParquetReader::_process_page_index_filter(
         size_t bytes_read = 0;
         Slice result(col_index_buff.data(), page_index._column_index_size);
         {
-            SCOPED_RAW_TIMER(&_statistics.read_page_index_time);
+            SCOPED_RAW_TIMER(&_reader_statistics.read_page_index_time);
             RETURN_IF_ERROR(_tracing_file_reader->read_at(page_index._column_index_start, result,
                                                           &bytes_read, _io_ctx));
         }
-        _column_statistics.read_bytes += bytes_read;
         _column_statistics.page_index_read_calls++;

-        SCOPED_RAW_TIMER(&_statistics.page_index_filter_time);
+        SCOPED_RAW_TIMER(&_reader_statistics.page_index_filter_time);
         // Construct a cacheable page index structure to avoid repeatedly reading the page index of the same column.
         ParquetPredicate::CachedPageIndexStat cached_page_index;
@@ -1033,7 +1029,7 @@ Status ParquetReader::_process_page_index_filter(
             tparquet::ColumnIndex column_index;
             {
-                SCOPED_RAW_TIMER(&_statistics.parse_page_index_time);
+                SCOPED_RAW_TIMER(&_reader_statistics.parse_page_index_time);
                 RETURN_IF_ERROR(page_index.parse_column_index(column_chunk, col_index_buff.data(),
                                                               &column_index));
             }
@@ -1095,7 +1091,7 @@ Status ParquetReader::_process_min_max_bloom_filter(
         const RowGroupReader::RowGroupIndex& row_group_index, const tparquet::RowGroup& row_group,
         const std::vector>& push_down_pred, RowRanges* row_ranges) {
-    SCOPED_RAW_TIMER(&_statistics.row_group_filter_time);
+    SCOPED_RAW_TIMER(&_reader_statistics.row_group_filter_time);
     if (!_filter_groups) {
         // No row group filtering is needed;
         // for example, Iceberg reads position delete files.
@@ -1128,10 +1124,10 @@ Status ParquetReader::_process_min_max_bloom_filter(
     // Update statistics based on filter type
     if (filter_this_row_group) {
         if (filtered_by_min_max) {
-            _statistics.filtered_row_groups_by_min_max++;
+            _reader_statistics.filtered_row_groups_by_min_max++;
         }
         if (filtered_by_bloom_filter) {
-            _statistics.filtered_row_groups_by_bloom_filter++;
+            _reader_statistics.filtered_row_groups_by_bloom_filter++;
         }
     }
@@ -1221,7 +1217,7 @@ Status ParquetReader::_process_column_stat_filter(
         }

         if (!stat->bloom_filter) {
-            SCOPED_RAW_TIMER(&_statistics.bloom_filter_read_time);
+            SCOPED_RAW_TIMER(&_reader_statistics.bloom_filter_read_time);
             auto st = ParquetPredicate::read_bloom_filter(
                     meta_data, _tracing_file_reader, _io_ctx, stat);
             if (!st.ok()) {
@@ -1291,41 +1287,53 @@ void ParquetReader::_collect_profile() {
     if (_current_group_reader != nullptr) {
         _current_group_reader->collect_profile_before_close();
     }
-    COUNTER_UPDATE(_parquet_profile.filtered_row_groups, _statistics.filtered_row_groups);
+    COUNTER_UPDATE(_parquet_profile.filtered_row_groups, _reader_statistics.filtered_row_groups);
     COUNTER_UPDATE(_parquet_profile.filtered_row_groups_by_min_max,
-                   _statistics.filtered_row_groups_by_min_max);
+                   _reader_statistics.filtered_row_groups_by_min_max);
     COUNTER_UPDATE(_parquet_profile.filtered_row_groups_by_bloom_filter,
-                   _statistics.filtered_row_groups_by_bloom_filter);
-    COUNTER_UPDATE(_parquet_profile.to_read_row_groups, _statistics.read_row_groups);
-    COUNTER_UPDATE(_parquet_profile.filtered_group_rows, _statistics.filtered_group_rows);
-    COUNTER_UPDATE(_parquet_profile.filtered_page_rows, _statistics.filtered_page_rows);
-    COUNTER_UPDATE(_parquet_profile.lazy_read_filtered_rows, _statistics.lazy_read_filtered_rows);
-    COUNTER_UPDATE(_parquet_profile.filtered_bytes, _statistics.filtered_bytes);
-    COUNTER_UPDATE(_parquet_profile.raw_rows_read, _statistics.read_rows);
-    COUNTER_UPDATE(_parquet_profile.to_read_bytes, _statistics.read_bytes);
-    COUNTER_UPDATE(_parquet_profile.column_read_time, _statistics.column_read_time);
-    COUNTER_UPDATE(_parquet_profile.parse_meta_time, _statistics.parse_meta_time);
-    COUNTER_UPDATE(_parquet_profile.parse_footer_time, _statistics.parse_footer_time);
-    COUNTER_UPDATE(_parquet_profile.open_file_time, _statistics.open_file_time);
-    COUNTER_UPDATE(_parquet_profile.open_file_num, _statistics.open_file_num);
-    COUNTER_UPDATE(_parquet_profile.page_index_filter_time, _statistics.page_index_filter_time);
-    COUNTER_UPDATE(_parquet_profile.read_page_index_time, _statistics.read_page_index_time);
-    COUNTER_UPDATE(_parquet_profile.parse_page_index_time, _statistics.parse_page_index_time);
-    COUNTER_UPDATE(_parquet_profile.row_group_filter_time, _statistics.row_group_filter_time);
-    COUNTER_UPDATE(_parquet_profile.file_footer_read_calls, _statistics.file_footer_read_calls);
-    COUNTER_UPDATE(_parquet_profile.file_footer_hit_cache, _statistics.file_footer_hit_cache);
+                   _reader_statistics.filtered_row_groups_by_bloom_filter);
+    COUNTER_UPDATE(_parquet_profile.to_read_row_groups, _reader_statistics.read_row_groups);
+    COUNTER_UPDATE(_parquet_profile.total_row_groups, _total_groups);
+    COUNTER_UPDATE(_parquet_profile.filtered_group_rows, _reader_statistics.filtered_group_rows);
+    COUNTER_UPDATE(_parquet_profile.filtered_page_rows, _reader_statistics.filtered_page_rows);
+    COUNTER_UPDATE(_parquet_profile.lazy_read_filtered_rows,
+                   _reader_statistics.lazy_read_filtered_rows);
+    COUNTER_UPDATE(_parquet_profile.filtered_bytes, _reader_statistics.filtered_bytes);
+    COUNTER_UPDATE(_parquet_profile.raw_rows_read, _reader_statistics.read_rows);
+    COUNTER_UPDATE(_parquet_profile.column_read_time, _reader_statistics.column_read_time);
+    COUNTER_UPDATE(_parquet_profile.parse_meta_time, _reader_statistics.parse_meta_time);
+    COUNTER_UPDATE(_parquet_profile.parse_footer_time, _reader_statistics.parse_footer_time);
+    COUNTER_UPDATE(_parquet_profile.file_reader_create_time,
+                   _reader_statistics.file_reader_create_time);
+    COUNTER_UPDATE(_parquet_profile.open_file_num, _reader_statistics.open_file_num);
+    COUNTER_UPDATE(_parquet_profile.page_index_filter_time,
+                   _reader_statistics.page_index_filter_time);
+    COUNTER_UPDATE(_parquet_profile.read_page_index_time, _reader_statistics.read_page_index_time);
+    COUNTER_UPDATE(_parquet_profile.parse_page_index_time,
+                   _reader_statistics.parse_page_index_time);
+    COUNTER_UPDATE(_parquet_profile.row_group_filter_time,
+                   _reader_statistics.row_group_filter_time);
+    COUNTER_UPDATE(_parquet_profile.file_footer_read_calls,
+                   _reader_statistics.file_footer_read_calls);
+    COUNTER_UPDATE(_parquet_profile.file_footer_hit_cache,
+                   _reader_statistics.file_footer_hit_cache);
     COUNTER_UPDATE(_parquet_profile.skip_page_header_num, _column_statistics.skip_page_header_num);
     COUNTER_UPDATE(_parquet_profile.parse_page_header_num,
                    _column_statistics.parse_page_header_num);
-    COUNTER_UPDATE(_parquet_profile.predicate_filter_time, _statistics.predicate_filter_time);
-    COUNTER_UPDATE(_parquet_profile.dict_filter_rewrite_time, _statistics.dict_filter_rewrite_time);
-    COUNTER_UPDATE(_parquet_profile.bloom_filter_read_time, _statistics.bloom_filter_read_time);
+    COUNTER_UPDATE(_parquet_profile.predicate_filter_time,
+                   _reader_statistics.predicate_filter_time);
+    COUNTER_UPDATE(_parquet_profile.dict_filter_rewrite_time,
+                   _reader_statistics.dict_filter_rewrite_time);
+    COUNTER_UPDATE(_parquet_profile.bloom_filter_read_time,
+                   _reader_statistics.bloom_filter_read_time);
     COUNTER_UPDATE(_parquet_profile.page_index_read_calls,
                    _column_statistics.page_index_read_calls);
     COUNTER_UPDATE(_parquet_profile.decompress_time, _column_statistics.decompress_time);
     COUNTER_UPDATE(_parquet_profile.decompress_cnt, _column_statistics.decompress_cnt);
     COUNTER_UPDATE(_parquet_profile.decode_header_time, _column_statistics.decode_header_time);
+    COUNTER_UPDATE(_parquet_profile.read_page_header_time,
+                   _column_statistics.read_page_header_time);
     COUNTER_UPDATE(_parquet_profile.decode_value_time, _column_statistics.decode_value_time);
     COUNTER_UPDATE(_parquet_profile.decode_dict_time, _column_statistics.decode_dict_time);
     COUNTER_UPDATE(_parquet_profile.decode_level_time, _column_statistics.decode_level_time);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 31a39b442fcb70..e02266d944b864 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -74,7 +74,7 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
     ENABLE_FACTORY_CREATOR(ParquetReader);

 public:
-    struct Statistics {
+    struct ReaderStatistics {
         int32_t filtered_row_groups = 0;
         int32_t filtered_row_groups_by_min_max = 0;
         int32_t filtered_row_groups_by_bloom_filter = 0;
@@ -84,13 +84,12 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
         int64_t lazy_read_filtered_rows = 0;
         int64_t read_rows = 0;
         int64_t filtered_bytes = 0;
-        int64_t read_bytes = 0;
        int64_t column_read_time = 0;
        int64_t parse_meta_time = 0;
        int64_t parse_footer_time = 0;
        int64_t file_footer_read_calls = 0;
        int64_t file_footer_hit_cache = 0;
-        int64_t open_file_time = 0;
+        int64_t file_reader_create_time = 0;
        int64_t open_file_num = 0;
        int64_t row_group_filter_time = 0;
        int64_t page_index_filter_time = 0;
@@ -146,7 +145,7 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
     Status get_parsed_schema(std::vector* col_names,
                              std::vector* col_types) override;

-    Statistics& statistics() { return _statistics; }
+    ReaderStatistics& reader_statistics() { return _reader_statistics; }

     const tparquet::FileMetaData* get_meta_data() const { return _t_metadata; }

@@ -178,16 +177,16 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
        RuntimeProfile::Counter* filtered_row_groups_by_min_max = nullptr;
        RuntimeProfile::Counter* filtered_row_groups_by_bloom_filter = nullptr;
        RuntimeProfile::Counter* to_read_row_groups = nullptr;
+        RuntimeProfile::Counter* total_row_groups = nullptr;
        RuntimeProfile::Counter* filtered_group_rows = nullptr;
        RuntimeProfile::Counter* filtered_page_rows = nullptr;
        RuntimeProfile::Counter* lazy_read_filtered_rows = nullptr;
        RuntimeProfile::Counter* filtered_bytes = nullptr;
        RuntimeProfile::Counter* raw_rows_read = nullptr;
-        RuntimeProfile::Counter* to_read_bytes = nullptr;
        RuntimeProfile::Counter* column_read_time = nullptr;
        RuntimeProfile::Counter* parse_meta_time = nullptr;
        RuntimeProfile::Counter* parse_footer_time = nullptr;
-        RuntimeProfile::Counter* open_file_time = nullptr;
+        RuntimeProfile::Counter* file_reader_create_time = nullptr;
        RuntimeProfile::Counter* open_file_num = nullptr;
        RuntimeProfile::Counter* row_group_filter_time = nullptr;
        RuntimeProfile::Counter* page_index_read_calls = nullptr;
@@ -199,6 +198,7 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
        RuntimeProfile::Counter* decompress_time = nullptr;
        RuntimeProfile::Counter* decompress_cnt = nullptr;
        RuntimeProfile::Counter* decode_header_time = nullptr;
+        RuntimeProfile::Counter* read_page_header_time = nullptr;
        RuntimeProfile::Counter* decode_value_time = nullptr;
        RuntimeProfile::Counter* decode_dict_time = nullptr;
        RuntimeProfile::Counter* decode_level_time = nullptr;
@@ -322,8 +322,8 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
     // _table_column_names = _missing_cols + _read_table_columns
     const std::vector* _table_column_names = nullptr;

-    Statistics _statistics;
-    ParquetColumnReader::Statistics _column_statistics;
+    ReaderStatistics _reader_statistics;
+    ParquetColumnReader::ColumnStatistics _column_statistics;
     ParquetProfile _parquet_profile;
     bool _closed = false;
     io::IOContext* _io_ctx = nullptr;
diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp
index 8a0f296b2b97f2..6eea3334c47a05 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -1792,6 +1792,10 @@ void FileScanner::update_realtime_counters() {
     _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
             _file_reader_stats->read_bytes);

+    int64_t delta_bytes_read_from_local =
+            _file_cache_statistics->bytes_read_from_local - _last_bytes_read_from_local;
+    int64_t delta_bytes_read_from_remote =
+            _file_cache_statistics->bytes_read_from_remote - _last_bytes_read_from_remote;
     if (_file_cache_statistics->bytes_read_from_local == 0 &&
         _file_cache_statistics->bytes_read_from_remote == 0) {
         _state->get_query_ctx()
                 ->resource_ctx()
                 ->io_context()
                 ->update_scan_bytes_from_local_storage(
                         _file_reader_stats->read_bytes);
     } else {
         _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
-                _file_cache_statistics->bytes_read_from_local);
+                delta_bytes_read_from_local);
         _state->get_query_ctx()
                 ->resource_ctx()
                 ->io_context()
-                ->update_scan_bytes_from_remote_storage(
-                        _file_cache_statistics->bytes_read_from_remote);
+                ->update_scan_bytes_from_remote_storage(delta_bytes_read_from_remote);
         DorisMetrics::instance()->query_scan_bytes_from_local->increment(
-                _file_cache_statistics->bytes_read_from_local);
+                delta_bytes_read_from_local);
         DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
-                _file_cache_statistics->bytes_read_from_remote);
+                delta_bytes_read_from_remote);
     }

     COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
@@ -1821,8 +1824,9 @@
     _file_reader_stats->read_bytes = 0;
     _file_reader_stats->read_rows = 0;
-    _file_cache_statistics->bytes_read_from_local = 0;
-    _file_cache_statistics->bytes_read_from_remote = 0;
+
+    _last_bytes_read_from_local = _file_cache_statistics->bytes_read_from_local;
+    _last_bytes_read_from_remote = _file_cache_statistics->bytes_read_from_remote;
 }

 void FileScanner::_collect_profile_before_close() {
diff --git a/be/src/vec/exec/scan/file_scanner.h b/be/src/vec/exec/scan/file_scanner.h
index 379f6a246f62bd..835da10eb27816 100644
--- a/be/src/vec/exec/scan/file_scanner.h
+++ b/be/src/vec/exec/scan/file_scanner.h
@@ -234,6 +234,8 @@ class FileScanner : public Scanner {
     std::pair, int> _row_id_column_iterator_pair = {nullptr, -1};

+    int64_t _last_bytes_read_from_local = 0;
+    int64_t _last_bytes_read_from_remote = 0;

 private:
     Status _init_expr_ctxes();
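Editor's note (not part of the patch): the FileScanner hunks above stop zeroing the shared file-cache counters after every realtime flush and instead remember the last cumulative values in `_last_bytes_read_from_local` / `_last_bytes_read_from_remote`, reporting only the growth since the previous flush. The sketch below is a minimal, self-contained illustration of that delta-reporting pattern under assumed names; `CacheStats` and `DeltaReporter` are invented for the example and are not Doris types.

```cpp
#include <cstdint>
#include <iostream>

// Illustrative stand-in for a cumulative, shared statistics object
// (analogous to _file_cache_statistics, which keeps growing for the scanner's lifetime).
struct CacheStats {
    int64_t bytes_read_from_local = 0;
    int64_t bytes_read_from_remote = 0;
};

class DeltaReporter {
public:
    // Report only the growth since the previous call, so a periodic flush
    // does not double-count the cumulative totals.
    void report(const CacheStats& stats) {
        int64_t delta_local = stats.bytes_read_from_local - _last_local;
        int64_t delta_remote = stats.bytes_read_from_remote - _last_remote;
        std::cout << "local += " << delta_local << ", remote += " << delta_remote << '\n';
        // Remember the cumulative values instead of resetting the shared counters,
        // which other readers of the statistics object may still need.
        _last_local = stats.bytes_read_from_local;
        _last_remote = stats.bytes_read_from_remote;
    }

private:
    int64_t _last_local = 0;
    int64_t _last_remote = 0;
};

int main() {
    CacheStats stats;
    DeltaReporter reporter;

    stats.bytes_read_from_local = 100;
    reporter.report(stats); // local += 100, remote += 0

    stats.bytes_read_from_local = 250;
    stats.bytes_read_from_remote = 40;
    reporter.report(stats); // local += 150, remote += 40
    return 0;
}
```

Tracking "last seen" totals rather than resetting the shared statistics appears to be the point of the change: the counters stay cumulative for any other consumer, while the per-flush metrics and query-level scan-byte accounting receive only the increment.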