Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -819,12 +819,10 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
int64_t buf_remaining = _buf_end_offset - _buf_start_offset;
int64_t to_read = std::min(_buf_size - buf_remaining, _file_end_offset - _buf_end_offset);
int64_t has_read = 0;
SCOPED_RAW_TIMER(&_statistics.read_time);
while (has_read < to_read) {
size_t loop_read = 0;
Slice result(_buf.get() + buf_remaining + has_read, to_read - has_read);
RETURN_IF_ERROR(_file->read_at(_buf_end_offset + has_read, result, &loop_read, io_ctx));
_statistics.read_calls++;
if (loop_read == 0) {
break;
}
Expand All @@ -833,7 +831,6 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
if (has_read != to_read) {
return Status::Corruption("Try to read {} bytes, but received {} bytes", to_read, has_read);
}
_statistics.read_bytes += to_read;
_buf_end_offset += to_read;
*buf = _buf.get();
return Status::OK();
Expand Down
18 changes: 0 additions & 18 deletions be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ class MergeRangeFileReader : public io::FileReader {
int64_t merged_io = 0;
int64_t request_bytes = 0;
int64_t merged_bytes = 0;
int64_t apply_bytes = 0;
};

struct RangeCachedData {
Expand Down Expand Up @@ -299,9 +298,6 @@ class MergeRangeFileReader : public io::FileReader {
_merged_read_slice_size = READ_SLICE_SIZE;
}

for (const PrefetchRange& range : _random_access_ranges) {
_statistics.apply_bytes += range.end_offset - range.start_offset;
}
if (_profile != nullptr) {
const char* random_profile = "MergedSmallIO";
ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
Expand All @@ -315,8 +311,6 @@ class MergeRangeFileReader : public io::FileReader {
random_profile, 1);
_merged_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedBytes", TUnit::BYTES,
random_profile, 1);
_apply_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "ApplyBytes", TUnit::BYTES,
random_profile, 1);
}
}

Expand Down Expand Up @@ -359,7 +353,6 @@ class MergeRangeFileReader : public io::FileReader {
COUNTER_UPDATE(_merged_io, _statistics.merged_io);
COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
if (_reader != nullptr) {
_reader->collect_profile_before_close();
}
Expand All @@ -373,7 +366,6 @@ class MergeRangeFileReader : public io::FileReader {
RuntimeProfile::Counter* _merged_io = nullptr;
RuntimeProfile::Counter* _request_bytes = nullptr;
RuntimeProfile::Counter* _merged_bytes = nullptr;
RuntimeProfile::Counter* _apply_bytes = nullptr;

int _search_read_range(size_t start_offset, size_t end_offset);
void _clean_cached_data(RangeCachedData& cached_data);
Expand Down Expand Up @@ -619,12 +611,6 @@ class InMemoryFileReader final : public io::FileReader {
*/
class BufferedStreamReader {
public:
struct Statistics {
int64_t read_time = 0;
int64_t read_calls = 0;
int64_t read_bytes = 0;
};

/**
* Return the address of underlying buffer that locates the start of data between [offset, offset + bytes_to_read)
* @param buf the buffer address to save the start address of data
Expand All @@ -637,13 +623,9 @@ class BufferedStreamReader {
* Save the data address to slice.data, and the slice.size is the bytes to read.
*/
virtual Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) = 0;
Statistics& statistics() { return _statistics; }
virtual ~BufferedStreamReader() = default;
// return the file path
virtual std::string path() = 0;

protected:
Statistics _statistics;
};

class BufferedFileStreamReader : public BufferedStreamReader, public ProfileCollector {
Expand Down
1 change: 1 addition & 0 deletions be/src/io/fs/tracing_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class TracingFileReader : public FileReader {
void _collect_profile_before_close() override { return _inner->collect_profile_before_close(); }

FileReaderStats* stats() const { return _stats; }
// Expose the wrapped reader so callers can inspect the underlying implementation
// (e.g. to detect a MergeRangeFileReader behind the tracing wrapper).
// Returns a const reference to avoid an atomic shared_ptr refcount bump on every
// call; callers that need ownership can still copy the returned pointer.
const doris::io::FileReaderSPtr& inner_reader() const { return _inner; }

private:
doris::io::FileReaderSPtr _inner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_data() {
// check decompressed buffer size
_reserve_decompress_buf(uncompressed_size);
_page_data = Slice(_decompress_buf.get(), uncompressed_size);
SCOPED_RAW_TIMER(&_statistics.decompress_time);
_statistics.decompress_cnt++;
SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time);
_chunk_statistics.decompress_cnt++;
RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data));
} else {
// Don't need decompress
Expand All @@ -205,7 +205,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_data() {

// Initialize repetition level and definition level. Skip when level = 0, which means required field.
if (_max_rep_level > 0) {
SCOPED_RAW_TIMER(&_statistics.decode_level_time);
SCOPED_RAW_TIMER(&_chunk_statistics.decode_level_time);
if (header->__isset.data_page_header_v2) {
RETURN_IF_ERROR(_rep_level_decoder.init_v2(_v2_rep_levels, _max_rep_level,
_remaining_rep_nums));
Expand All @@ -216,7 +216,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_data() {
}
}
if (_max_def_level > 0) {
SCOPED_RAW_TIMER(&_statistics.decode_level_time);
SCOPED_RAW_TIMER(&_chunk_statistics.decode_level_time);
if (header->__isset.data_page_header_v2) {
RETURN_IF_ERROR(_def_level_decoder.init_v2(_v2_def_levels, _max_def_level,
_remaining_def_nums));
Expand Down Expand Up @@ -256,7 +256,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_decode_dict_page() {
const tparquet::PageHeader* header = nullptr;
RETURN_IF_ERROR(_page_reader->get_page_header(header));
DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header->type);
SCOPED_RAW_TIMER(&_statistics.decode_dict_time);
SCOPED_RAW_TIMER(&_chunk_statistics.decode_dict_time);

// Using the PLAIN_DICTIONARY enum value is deprecated in the Parquet 2.0 specification.
// Prefer using RLE_DICTIONARY in a data page and PLAIN in a dictionary page for Parquet 2.0+ files.
Expand Down Expand Up @@ -315,7 +315,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::skip_values(size_t num_va
}
_remaining_num_values -= num_values;
if (skip_data) {
SCOPED_RAW_TIMER(&_statistics.decode_value_time);
SCOPED_RAW_TIMER(&_chunk_statistics.decode_value_time);
return _page_decoder->skip_values(num_values);
} else {
return Status::OK();
Expand All @@ -329,7 +329,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::decode_values(
if (select_vector.num_values() == 0) {
return Status::OK();
}
SCOPED_RAW_TIMER(&_statistics.decode_value_time);
SCOPED_RAW_TIMER(&_chunk_statistics.decode_value_time);
if (UNLIKELY((doris_column->is_column_dictionary() || is_dict_filter) && !_has_dict)) {
return Status::IOError("Not dictionary coded");
}
Expand Down
17 changes: 11 additions & 6 deletions be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ struct ColumnChunkReaderStatistics {
int64_t decode_level_time = 0;
int64_t skip_page_header_num = 0;
int64_t parse_page_header_num = 0;
int64_t read_page_header_time = 0;
};

/**
Expand Down Expand Up @@ -146,11 +147,15 @@ class ColumnChunkReader {
// Get page decoder
Decoder* get_page_decoder() { return _page_decoder; }

ColumnChunkReaderStatistics& statistics() {
_statistics.decode_header_time = _page_reader->statistics().decode_header_time;
_statistics.skip_page_header_num = _page_reader->statistics().skip_page_header_num;
_statistics.parse_page_header_num = _page_reader->statistics().parse_page_header_num;
return _statistics;
// Snapshot the page-level counters from the page reader into the chunk-level
// statistics before handing them out, so callers always observe up-to-date
// header-decode/skip/parse/read counters alongside the chunk's own counters.
ColumnChunkReaderStatistics& chunk_statistics() {
    // Hoist the accessor: read the page statistics once instead of four times.
    const auto& page_stats = _page_reader->page_statistics();
    _chunk_statistics.decode_header_time = page_stats.decode_header_time;
    _chunk_statistics.skip_page_header_num = page_stats.skip_page_header_num;
    _chunk_statistics.parse_page_header_num = page_stats.parse_page_header_num;
    _chunk_statistics.read_page_header_time = page_stats.read_page_header_time;
    return _chunk_statistics;
}

Status read_dict_values_to_column(MutableColumnPtr& doris_column) {
Expand Down Expand Up @@ -238,7 +243,7 @@ class ColumnChunkReader {
// Map: encoding -> Decoder
// Plain or Dictionary encoding. If the dictionary grows too big, the encoding will fall back to the plain encoding
std::unordered_map<int, std::unique_ptr<Decoder>> _decoders;
ColumnChunkReaderStatistics _statistics;
ColumnChunkReaderStatistics _chunk_statistics;
};
#include "common/compile_check_end.h"

Expand Down
6 changes: 5 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <algorithm>
#include <utility>

#include "io/fs/tracing_file_reader.h"
#include "runtime/define_primitive_type.h"
#include "schema_desc.h"
#include "util/runtime_profile.h"
Expand Down Expand Up @@ -248,7 +249,10 @@ Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::init(io::FileReaderSPtr
: chunk_meta.data_page_offset;
size_t chunk_len = chunk_meta.total_compressed_size;
size_t prefetch_buffer_size = std::min(chunk_len, max_buf_size);
if (typeid_cast<io::MergeRangeFileReader*>(file.get())) {
if ((typeid_cast<doris::io::TracingFileReader*>(file.get()) &&
typeid_cast<io::MergeRangeFileReader*>(
((doris::io::TracingFileReader*)(file.get()))->inner_reader().get())) ||
typeid_cast<io::MergeRangeFileReader*>(file.get())) {
// turn off prefetch data when using MergeRangeFileReader
prefetch_buffer_size = 0;
}
Expand Down
82 changes: 36 additions & 46 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,9 @@ using ColumnString = ColumnStr<UInt32>;

class ParquetColumnReader {
public:
struct Statistics {
Statistics()
: read_time(0),
read_calls(0),
page_index_read_calls(0),
read_bytes(0),
struct ColumnStatistics {
ColumnStatistics()
: page_index_read_calls(0),
decompress_time(0),
decompress_cnt(0),
decode_header_time(0),
Expand All @@ -67,14 +64,11 @@ class ParquetColumnReader {
decode_level_time(0),
decode_null_map_time(0),
skip_page_header_num(0),
parse_page_header_num(0) {}

Statistics(io::BufferedStreamReader::Statistics& fs, ColumnChunkReaderStatistics& cs,
int64_t null_map_time)
: read_time(fs.read_time),
read_calls(fs.read_calls),
page_index_read_calls(0),
read_bytes(fs.read_bytes),
parse_page_header_num(0),
read_page_header_time(0) {}

ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t null_map_time)
: page_index_read_calls(0),
decompress_time(cs.decompress_time),
decompress_cnt(cs.decompress_cnt),
decode_header_time(cs.decode_header_time),
Expand All @@ -83,12 +77,10 @@ class ParquetColumnReader {
decode_level_time(cs.decode_level_time),
decode_null_map_time(null_map_time),
skip_page_header_num(cs.skip_page_header_num),
parse_page_header_num(cs.parse_page_header_num) {}
parse_page_header_num(cs.parse_page_header_num),
read_page_header_time(cs.read_page_header_time) {}

int64_t read_time;
int64_t read_calls;
int64_t page_index_read_calls;
int64_t read_bytes;
int64_t decompress_time;
int64_t decompress_cnt;
int64_t decode_header_time;
Expand All @@ -98,21 +90,20 @@ class ParquetColumnReader {
int64_t decode_null_map_time;
int64_t skip_page_header_num;
int64_t parse_page_header_num;

void merge(Statistics& statistics) {
read_time += statistics.read_time;
read_calls += statistics.read_calls;
read_bytes += statistics.read_bytes;
page_index_read_calls += statistics.page_index_read_calls;
decompress_time += statistics.decompress_time;
decompress_cnt += statistics.decompress_cnt;
decode_header_time += statistics.decode_header_time;
decode_value_time += statistics.decode_value_time;
decode_dict_time += statistics.decode_dict_time;
decode_level_time += statistics.decode_level_time;
decode_null_map_time += statistics.decode_null_map_time;
skip_page_header_num += statistics.skip_page_header_num;
parse_page_header_num += statistics.parse_page_header_num;
int64_t read_page_header_time;

void merge(ColumnStatistics& col_statistics) {
page_index_read_calls += col_statistics.page_index_read_calls;
decompress_time += col_statistics.decompress_time;
decompress_cnt += col_statistics.decompress_cnt;
decode_header_time += col_statistics.decode_header_time;
decode_value_time += col_statistics.decode_value_time;
decode_dict_time += col_statistics.decode_dict_time;
decode_level_time += col_statistics.decode_level_time;
decode_null_map_time += col_statistics.decode_null_map_time;
skip_page_header_num += col_statistics.skip_page_header_num;
parse_page_header_num += col_statistics.parse_page_header_num;
read_page_header_time += col_statistics.read_page_header_time;
}
};

Expand Down Expand Up @@ -145,7 +136,7 @@ class ParquetColumnReader {
const std::set<uint64_t>& filter_column_ids = {});
virtual const std::vector<level_t>& get_rep_level() const = 0;
virtual const std::vector<level_t>& get_def_level() const = 0;
virtual Statistics statistics() = 0;
virtual ColumnStatistics column_statistics() = 0;
virtual void close() = 0;

virtual void reset_filter_map_index() = 0;
Expand Down Expand Up @@ -188,9 +179,8 @@ class ScalarColumnReader : public ParquetColumnReader {
MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override;
const std::vector<level_t>& get_rep_level() const override { return _rep_levels; }
const std::vector<level_t>& get_def_level() const override { return _def_levels; }
Statistics statistics() override {
return Statistics(_stream_reader->statistics(), _chunk_reader->statistics(),
_decode_null_map_time);
// Build this scalar column's statistics by combining the chunk reader's
// decompress/decode counters with the time spent decoding the null map.
ColumnStatistics column_statistics() override {
    return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time);
}
void close() override {}

Expand Down Expand Up @@ -304,7 +294,7 @@ class ArrayColumnReader : public ParquetColumnReader {
const std::vector<level_t>& get_def_level() const override {
return _element_reader->get_def_level();
}
Statistics statistics() override { return _element_reader->statistics(); }
// An array column's statistics are exactly its element reader's statistics.
ColumnStatistics column_statistics() override { return _element_reader->column_statistics(); }
void close() override {}

void reset_filter_map_index() override { _element_reader->reset_filter_map_index(); }
Expand Down Expand Up @@ -335,9 +325,9 @@ class MapColumnReader : public ParquetColumnReader {
return _key_reader->get_def_level();
}

Statistics statistics() override {
Statistics kst = _key_reader->statistics();
Statistics vst = _value_reader->statistics();
// A map column's statistics are the key reader's and value reader's
// statistics folded together.
ColumnStatistics column_statistics() override {
    ColumnStatistics merged = _key_reader->column_statistics();
    ColumnStatistics value_stats = _value_reader->column_statistics();
    merged.merge(value_stats);
    return merged;
}
Expand Down Expand Up @@ -392,12 +382,12 @@ class StructColumnReader : public ParquetColumnReader {
return _child_readers.begin()->second->get_def_level();
}

Statistics statistics() override {
Statistics st;
ColumnStatistics column_statistics() override {
ColumnStatistics st;
for (const auto& column_name : _read_column_names) {
auto reader = _child_readers.find(column_name);
if (reader != _child_readers.end()) {
Statistics cst = reader->second->statistics();
ColumnStatistics cst = reader->second->column_statistics();
st.merge(cst);
}
}
Expand Down Expand Up @@ -490,8 +480,8 @@ class SkipReadingReader : public ParquetColumnReader {
}

// Implement required pure virtual methods from base class
Statistics statistics() override {
return Statistics(); // Return empty statistics
// Skipped columns read nothing, so report all-zero (default) statistics.
ColumnStatistics column_statistics() override { return ColumnStatistics{}; }

void close() override {
Expand Down
6 changes: 3 additions & 3 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1125,10 +1125,10 @@ void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
}
}

ParquetColumnReader::Statistics RowGroupReader::statistics() {
ParquetColumnReader::Statistics st;
ParquetColumnReader::ColumnStatistics RowGroupReader::merged_column_statistics() {
ParquetColumnReader::ColumnStatistics st;
for (auto& reader : _column_readers) {
auto ost = reader.second->statistics();
auto ost = reader.second->column_statistics();
st.merge(ost);
}
return st;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/parquet/vparquet_group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class RowGroupReader : public ProfileCollector {
int64_t predicate_filter_time() const { return _predicate_filter_time; }
int64_t dict_filter_rewrite_time() const { return _dict_filter_rewrite_time; }

ParquetColumnReader::Statistics statistics();
ParquetColumnReader::ColumnStatistics merged_column_statistics();
void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
int64_t get_remaining_rows() { return _remaining_rows; }

Expand Down
Loading
Loading