diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnCache.cpp b/dbms/src/Storages/DeltaMerge/File/ColumnCache.cpp index 1283c2aa1c4..94ac997c3aa 100644 --- a/dbms/src/Storages/DeltaMerge/File/ColumnCache.cpp +++ b/dbms/src/Storages/DeltaMerge/File/ColumnCache.cpp @@ -14,19 +14,18 @@ #include -namespace DB +namespace DB::DM { -namespace DM -{ -RangeWithStrategys ColumnCache::getReadStrategy(size_t pack_id, size_t pack_count, ColId column_id) + +RangeWithStrategys ColumnCache::getReadStrategy(size_t start_pack_idx, size_t pack_count, ColId column_id) { - PackRange target_range{pack_id, pack_id + pack_count}; + PackRange target_range{start_pack_idx, start_pack_idx + pack_count}; RangeWithStrategys range_and_strategys; - - Strategy strategy = Strategy::Unknown; + range_and_strategys.reserve(pack_count); + auto strategy = Strategy::Unknown; size_t range_start = 0; - for (size_t cursor = target_range.first; cursor < target_range.second; cursor++) + for (size_t cursor = target_range.first; cursor < target_range.second; ++cursor) { if (isPackInCache(cursor, column_id)) { @@ -36,7 +35,7 @@ RangeWithStrategys ColumnCache::getReadStrategy(size_t pack_id, size_t pack_coun } else if (strategy == Strategy::Disk) { - range_and_strategys.emplace_back(std::make_pair(PackRange{range_start, cursor}, Strategy::Disk)); + range_and_strategys.emplace_back(PackRange{range_start, cursor}, Strategy::Disk); } range_start = cursor; strategy = Strategy::Memory; @@ -45,7 +44,7 @@ RangeWithStrategys ColumnCache::getReadStrategy(size_t pack_id, size_t pack_coun { if (strategy == Strategy::Memory) { - range_and_strategys.emplace_back(std::make_pair(PackRange{range_start, cursor}, Strategy::Memory)); + range_and_strategys.emplace_back(PackRange{range_start, cursor}, Strategy::Memory); } else if (strategy == Strategy::Disk) { @@ -55,8 +54,53 @@ RangeWithStrategys ColumnCache::getReadStrategy(size_t pack_id, size_t pack_coun strategy = Strategy::Disk; } } - 
range_and_strategys.emplace_back(std::make_pair(PackRange{range_start, target_range.second}, strategy)); + range_and_strategys.emplace_back(PackRange{range_start, target_range.second}, strategy); + range_and_strategys.shrink_to_fit(); + return range_and_strategys; +} +RangeWithStrategys ColumnCache::getReadStrategy( + size_t start_pack_idx, + size_t pack_count, + const std::vector & clean_read_pack_idx) +{ + PackRange target_range{start_pack_idx, start_pack_idx + pack_count}; + + RangeWithStrategys range_and_strategys; + range_and_strategys.reserve(pack_count); + auto strategy = Strategy::Unknown; + size_t range_start = 0; + for (size_t cursor = target_range.first; cursor < target_range.second; ++cursor) + { + if (std::find(clean_read_pack_idx.cbegin(), clean_read_pack_idx.cend(), cursor) != clean_read_pack_idx.cend()) + { + if (strategy == Strategy::Memory) + { + continue; + } + else if (strategy == Strategy::Disk) + { + range_and_strategys.emplace_back(PackRange{range_start, cursor}, Strategy::Disk); + } + range_start = cursor; + strategy = Strategy::Memory; + } + else + { + if (strategy == Strategy::Memory) + { + range_and_strategys.emplace_back(PackRange{range_start, cursor}, Strategy::Memory); + } + else if (strategy == Strategy::Disk) + { + continue; + } + range_start = cursor; + strategy = Strategy::Disk; + } + } + range_and_strategys.emplace_back(PackRange{range_start, target_range.second}, strategy); + range_and_strategys.shrink_to_fit(); return range_and_strategys; } @@ -125,5 +169,4 @@ bool ColumnCache::isPackInCache(PackId pack_id, ColId column_id) return false; } -} // namespace DM -} // namespace DB +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnCache.h b/dbms/src/Storages/DeltaMerge/File/ColumnCache.h index 1bc57f3f86e..1f7b4d0c1ae 100644 --- a/dbms/src/Storages/DeltaMerge/File/ColumnCache.h +++ b/dbms/src/Storages/DeltaMerge/File/ColumnCache.h @@ -14,18 +14,15 @@ #pragma once -#include #include #include -#include 
#include #include -namespace DB -{ -namespace DM +namespace DB::DM { + using ColId = DB::ColumnID; using PackId = size_t; using PackRange = std::pair; @@ -46,7 +43,11 @@ class ColumnCache using RangeWithStrategy = std::pair; using RangeWithStrategys = std::vector; - RangeWithStrategys getReadStrategy(size_t pack_id, size_t pack_count, ColId column_id); + RangeWithStrategys getReadStrategy(size_t start_pack_idx, size_t pack_count, ColId column_id); + static RangeWithStrategys getReadStrategy( + size_t start_pack_idx, + size_t pack_count, + const std::vector & clean_read_pack_idx); void tryPutColumn(size_t pack_id, ColId column_id, const ColumnPtr & column, size_t rows_offset, size_t rows_count); @@ -74,5 +75,5 @@ using ColumnCachePtrs = std::vector; using RangeWithStrategy = ColumnCache::RangeWithStrategy; using RangeWithStrategys = ColumnCache::RangeWithStrategys; using ColumnCacheElement = ColumnCache::ColumnCacheElement; -} // namespace DM -} // namespace DB + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index d0c7387df80..355372ab783 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -13,30 +13,19 @@ // limitations under the License. 
#include -#include #include #include #include #include #include -#include -#include -#include -#include -#include +#include #include #include #include -#include -#include +#include +#include #include -#include - -namespace CurrentMetrics -{ -extern const Metric OpenFileForRead; -} namespace DB::ErrorCodes { @@ -80,7 +69,6 @@ DMFileReader::DMFileReader( , enable_column_cache(enable_column_cache_ && column_cache_) , max_read_version(max_read_version_) , pack_filter(std::move(pack_filter_)) - , skip_packs_by_column(read_columns.size(), 0) , mark_cache(mark_cache_) , column_cache(column_cache_) , scan_context(scan_context_) @@ -114,19 +102,9 @@ DMFileReader::DMFileReader( if (max_sharing_column_bytes > 0) { col_data_cache = std::make_unique(path(), read_columns, log); - for (const auto & cd : read_columns) - { - last_read_from_cache[cd.id] = false; - } } } -bool DMFileReader::shouldSeek(size_t pack_id) const -{ - // If current pack is the first one, or we just finished reading the last pack, then no need to seek. - return pack_id != 0 && !pack_filter.getUsePacksConst()[pack_id - 1]; -} - bool DMFileReader::getSkippedRows(size_t & skip_rows) { skip_rows = 0; @@ -142,111 +120,104 @@ bool DMFileReader::getSkippedRows(size_t & skip_rows) return next_pack_id < use_packs.size(); } +// Skip the block which should be returned by next read() size_t DMFileReader::skipNextBlock() { - // Go to next available pack. - size_t skip; - if (!getSkippedRows(skip)) + // find the first pack which is used + if (size_t skip_rows; !getSkippedRows(skip_rows)) + { + // no block left in the stream return 0; + } - // Find the next contiguous packs will be read in next read, - // let next_pack_id point to the next pack of the contiguous packs. - // For example, if we have 10 packs, use_packs is [0, 1, 1, 0, 1, 1, 0, 0, 1, 1], - // and now next_pack_id is 1, then we will skip 2 packs(index 1 and 2), and next_pack_id will be 3. - const size_t read_pack_limit = read_one_pack_every_time ? 
1 : 0; - const std::vector & handle_res = pack_filter.getHandleRes(); - const RSResult expected_handle_res = handle_res[next_pack_id]; - auto & use_packs = pack_filter.getUsePacks(); - size_t start_pack_id = next_pack_id; + // move forward next_pack_id and next_row_offset + const size_t read_rows = getReadRows(); + if (read_rows == 0) + return 0; + + addSkippedRows(read_rows); + scan_context->late_materialization_skip_rows += read_rows; + return read_rows; +} + +// Get the number of rows to read in the next block +// Move forward next_pack_id and next_row_offset +size_t DMFileReader::getReadRows() +{ + const auto & use_packs = pack_filter.getUsePacksConst(); + const size_t start_pack_id = next_pack_id; + // When read_one_pack_every_time is true, we can just read one pack every time. + // std::numeric_limits::max() means no limit + const size_t read_pack_limit = read_one_pack_every_time ? 1 : std::numeric_limits::max(); const auto & pack_stats = dmfile->getPackStats(); size_t read_rows = 0; for (; next_pack_id < use_packs.size() && use_packs[next_pack_id] && read_rows < rows_threshold_per_read; ++next_pack_id) { - if (read_pack_limit != 0 && next_pack_id - start_pack_id >= read_pack_limit) - break; - if (enable_handle_clean_read && handle_res[next_pack_id] != expected_handle_res) + if (next_pack_id - start_pack_id >= read_pack_limit) break; - read_rows += pack_stats[next_pack_id].rows; } - addSkippedRows(read_rows); next_row_offset += read_rows; - - // When we read dmfile, if the previous pack is not read, - // then we should seek to the right offset of dmfile. - // So if skip some packs successfully, - // then we set the last pack to false to indicate that we should seek before read. - if (likely(read_rows > 0)) - use_packs[next_pack_id - 1] = false; - - scan_context->late_materialization_skip_rows += read_rows; return read_rows; } Block DMFileReader::readWithFilter(const IColumn::Filter & filter) { - size_t skip_rows; - if (!getSkippedRows(skip_rows)) + /// 1. 
Skip filtered out packs. + if (size_t skip_rows; !getSkippedRows(skip_rows)) + { + // no block left in the stream return {}; + } + + /// 2. Mark use_packs[i] = false if all rows in the i-th pack are filtered out by filter. const auto & pack_stats = dmfile->getPackStats(); auto & use_packs = pack_filter.getUsePacks(); size_t start_row_offset = next_row_offset; - size_t next_pack_id_cp = next_pack_id; - - { - // Use std::find to find the first 1 in the filter, these rows before the first 1 should be skipped. - // For example, filter is [0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0], each pack has 3 rows - // 1. begin points to index 0, it points to index 3, skip = 3 >= 3, so use_packs[0] = false, read_rows = 3, next_pack_id_cp = 1 - // 2. it != filter.cend(), so use_packs[1] = true, read_rows = 6, next_pack_id_cp = 2 - // 3. begin points to index 6, and it points to index 6, skip = 0 < 3 - // 4. it != filter.cend(), so use_packs[2] = true, read_rows = 9, next_pack_id_cp = 3 - // 5. begin points to index 9, and it points to index 12, skip = 3 >= 3, so use_packs[3] = false, read_rows = 12, next_pack_id_cp = 4 - // 6. it == filter.cend(), break - // read_rows = filter.size() = 12, next_pack_id_cp = 4 - // This algorithm should be more efficient than check each pack one by one. 
- size_t read_rows = 0; - while (read_rows < filter.size()) + size_t start_pack_id = next_pack_id; + const size_t read_rows = getReadRows(); + RUNTIME_CHECK(read_rows == filter.size(), read_rows, filter.size()); + size_t last_pack_id = next_pack_id; + { + size_t offset = 0; + for (size_t i = start_pack_id; i < last_pack_id; ++i) { - const auto begin = filter.cbegin() + read_rows; - const auto it = std::find(begin, filter.cend(), 1); - auto skip = std::distance(begin, it); - while (next_pack_id_cp < use_packs.size() && skip >= pack_stats[next_pack_id_cp].rows) - { - scan_context->late_materialization_skip_rows += pack_stats[next_pack_id_cp].rows; - use_packs[next_pack_id_cp] = false; - skip -= pack_stats[next_pack_id_cp].rows; - read_rows += pack_stats[next_pack_id_cp].rows; - ++next_pack_id_cp; - } - if (it == filter.cend()) - break; - use_packs[next_pack_id_cp] = true; - read_rows += pack_stats[next_pack_id_cp].rows; - ++next_pack_id_cp; + if (countBytesInFilter(filter, offset, pack_stats[i].rows) == 0) + use_packs[i] = false; + offset += pack_stats[i].rows; } - // filter.size() equals to the number of rows in the next block - // so read_rows should be equal to filter.size() here. - RUNTIME_CHECK(read_rows == filter.size()); } - // mark the next pack after next read as not used temporarily - // to avoid reading it and its following packs in this round + /// 3. Mark the use_packs[last_pack_id] as false temporarily to avoid reading it and its following packs in this round + bool next_pack_id_use_packs_cp = false; - if (next_pack_id_cp < use_packs.size()) + if (last_pack_id < use_packs.size()) { - next_pack_id_use_packs_cp = use_packs[next_pack_id_cp]; - use_packs[next_pack_id_cp] = false; + next_pack_id_use_packs_cp = use_packs[last_pack_id]; + use_packs[last_pack_id] = false; } - Blocks blocks; - blocks.reserve(next_pack_id_cp - next_pack_id); + /// 4. 
Read and filter packs - size_t read_rows = 0; - for (size_t i = next_pack_id; i < next_pack_id_cp; ++i) + MutableColumns columns; + columns.reserve(read_columns.size()); + size_t total_passed_count = countBytesInFilter(filter); + for (const auto & cd : read_columns) + { + auto col = cd.type->createColumn(); + col->reserve(total_passed_count); + columns.emplace_back(std::move(col)); + } + + size_t offset = 0; + // reset next_pack_id to start_pack_id, next_row_offset to start_row_offset + next_pack_id = start_pack_id; + next_row_offset = start_row_offset; + for (size_t pack_id = start_pack_id; pack_id < last_pack_id; ++pack_id) { // When the next pack is not used or the pack is the last pack, call read() to read theses packs and filter them // For example: @@ -254,44 +225,53 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter) // The algorithm runs as follows: // When i = next_pack_id + 2, call read() to read {next_pack_id, next_pack_id + 1}th packs // When i = next_pack_id + 5, call read() to read {next_pack_id + 3, next_pack_id + 4, next_pack_id + 5}th packs - if (use_packs[i] && (i + 1 == use_packs.size() || !use_packs[i + 1])) + if (use_packs[pack_id] && (pack_id + 1 == use_packs.size() || !use_packs[pack_id + 1])) { Block block = read(); + size_t rows = block.rows(); - IColumn::Filter block_filter(filter.cbegin() + read_rows, filter.cbegin() + read_rows + block.rows()); - read_rows += block.rows(); - - if (size_t passed_count = countBytesInFilter(block_filter); passed_count != block.rows()) + if (size_t passed_count = countBytesInFilter(filter, offset, rows); passed_count != rows) { - for (auto & col : block) + std::vector positions; + positions.reserve(passed_count); + for (size_t p = offset; p < offset + rows; ++p) { - col.column = col.column->filter(block_filter, passed_count); + if (filter[p]) + positions.push_back(p - offset); + } + for (size_t i = 0; i < block.columns(); ++i) + { + 
columns[i]->insertDisjunctFrom(*block.getByPosition(i).column, positions); } } - - blocks.emplace_back(std::move(block)); + else + { + for (size_t i = 0; i < block.columns(); ++i) + { + columns[i]->insertRangeFrom( + *block.getByPosition(i).column, + 0, + block.getByPosition(i).column->size()); + } + } + offset += rows; } - else if (!use_packs[i]) + else if (!use_packs[pack_id]) { - read_rows += pack_stats[i].rows; + offset += pack_stats[pack_id].rows; } } - // restore the use_packs of next pack after next read - if (next_pack_id_cp < use_packs.size()) - use_packs[next_pack_id_cp] = next_pack_id_use_packs_cp; + /// 5. Restore the use_packs[last_pack_id] - // merge blocks - Block res = vstackBlocks(std::move(blocks)); + if (last_pack_id < use_packs.size()) + use_packs[last_pack_id] = next_pack_id_use_packs_cp; + + Block res = getHeader().cloneWithColumns(std::move(columns)); res.setStartOffset(start_row_offset); return res; } -inline bool isExtraColumn(const ColumnDefine & cd) -{ - return cd.id == EXTRA_HANDLE_COLUMN_ID || cd.id == VERSION_COLUMN_ID || cd.id == TAG_COLUMN_ID; -} - bool DMFileReader::isCacheableColumn(const ColumnDefine & cd) { return cd.id == EXTRA_HANDLE_COLUMN_ID || cd.id == VERSION_COLUMN_ID; @@ -302,244 +282,295 @@ Block DMFileReader::read() Stopwatch watch; SCOPE_EXIT(scan_context->total_dmfile_read_time_ns += watch.elapsed();); - // Go to next available pack. - size_t skip_rows; + /// 1. Skip filtered out packs. + if (size_t skip_rows; !getSkippedRows(skip_rows)) + return {}; - getSkippedRows(skip_rows); - const auto & use_packs = pack_filter.getUsePacksConst(); + /// 2. Find the max continuous rows can be read. - if (next_pack_id >= use_packs.size()) - return {}; - // Find max continuing rows we can read. size_t start_pack_id = next_pack_id; size_t start_row_offset = next_row_offset; - // When read_one_pack_every_time is true, we can just read one pack every time. - // 0 means no limit - size_t read_pack_limit = read_one_pack_every_time ? 
1 : 0; + const size_t read_rows = getReadRows(); + if (read_rows == 0) + return {}; + addScannedRows(read_rows); - const auto & pack_stats = dmfile->getPackStats(); + /// 3. Find packs can do clean read. + const auto & pack_stats = dmfile->getPackStats(); const auto & pack_properties = dmfile->getPackProperties(); + const auto & handle_res = pack_filter.getHandleRes(); // alias of handle_res in pack_filter + const size_t read_packs = next_pack_id - start_pack_id; + std::vector handle_column_clean_read_packs; + std::vector del_column_clean_read_packs; + std::vector version_column_clean_read_packs; + if (is_fast_scan) + { + if (enable_del_clean_read) + { + del_column_clean_read_packs.reserve(read_packs); + for (size_t i = start_pack_id; i < next_pack_id; ++i) + { + // If delete rows is 0, we do not need to read del column. + if (static_cast(pack_properties.property_size()) > i + && pack_properties.property(i).has_deleted_rows() + && (pack_properties.property(i).deleted_rows() == 0 + || pack_properties.property(i).deleted_rows() == pack_stats[i].rows)) + { + del_column_clean_read_packs.push_back(i); + } + } + } + if (enable_handle_clean_read) + { + handle_column_clean_read_packs.reserve(read_packs); + for (size_t i = start_pack_id; i < next_pack_id; ++i) + { + // If all handle in a pack are in the given range, and del column do clean read, we do not need to read handle column. + if (handle_res[i] == All + && std::find(del_column_clean_read_packs.cbegin(), del_column_clean_read_packs.cend(), i) + != del_column_clean_read_packs.cend()) + { + handle_column_clean_read_packs.push_back(i); + } + // If all handle in a pack are in the given range, but disable del clean read, we do not need to read handle column. 
+ else if (!enable_del_clean_read && handle_res[i] == All) + { + handle_column_clean_read_packs.push_back(i); + } + } + } + } + else if (enable_handle_clean_read) + { + handle_column_clean_read_packs.reserve(read_packs); + version_column_clean_read_packs.reserve(read_packs); + del_column_clean_read_packs.reserve(read_packs); + for (size_t i = start_pack_id; i < next_pack_id; ++i) + { + // If all handle in a pack are in the given range, no not_clean rows, and max version <= max_read_version, + // we do not need to read handle column. + if (handle_res[i] == All && pack_stats[i].not_clean == 0 + && pack_filter.getMaxVersion(i) <= max_read_version) + { + handle_column_clean_read_packs.push_back(i); + version_column_clean_read_packs.push_back(i); + del_column_clean_read_packs.push_back(i); + } + } + } - size_t read_rows = 0; - size_t not_clean_rows = 0; - size_t deleted_rows = 0; + /// 4. Read columns. - const std::vector & handle_res = pack_filter.getHandleRes(); // alias of handle_res in pack_filter - RSResult expected_handle_res = handle_res[next_pack_id]; - for (; next_pack_id < use_packs.size() && use_packs[next_pack_id] && read_rows < rows_threshold_per_read; - ++next_pack_id) + ColumnsWithTypeAndName columns; + columns.reserve(read_columns.size()); + for (auto & cd : read_columns) { - if (read_pack_limit != 0 && next_pack_id - start_pack_id >= read_pack_limit) - break; - if (enable_handle_clean_read && handle_res[next_pack_id] != expected_handle_res) - break; - - read_rows += pack_stats[next_pack_id].rows; - not_clean_rows += pack_stats[next_pack_id].not_clean; - // Because deleted_rows is a new field in pack_properties, we need to check whehter this pack has this field. - // If this pack doesn't have this field, then we can't know whether this pack contains deleted rows. - // Thus we just deleted_rows += 1, to make sure we will not do the optimization with del column(just to make deleted_rows != 0). 
- if (static_cast(pack_properties.property_size()) > next_pack_id - && pack_properties.property(next_pack_id).has_deleted_rows()) + try { - deleted_rows += pack_properties.property(next_pack_id).deleted_rows(); + ColumnPtr col; + // For handle, tag and version column, we can try to do clean read. + switch (cd.id) + { + case EXTRA_HANDLE_COLUMN_ID: + col = readExtraColumn(cd, start_pack_id, read_packs, read_rows, handle_column_clean_read_packs); + break; + case TAG_COLUMN_ID: + col = readExtraColumn(cd, start_pack_id, read_packs, read_rows, del_column_clean_read_packs); + break; + case VERSION_COLUMN_ID: + col = readExtraColumn(cd, start_pack_id, read_packs, read_rows, version_column_clean_read_packs); + break; + default: + col = readColumn(cd, start_pack_id, read_packs, read_rows); + break; + } + columns.emplace_back(std::move(col), cd.type, cd.name, cd.id); } - else + catch (DB::Exception & e) { - deleted_rows += 1; + e.addMessage( + fmt::format("(while reading from DTFile: {}, column: {}:{})", this->dmfile->path(), cd.id, cd.name)); + e.rethrow(); } } - next_row_offset += read_rows; - if (read_rows == 0) - return {}; - - Block res; + Block res(std::move(columns)); res.setStartOffset(start_row_offset); + return res; +} - size_t read_packs = next_pack_id - start_pack_id; +ColumnPtr DMFileReader::cleanRead( + const ColumnDefine & cd, + size_t rows_count, + std::pair range, + const DMFile::PackStats & pack_stats) +{ + switch (cd.id) + { + case EXTRA_HANDLE_COLUMN_ID: + { + if (is_common_handle) + { + StringRef min_handle = pack_filter.getMinStringHandle(range.first); + return cd.type->createColumnConst(rows_count, Field(min_handle.data, min_handle.size)); + } + else + { + Handle min_handle = pack_filter.getMinHandle(range.first); + return cd.type->createColumnConst(rows_count, Field(min_handle)); + } + } + case TAG_COLUMN_ID: + { + return cd.type->createColumnConst(rows_count, Field(static_cast(pack_stats[range.first].first_tag))); + } + case VERSION_COLUMN_ID: + { + 
return cd.type->createColumnConst( + rows_count, + Field(static_cast(pack_stats[range.first].first_version))); + } + default: + { + // This should not happen + throw Exception("Unknown column id", ErrorCodes::LOGICAL_ERROR); + } + } +} - addScannedRows(read_rows); +/** + * Read the hidden column (handle, tag, version). + */ +ColumnPtr DMFileReader::readExtraColumn( + const ColumnDefine & cd, + size_t start_pack_id, + size_t pack_count, + size_t read_rows, + const std::vector & clean_read_packs) +{ + assert(cd.id == EXTRA_HANDLE_COLUMN_ID || cd.id == TAG_COLUMN_ID || cd.id == VERSION_COLUMN_ID); - // TODO: this will need better algorithm: we should separate those packs which can and can not do clean read. - bool do_clean_read_on_normal_mode - = enable_handle_clean_read && expected_handle_res == All && not_clean_rows == 0 && (!is_fast_scan); + const auto & pack_stats = dmfile->getPackStats(); + auto read_strategy = ColumnCache::getReadStrategy(start_pack_id, pack_count, clean_read_packs); + if (read_strategy.size() != 1 && cd.id == EXTRA_HANDLE_COLUMN_ID) + { + // If size of read_strategy is not 1, handle can not do clean read. + read_strategy.clear(); + read_strategy.emplace_back( + std::make_pair(start_pack_id, start_pack_id + pack_count), + ColumnCache::Strategy::Disk); + } + auto column = cd.type->createColumn(); + column->reserve(read_rows); + for (const auto & [range, strategy] : read_strategy) + { + size_t rows_count = 0; + for (size_t cursor = range.first; cursor < range.second; cursor++) + { + rows_count += pack_stats[cursor].rows; + } + // TODO: this create a temp `src_col` then copy the data into `column`. 
+ // we can try to elimiate the copying + ColumnPtr src_col; + switch (strategy) + { + case ColumnCache::Strategy::Memory: + { + src_col = cleanRead(cd, rows_count, range, pack_stats); + break; + } + case ColumnCache::Strategy::Disk: + { + src_col = readColumn(cd, range.first, range.second - range.first, rows_count); + break; + } + default: + throw Exception("Unknown strategy", ErrorCodes::LOGICAL_ERROR); + } + if (read_strategy.size() == 1) + return src_col; + if (src_col->isColumnConst()) + { + // The src_col get from `cleanRead` may be a ColumnConst, fill the `column` + // with the value of ColumnConst + auto v = (*src_col)[0]; + column->insertMany(v, src_col->size()); + } + else + { + column->insertRangeFrom(*src_col, 0, src_col->size()); + } + } + return column; +} - bool do_clean_read_on_handle_on_fast_mode = enable_handle_clean_read && is_fast_scan && expected_handle_res == All; - bool do_clean_read_on_del_on_fast_mode = enable_del_clean_read && is_fast_scan && deleted_rows == 0; +ColumnPtr DMFileReader::readColumn(const ColumnDefine & cd, size_t start_pack_id, size_t pack_count, size_t read_rows) +{ + // New column after ddl is not exist in this DMFile, fill with default value + if (!column_streams.contains(DMFile::getFileNameBase(cd.id))) + return createColumnWithDefaultValue(cd, read_rows); - if (do_clean_read_on_normal_mode) + // Not cached + if (!enable_column_cache || !isCacheableColumn(cd)) { - UInt64 start_ts = 0; - for (size_t pack_id = start_pack_id; pack_id < next_pack_id; ++pack_id) - start_ts = std::max(pack_filter.getMaxVersion(pack_id), start_ts); - do_clean_read_on_normal_mode = start_ts <= max_read_version; + auto data_type = dmfile->getColumnStat(cd.id).type; + ColumnPtr column; + readFromDiskOrSharingCache(cd, column, start_pack_id, pack_count, read_rows); + return convertColumnByColumnDefineIfNeed(data_type, std::move(column), cd); } - for (size_t i = 0; i < read_columns.size(); ++i) + // enable_column_cache && isCacheableColumn(cd) + 
auto read_strategy = column_cache->getReadStrategy(start_pack_id, pack_count, cd.id); + const auto & pack_stats = dmfile->getPackStats(); + auto data_type = dmfile->getColumnStat(cd.id).type; + auto column = data_type->createColumn(); + column->reserve(read_rows); + for (auto & [range, strategy] : read_strategy) { - try + if (strategy == ColumnCache::Strategy::Memory) { - // For clean read of column pk, version, tag, instead of loading data from disk, just create placeholder column is OK. - const auto & cd = read_columns[i]; - if (cd.id == EXTRA_HANDLE_COLUMN_ID && do_clean_read_on_handle_on_fast_mode) + for (size_t cursor = range.first; cursor < range.second; cursor++) { - // Return the first row's handle - ColumnPtr column; - if (is_common_handle) - { - StringRef min_handle = pack_filter.getMinStringHandle(start_pack_id); - column = cd.type->createColumnConst(read_rows, Field(min_handle.data, min_handle.size)); - } - else - { - Handle min_handle = pack_filter.getMinHandle(start_pack_id); - column = cd.type->createColumnConst(read_rows, Field(min_handle)); - } - res.insert(ColumnWithTypeAndName{std::move(column), cd.type, cd.name, cd.id}); - skip_packs_by_column[i] = read_packs; + auto cache_element = column_cache->getColumn(cursor, cd.id); + column->insertRangeFrom( + *(cache_element.first), + cache_element.second.first, + cache_element.second.second); } - else if (cd.id == TAG_COLUMN_ID && do_clean_read_on_del_on_fast_mode) - { - ColumnPtr column = cd.type->createColumnConst( - read_rows, - Field(static_cast(pack_stats[start_pack_id].first_tag))); - res.insert(ColumnWithTypeAndName{std::move(column), cd.type, cd.name, cd.id}); - - skip_packs_by_column[i] = read_packs; - } - else if (do_clean_read_on_normal_mode && isExtraColumn(cd)) - { - ColumnPtr column; - if (cd.id == EXTRA_HANDLE_COLUMN_ID) - { - // Return the first row's handle - if (is_common_handle) - { - StringRef min_handle = pack_filter.getMinStringHandle(start_pack_id); - column = 
cd.type->createColumnConst(read_rows, Field(min_handle.data, min_handle.size)); - } - else - { - Handle min_handle = pack_filter.getMinHandle(start_pack_id); - column = cd.type->createColumnConst(read_rows, Field(min_handle)); - } - } - else if (cd.id == VERSION_COLUMN_ID) - { - column = cd.type->createColumnConst(read_rows, Field(pack_stats[start_pack_id].first_version)); - } - else if (cd.id == TAG_COLUMN_ID) - { - column = cd.type->createColumnConst( - read_rows, - Field(static_cast(pack_stats[start_pack_id].first_tag))); - } - - res.insert(ColumnWithTypeAndName{column, cd.type, cd.name, cd.id}); - - skip_packs_by_column[i] = read_packs; - } - else + } + else if (strategy == ColumnCache::Strategy::Disk) + { + size_t rows_count = 0; + for (size_t cursor = range.first; cursor < range.second; cursor++) { - const auto stream_name = DMFile::getFileNameBase(cd.id); - if (auto iter = column_streams.find(stream_name); iter != column_streams.end()) - { - if (enable_column_cache && isCacheableColumn(cd)) - { - auto read_strategy = column_cache->getReadStrategy(start_pack_id, read_packs, cd.id); - - auto data_type = dmfile->getColumnStat(cd.id).type; - auto column = data_type->createColumn(); - column->reserve(read_rows); - for (auto & [range, strategy] : read_strategy) - { - if (strategy == ColumnCache::Strategy::Memory) - { - for (size_t cursor = range.first; cursor < range.second; cursor++) - { - auto cache_element = column_cache->getColumn(cursor, cd.id); - column->insertRangeFrom( - *(cache_element.first), - cache_element.second.first, - cache_element.second.second); - } - skip_packs_by_column[i] += (range.second - range.first); - } - else if (strategy == ColumnCache::Strategy::Disk) - { - size_t rows_count = 0; - for (size_t cursor = range.first; cursor < range.second; cursor++) - { - rows_count += pack_stats[cursor].rows; - } - ColumnPtr col; - readColumn( - cd, - col, - range.first, - range.second - range.first, - rows_count, - skip_packs_by_column[i]); - 
column->insertRangeFrom(*col, 0, col->size()); - skip_packs_by_column[i] = 0; - } - else - { - throw Exception("Unknown strategy", ErrorCodes::LOGICAL_ERROR); - } - } - ColumnPtr result_column = std::move(column); - size_t rows_offset = 0; - for (size_t cursor = start_pack_id; cursor < start_pack_id + read_packs; cursor++) - { - column_cache - ->tryPutColumn(cursor, cd.id, result_column, rows_offset, pack_stats[cursor].rows); - rows_offset += pack_stats[cursor].rows; - } - // Cast column's data from DataType in disk to what we need now - auto converted_column - = convertColumnByColumnDefineIfNeed(data_type, std::move(result_column), cd); - res.insert(ColumnWithTypeAndName{converted_column, cd.type, cd.name, cd.id}); - } - else - { - auto data_type = dmfile->getColumnStat(cd.id).type; - ColumnPtr column; - readColumn(cd, column, start_pack_id, read_packs, read_rows, skip_packs_by_column[i]); - auto converted_column = convertColumnByColumnDefineIfNeed(data_type, std::move(column), cd); - - res.insert(ColumnWithTypeAndName{std::move(converted_column), cd.type, cd.name, cd.id}); - skip_packs_by_column[i] = 0; - } - } - else - { - // New column after ddl is not exist in this DMFile, fill with default value - ColumnPtr column = createColumnWithDefaultValue(cd, read_rows); - - res.insert(ColumnWithTypeAndName{std::move(column), cd.type, cd.name, cd.id}); - skip_packs_by_column[i] = 0; - } + rows_count += pack_stats[cursor].rows; } + ColumnPtr col; + readFromDiskOrSharingCache(cd, col, range.first, range.second - range.first, rows_count); + column->insertRangeFrom(*col, 0, col->size()); } - catch (DB::Exception & e) + else { - e.addMessage("(while reading from DTFile: " + this->dmfile->path() + ")"); - e.rethrow(); + throw Exception("Unknown strategy", ErrorCodes::LOGICAL_ERROR); } } - return res; + ColumnPtr result_column = std::move(column); + size_t rows_offset = 0; + for (size_t cursor = start_pack_id; cursor < start_pack_id + pack_count; cursor++) + { + 
column_cache->tryPutColumn(cursor, cd.id, result_column, rows_offset, pack_stats[cursor].rows); + rows_offset += pack_stats[cursor].rows; + } + // Cast column's data from DataType in disk to what we need now + return convertColumnByColumnDefineIfNeed(data_type, std::move(result_column), cd); } void DMFileReader::readFromDisk( const ColumnDefine & column_define, MutableColumnPtr & column, size_t start_pack_id, - size_t read_rows, - size_t skip_packs, - bool force_seek) + size_t read_rows) { const auto stream_name = DMFile::getFileNameBase(column_define.id); auto iter = column_streams.find(stream_name); @@ -549,23 +580,17 @@ void DMFileReader::readFromDisk( "Can not find column_stream, column_id={} stream_name={}", column_define.id, stream_name); - #endif auto & top_stream = iter->second; - bool should_seek = force_seek || shouldSeek(start_pack_id) || skip_packs > 0; auto data_type = dmfile->getColumnStat(column_define.id).type; data_type->deserializeBinaryBulkWithMultipleStreams( // *column, [&](const IDataType::SubstreamPath & substream_path) { const auto substream_name = DMFile::getFileNameBase(column_define.id, substream_path); auto & sub_stream = column_streams.at(substream_name); - - if (should_seek) - { - sub_stream->buf->seek( - sub_stream->getOffsetInFile(start_pack_id), - sub_stream->getOffsetInDecompressedBlock(start_pack_id)); - } + sub_stream->buf->seek( + sub_stream->getOffsetInFile(start_pack_id), + sub_stream->getOffsetInDecompressedBlock(start_pack_id)); return sub_stream->buf.get(); }, read_rows, @@ -575,13 +600,12 @@ void DMFileReader::readFromDisk( IDataType::updateAvgValueSizeHint(*column, top_stream->avg_size_hint); } -void DMFileReader::readColumn( +void DMFileReader::readFromDiskOrSharingCache( const ColumnDefine & column_define, ColumnPtr & column, size_t start_pack_id, size_t pack_count, - size_t read_rows, - size_t skip_packs) + size_t read_rows) { bool has_concurrent_reader = DMFileReaderPool::instance().hasConcurrentReader(*this); bool 
reach_sharing_column_memory_limit = shared_column_data_mem_tracker != nullptr @@ -601,13 +625,8 @@ void DMFileReader::readColumn( = enable_sharing_column ? std::make_optional(true, nullptr) : std::nullopt; auto data_type = dmfile->getColumnStat(column_define.id).type; auto col = data_type->createColumn(); - readFromDisk(column_define, col, start_pack_id, read_rows, skip_packs, last_read_from_cache[column_define.id]); + readFromDisk(column_define, col, start_pack_id, read_rows); column = std::move(col); - last_read_from_cache[column_define.id] = false; - } - else - { - last_read_from_cache[column_define.id] = true; } if (enable_sharing_column && col_data_cache != nullptr) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 64eb5d948f6..d9c06703fbd 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -96,22 +96,30 @@ class DMFileReader friend class tests::DMFileMetaV2Test; private: - bool shouldSeek(size_t pack_id) const; - + size_t getReadRows(); + ColumnPtr readExtraColumn( + const ColumnDefine & cd, + size_t start_pack_id, + size_t pack_count, + size_t read_rows, + const std::vector & clean_read_packs); void readFromDisk( const ColumnDefine & column_define, MutableColumnPtr & column, size_t start_pack_id, - size_t read_rows, - size_t skip_packs, - bool force_seek); - void readColumn( + size_t read_rows); + void readFromDiskOrSharingCache( const ColumnDefine & column_define, ColumnPtr & column, size_t start_pack_id, size_t pack_count, - size_t read_rows, - size_t skip_packs); + size_t read_rows); + ColumnPtr readColumn(const ColumnDefine & cd, size_t start_pack_id, size_t pack_count, size_t read_rows); + ColumnPtr cleanRead( + const ColumnDefine & cd, + size_t rows_count, + std::pair range, + const DMFile::PackStats & pack_stats); bool getCachedPacks(ColId col_id, size_t start_pack_id, size_t pack_count, size_t read_rows, ColumnPtr & 
col) const; void addScannedRows(UInt64 rows); @@ -142,8 +150,6 @@ class DMFileReader /// Filters DMFilePackFilter pack_filter; - std::vector skip_packs_by_column{}; - /// Caches MarkCachePtr mark_cache; ColumnCachePtr column_cache; @@ -163,7 +169,6 @@ class DMFileReader // DataSharing std::unique_ptr col_data_cache{}; - std::unordered_map last_read_from_cache{}; }; } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 731819e4228..9e46f952bab 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -3356,7 +3356,7 @@ class DeltaMergeStoreMergeDeltaBySegmentTest DMContextPtr dm_context; UInt64 ps_ver{}; - DMTestEnv::PkType pk_type; + DMTestEnv::PkType pk_type{}; }; INSTANTIATE_TEST_CASE_P( @@ -3701,6 +3701,84 @@ try } CATCH +TEST_P(DeltaMergeStoreRWTest, TestForCleanRead) +try +{ + static constexpr const char * pk_name = "_tidb_rowid"; + store = reload(DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, true)); + DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, true); + constexpr size_t pack_block_count = 2; + constexpr size_t num_rows_each_block = DEFAULT_MERGE_BLOCK_SIZE / pack_block_count; + constexpr size_t num_block = 10; + auto write_block = [&](Block block) { + switch (mode) + { + case TestMode::V1_BlockOnly: + case TestMode::V2_BlockOnly: + case TestMode::V3_BlockOnly: + store->write(*db_context, db_context->getSettingsRef(), block); + break; + default: + { + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + auto [range, file_ids] = genDMFile(*dm_context, block); + store->ingestFiles(dm_context, range, file_ids, false); + break; + } + } + }; + { + for (size_t i = 0; i < num_block; ++i) + { + Block block = DMTestEnv::prepareSimpleWriteBlock( + i * num_rows_each_block, + (i + 1) * 
num_rows_each_block, + false, + std::numeric_limits::max(), // max version to make sure it's the latest + pk_name, + EXTRA_HANDLE_COLUMN_ID, + EXTRA_HANDLE_COLUMN_INT_TYPE, + false, + 1, + true, + i == 3 || i == 5, // the 4th and 6th blocks (i == 3 and i == 5) are marked as deleted. + /*with_nullable_uint64*/ true); + write_block(block); + } + } + + // After compaction there are 5 packs: packs [0,3,4] are clean, and packs [1,2] contain deletes. + store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + store->compact(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + store->mergeDeltaAll(*db_context); + + // Only packs 0, 3 and 4 can do clean read. + { + const auto & columns = store->getTableColumns(); + ColumnDefines real_columns; + for (const auto & col : columns) + { + if (col.name != EXTRA_HANDLE_COLUMN_NAME && col.name != TAG_COLUMN_NAME && col.name != VERSION_COLUMN_NAME) + { + real_columns.emplace_back(col); + } + } + BlockInputStreamPtr in = store->read( + *db_context, + db_context->getSettingsRef(), + real_columns, + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, + std::vector{}, + 0, + TRACING_NAME, + /* keep_order= */ true)[0]; // set keep order to let read_mode = Normal + ASSERT_INPUTSTREAM_NROWS(in, num_rows_each_block * num_block - num_rows_each_block * 2); + } +} +CATCH } // namespace tests } // namespace DM diff --git a/release-centos7-llvm/gen_coverage.sh b/release-centos7-llvm/gen_coverage.sh index 95bb4a615ee..138d5eed3b8 100755 --- a/release-centos7-llvm/gen_coverage.sh +++ b/release-centos7-llvm/gen_coverage.sh @@ -54,7 +54,7 @@ cd - ## TODO: Add more testing binaries # Collect the prof raw data and generate cov report -llvm-profdata merge -sparse "${COVERAGE_DIR}/*.profraw" -o "${COVERAGE_DIR}/merged.profdata" +llvm-profdata merge -sparse ${COVERAGE_DIR}/*.profraw -o
"${COVERAGE_DIR}/merged.profdata" export LD_LIBRARY_PATH=. llvm-cov export \