Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](MergedIO) no need to merge large columns (#27315) #27497

Merged
merged 1 commit on Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
Status st = _reader->read_at(offset, result, bytes_read, io_ctx);
_statistics.merged_io++;
_statistics.request_bytes += *bytes_read;
_statistics.read_bytes += *bytes_read;
_statistics.merged_bytes += *bytes_read;
return st;
}
if (offset + result.size > _random_access_ranges[range_index].end_offset) {
Expand All @@ -69,10 +69,10 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
if (cached_data.contains(offset)) {
// has cached data in box
_read_in_box(cached_data, offset, result, &has_read);
_statistics.request_bytes += has_read;
if (has_read == result.size) {
// all data is read in cache
*bytes_read = has_read;
_statistics.request_bytes += has_read;
return Status::OK();
}
} else if (!cached_data.empty()) {
Expand All @@ -92,7 +92,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
*bytes_read = has_read + read_size;
_statistics.merged_io++;
_statistics.request_bytes += read_size;
_statistics.read_bytes += read_size;
_statistics.merged_bytes += read_size;
return Status::OK();
}

Expand Down Expand Up @@ -187,7 +187,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
*bytes_read = has_read + read_size;
_statistics.merged_io++;
_statistics.request_bytes += read_size;
_statistics.read_bytes += read_size;
_statistics.merged_bytes += read_size;
return Status::OK();
}

Expand Down Expand Up @@ -315,7 +315,7 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
RETURN_IF_ERROR(
_reader->read_at(start_offset, Slice(_read_slice, to_read), bytes_read, io_ctx));
_statistics.merged_io++;
_statistics.read_bytes += *bytes_read;
_statistics.merged_bytes += *bytes_read;
}

SCOPED_RAW_TIMER(&_statistics.copy_time);
Expand Down
16 changes: 12 additions & 4 deletions be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class MergeRangeFileReader : public io::FileReader {
int64_t request_io = 0;
int64_t merged_io = 0;
int64_t request_bytes = 0;
int64_t read_bytes = 0;
int64_t merged_bytes = 0;
int64_t apply_bytes = 0;
};

struct RangeCachedData {
Expand Down Expand Up @@ -148,6 +149,9 @@ class MergeRangeFileReader : public io::FileReader {
// Equivalent min size of each IO that can reach the maximum storage speed limit:
// 512KB for oss, 4KB for hdfs
_equivalent_io_size = _is_oss ? OSS_MIN_IO_SIZE : HDFS_MIN_IO_SIZE;
for (const PrefetchRange& range : _random_access_ranges) {
_statistics.apply_bytes += range.end_offset - range.start_offset;
}
if (_profile != nullptr) {
const char* random_profile = "MergedSmallIO";
ADD_TIMER(_profile, random_profile);
Expand All @@ -157,7 +161,9 @@ class MergeRangeFileReader : public io::FileReader {
_merged_io = ADD_CHILD_COUNTER(_profile, "MergedIO", TUnit::UNIT, random_profile);
_request_bytes =
ADD_CHILD_COUNTER(_profile, "RequestBytes", TUnit::BYTES, random_profile);
_read_bytes = ADD_CHILD_COUNTER(_profile, "MergedBytes", TUnit::BYTES, random_profile);
_merged_bytes =
ADD_CHILD_COUNTER(_profile, "MergedBytes", TUnit::BYTES, random_profile);
_apply_bytes = ADD_CHILD_COUNTER(_profile, "ApplyBytes", TUnit::BYTES, random_profile);
}
}

Expand All @@ -182,7 +188,8 @@ class MergeRangeFileReader : public io::FileReader {
COUNTER_UPDATE(_request_io, _statistics.request_io);
COUNTER_UPDATE(_merged_io, _statistics.merged_io);
COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
COUNTER_UPDATE(_read_bytes, _statistics.read_bytes);
COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
}
}
return Status::OK();
Expand Down Expand Up @@ -218,7 +225,8 @@ class MergeRangeFileReader : public io::FileReader {
RuntimeProfile::Counter* _request_io;
RuntimeProfile::Counter* _merged_io;
RuntimeProfile::Counter* _request_bytes;
RuntimeProfile::Counter* _read_bytes;
RuntimeProfile::Counter* _merged_bytes;
RuntimeProfile::Counter* _apply_bytes;

int _search_read_range(size_t start_offset, size_t end_offset);
void _clean_cached_data(RangeCachedData& cached_data);
Expand Down
12 changes: 10 additions & 2 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2183,20 +2183,28 @@ void ORCFileInputStream::beforeReadStripe(
// Generate prefetch ranges, build stripe file reader.
uint64_t offset = current_strip_information->getOffset();
std::vector<io::PrefetchRange> prefetch_ranges;
size_t total_io_size = 0;
for (uint64_t stream_id = 0; stream_id < current_strip_information->getNumberOfStreams();
++stream_id) {
std::unique_ptr<orc::StreamInformation> stream =
current_strip_information->getStreamInformation(stream_id);
uint32_t columnId = stream->getColumnId();
uint64_t length = stream->getLength();
if (selected_columns[columnId]) {
total_io_size += length;
doris::io::PrefetchRange prefetch_range = {offset, offset + length};
prefetch_ranges.emplace_back(std::move(prefetch_range));
}
offset += length;
}
// The underlying page reader will prefetch data in column.
_file_reader.reset(new io::MergeRangeFileReader(_profile, _inner_reader, prefetch_ranges));
size_t num_columns = std::count_if(selected_columns.begin(), selected_columns.end(),
[](bool selected) { return selected; });
if (total_io_size / num_columns < io::MergeRangeFileReader::SMALL_IO) {
// The underlying page reader will prefetch data in column.
_file_reader.reset(new io::MergeRangeFileReader(_profile, _inner_reader, prefetch_ranges));
} else {
_file_reader = _inner_reader;
}
}

} // namespace doris::vectorized
6 changes: 3 additions & 3 deletions be/test/io/fs/buffered_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,13 @@ TEST_F(BufferedReaderTest, test_read_amplify) {
merge_reader.read_at(1024 * kb, result, &bytes_read, nullptr);
EXPECT_EQ(bytes_read, 1024 * kb);
EXPECT_EQ(merge_reader.statistics().request_bytes, 1024 * kb);
EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb);
EXPECT_EQ(merge_reader.statistics().merged_bytes, 1024 * kb);
// read column0
result.size = 1 * kb;
// will merge column 0 ~ 3
merge_reader.read_at(0, result, &bytes_read, nullptr);
EXPECT_EQ(bytes_read, 1 * kb);
EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb + 12 * kb);
EXPECT_EQ(merge_reader.statistics().merged_bytes, 1024 * kb + 12 * kb);
// read column1
result.size = 1 * kb;
merge_reader.read_at(3 * kb, result, &bytes_read, nullptr);
Expand All @@ -312,7 +312,7 @@ TEST_F(BufferedReaderTest, test_read_amplify) {
result.size = 5 * kb;
merge_reader.read_at(7 * kb, result, &bytes_read, nullptr);
EXPECT_EQ(merge_reader.statistics().request_bytes, 1024 * kb + 8 * kb);
EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb + 12 * kb);
EXPECT_EQ(merge_reader.statistics().merged_bytes, 1024 * kb + 12 * kb);
}

TEST_F(BufferedReaderTest, test_merged_io) {
Expand Down
Loading