[opt](MergedIO) no need to merge large columns (#27315) (#27497)
1. Fix a profile bug in `MergeRangeFileReader`, and add a profile counter `ApplyBytes` to show the total bytes of the registered ranges.
2. There's no need to merge large columns, because reading them through `MergeRangeFileReader` only increases copy time.
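A simplified sketch of the decision rule introduced here (the function name and threshold parameter are hypothetical stand-ins; the real code, shown in vorc_reader.cpp below, compares against io::MergeRangeFileReader::SMALL_IO):

#include <cstddef>

// Merge only when the average bytes per selected column is small: for small
// scattered columns the saved IO round-trips outweigh the extra buffer copy,
// while for large columns the copy dominates.
bool should_merge_columns(size_t total_io_size, size_t num_columns,
                          size_t small_io_threshold) {
    return num_columns > 0 && total_io_size / num_columns < small_io_threshold;
}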
AshinGau authored Nov 23, 2023
1 parent 3cb8e88 commit ae697d1
Showing 4 changed files with 30 additions and 14 deletions.
10 changes: 5 additions & 5 deletions be/src/io/fs/buffered_reader.cpp
@@ -55,7 +55,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
Status st = _reader->read_at(offset, result, bytes_read, io_ctx);
_statistics.merged_io++;
_statistics.request_bytes += *bytes_read;
- _statistics.read_bytes += *bytes_read;
+ _statistics.merged_bytes += *bytes_read;
return st;
}
if (offset + result.size > _random_access_ranges[range_index].end_offset) {
@@ -69,10 +69,10 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
if (cached_data.contains(offset)) {
// has cached data in box
_read_in_box(cached_data, offset, result, &has_read);
+ _statistics.request_bytes += has_read;
if (has_read == result.size) {
// all data is read in cache
*bytes_read = has_read;
- _statistics.request_bytes += has_read;
return Status::OK();
}
} else if (!cached_data.empty()) {
@@ -92,7 +92,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
*bytes_read = has_read + read_size;
_statistics.merged_io++;
_statistics.request_bytes += read_size;
- _statistics.read_bytes += read_size;
+ _statistics.merged_bytes += read_size;
return Status::OK();
}

@@ -187,7 +187,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
*bytes_read = has_read + read_size;
_statistics.merged_io++;
_statistics.request_bytes += read_size;
- _statistics.read_bytes += read_size;
+ _statistics.merged_bytes += read_size;
return Status::OK();
}

@@ -315,7 +315,7 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
RETURN_IF_ERROR(
_reader->read_at(start_offset, Slice(_read_slice, to_read), bytes_read, io_ctx));
_statistics.merged_io++;
- _statistics.read_bytes += *bytes_read;
+ _statistics.merged_bytes += *bytes_read;
}

SCOPED_RAW_TIMER(&_statistics.copy_time);
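With the rename, the three byte counters now have distinct meanings: RequestBytes is what callers asked of the reader, MergedBytes is what the reader actually fetched from storage, and ApplyBytes (added in buffered_reader.h below) is the total span of the registered ranges. A minimal sketch of how the first two relate (field names mirror the Statistics struct in this commit; the helper itself is hypothetical):

#include <cstdint>

struct Statistics {               // mirrors MergeRangeFileReader::Statistics
    int64_t request_bytes = 0;    // bytes requested by callers
    int64_t merged_bytes = 0;     // bytes actually fetched via merged IO
    int64_t apply_bytes = 0;      // total extent of all registered ranges
};

// Hypothetical helper: bytes fetched per byte requested.
double read_amplification(const Statistics& s) {
    return s.request_bytes > 0
                   ? static_cast<double>(s.merged_bytes) / s.request_bytes
                   : 1.0;
}

With the final figures from test_read_amplify below (request_bytes = 1032 KB, merged_bytes = 1036 KB), the amplification comes out to roughly 1.004.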
16 changes: 12 additions & 4 deletions be/src/io/fs/buffered_reader.h
@@ -81,7 +81,8 @@ class MergeRangeFileReader : public io::FileReader {
int64_t request_io = 0;
int64_t merged_io = 0;
int64_t request_bytes = 0;
- int64_t read_bytes = 0;
+ int64_t merged_bytes = 0;
+ int64_t apply_bytes = 0;
};

struct RangeCachedData {
@@ -148,6 +149,9 @@ class MergeRangeFileReader : public io::FileReader {
// Equivalent min size of each IO that can reach the maximum storage speed limit:
// 512KB for oss, 4KB for hdfs
_equivalent_io_size = _is_oss ? OSS_MIN_IO_SIZE : HDFS_MIN_IO_SIZE;
+ for (const PrefetchRange& range : _random_access_ranges) {
+     _statistics.apply_bytes += range.end_offset - range.start_offset;
+ }
if (_profile != nullptr) {
const char* random_profile = "MergedSmallIO";
ADD_TIMER(_profile, random_profile);
@@ -157,7 +161,9 @@ class MergeRangeFileReader : public io::FileReader {
_merged_io = ADD_CHILD_COUNTER(_profile, "MergedIO", TUnit::UNIT, random_profile);
_request_bytes =
ADD_CHILD_COUNTER(_profile, "RequestBytes", TUnit::BYTES, random_profile);
- _read_bytes = ADD_CHILD_COUNTER(_profile, "MergedBytes", TUnit::BYTES, random_profile);
+ _merged_bytes =
+         ADD_CHILD_COUNTER(_profile, "MergedBytes", TUnit::BYTES, random_profile);
+ _apply_bytes = ADD_CHILD_COUNTER(_profile, "ApplyBytes", TUnit::BYTES, random_profile);
}
}

@@ -182,7 +188,8 @@ class MergeRangeFileReader : public io::FileReader {
COUNTER_UPDATE(_request_io, _statistics.request_io);
COUNTER_UPDATE(_merged_io, _statistics.merged_io);
COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
- COUNTER_UPDATE(_read_bytes, _statistics.read_bytes);
+ COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
+ COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
}
}
return Status::OK();
@@ -218,7 +225,8 @@ class MergeRangeFileReader : public io::FileReader {
RuntimeProfile::Counter* _request_io;
RuntimeProfile::Counter* _merged_io;
RuntimeProfile::Counter* _request_bytes;
- RuntimeProfile::Counter* _read_bytes;
+ RuntimeProfile::Counter* _merged_bytes;
+ RuntimeProfile::Counter* _apply_bytes;

int _search_read_range(size_t start_offset, size_t end_offset);
void _clean_cached_data(RangeCachedData& cached_data);
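Note that apply_bytes is accumulated once in the constructor, so it is fixed for the reader's lifetime while merged_bytes grows as reads happen. A quick worked example with assumed ranges:

// Two hypothetical registered ranges:
//   A = [0, 4096)      -> 4096 bytes
//   B = [8192, 12288)  -> 4096 bytes
// apply_bytes = (4096 - 0) + (12288 - 8192) = 8192 bytes,
// regardless of how much of A and B is eventually read.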
12 changes: 10 additions & 2 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -2183,20 +2183,28 @@ void ORCFileInputStream::beforeReadStripe(
// Generate prefetch ranges, build stripe file reader.
uint64_t offset = current_strip_information->getOffset();
std::vector<io::PrefetchRange> prefetch_ranges;
+ size_t total_io_size = 0;
for (uint64_t stream_id = 0; stream_id < current_strip_information->getNumberOfStreams();
++stream_id) {
std::unique_ptr<orc::StreamInformation> stream =
current_strip_information->getStreamInformation(stream_id);
uint32_t columnId = stream->getColumnId();
uint64_t length = stream->getLength();
if (selected_columns[columnId]) {
+     total_io_size += length;
doris::io::PrefetchRange prefetch_range = {offset, offset + length};
prefetch_ranges.emplace_back(std::move(prefetch_range));
}
offset += length;
}
- // The underlying page reader will prefetch data in column.
- _file_reader.reset(new io::MergeRangeFileReader(_profile, _inner_reader, prefetch_ranges));
+ size_t num_columns = std::count_if(selected_columns.begin(), selected_columns.end(),
+                                    [](bool selected) { return selected; });
+ if (total_io_size / num_columns < io::MergeRangeFileReader::SMALL_IO) {
+     // The underlying page reader will prefetch data in column.
+     _file_reader.reset(new io::MergeRangeFileReader(_profile, _inner_reader, prefetch_ranges));
+ } else {
+     _file_reader = _inner_reader;
+ }
}

} // namespace doris::vectorized
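The effect of the new branch, on assumed numbers: a stripe with 200 selected streams totaling 2 MB averages about 10 KB per column, so it still goes through MergeRangeFileReader; a stripe with 4 selected streams totaling 400 MB averages 100 MB per column, so _inner_reader is used directly and the extra buffer copy is skipped. (The cutoff io::MergeRangeFileReader::SMALL_IO is defined in buffered_reader.h; its exact value is not shown in this diff.)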
6 changes: 3 additions & 3 deletions be/test/io/fs/buffered_reader_test.cpp
@@ -295,13 +295,13 @@ TEST_F(BufferedReaderTest, test_read_amplify) {
merge_reader.read_at(1024 * kb, result, &bytes_read, nullptr);
EXPECT_EQ(bytes_read, 1024 * kb);
EXPECT_EQ(merge_reader.statistics().request_bytes, 1024 * kb);
- EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb);
+ EXPECT_EQ(merge_reader.statistics().merged_bytes, 1024 * kb);
// read column0
result.size = 1 * kb;
// will merge column 0 ~ 3
merge_reader.read_at(0, result, &bytes_read, nullptr);
EXPECT_EQ(bytes_read, 1 * kb);
- EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb + 12 * kb);
+ EXPECT_EQ(merge_reader.statistics().merged_bytes, 1024 * kb + 12 * kb);
// read column1
result.size = 1 * kb;
merge_reader.read_at(3 * kb, result, &bytes_read, nullptr);
@@ -312,7 +312,7 @@ TEST_F(BufferedReaderTest, test_read_amplify) {
result.size = 5 * kb;
merge_reader.read_at(7 * kb, result, &bytes_read, nullptr);
EXPECT_EQ(merge_reader.statistics().request_bytes, 1024 * kb + 8 * kb);
- EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb + 12 * kb);
+ EXPECT_EQ(merge_reader.statistics().merged_bytes, 1024 * kb + 12 * kb);
}

TEST_F(BufferedReaderTest, test_merged_io) {
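A hypothetical extra assertion the updated test could make for the new counter, assuming the ranges registered in this test (whose setup is collapsed in this view) total 1024 KB + 12 KB:

EXPECT_EQ(merge_reader.statistics().apply_bytes, 1024 * kb + 12 * kb);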
