diff --git a/dbms/src/Debug/dbgFuncRegion.cpp b/dbms/src/Debug/dbgFuncRegion.cpp index e8ddeda2575..1f9205bc9d7 100644 --- a/dbms/src/Debug/dbgFuncRegion.cpp +++ b/dbms/src/Debug/dbgFuncRegion.cpp @@ -272,28 +272,4 @@ size_t executeQueryAndCountRows(Context & context,const std::string & query) return count; } -std::vector> getRegionRanges( - Context& context, TableID table_id, std::vector* vec = nullptr) -{ - std::vector> handle_ranges; - auto callback = [&](Regions regions) - { - for (auto region : regions) - { - auto [start_key, end_key] = region->getRange(); - HandleID start_handle = TiKVRange::getRangeHandle(start_key, table_id); - HandleID end_handle = TiKVRange::getRangeHandle(end_key, table_id); - handle_ranges.push_back({start_handle, end_handle, region->id()}); - if (vec) - { - vec->push_back(region->getRange()); - } - } - }; - - TMTContext & tmt = context.getTMTContext(); - tmt.region_table.traverseRegionsByTable(table_id, callback); - return handle_ranges; -} - } diff --git a/dbms/src/Debug/dbgTools.cpp b/dbms/src/Debug/dbgTools.cpp index fe18dade0e5..9ef94cf6be8 100644 --- a/dbms/src/Debug/dbgTools.cpp +++ b/dbms/src/Debug/dbgTools.cpp @@ -316,8 +316,14 @@ Int64 concurrentRangeOperate(const TiDB::TableInfo & table_info, HandleID start_ { TMTContext & tmt = context.getTMTContext(); - tmt.region_table.traverseRegionsByTable(table_info.id, [&](Regions d) { - regions.insert(regions.end(), d.begin(), d.end()); + tmt.region_table.traverseRegionsByTable(table_info.id, [&](std::vector> & d) { + for (auto && [_, r] : d) + { + std::ignore = _; + if (r == nullptr) + continue; + regions.push_back(r); + } }); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 5d263ba7a27..b06fe475498 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -661,12 +661,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart ranges.push_back(region.range_in_table); }); + // ranged may overlap, should merge them std::sort(ranges.begin(), ranges.end()); size_t size = 0; for (size_t i = 1; i < ranges.size(); ++i) { - if (ranges[i].first == ranges[size].second) - ranges[size].second = ranges[i].second; + if (ranges[i].first <= ranges[size].second) + ranges[size].second = std::max(ranges[i].second, ranges[size].second); else ranges[++size] = ranges[i]; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 7eb43993f9d..6a056a1a188 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -229,7 +229,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( bool is_txn_engine = data.merging_params.mode == MergeTreeData::MergingParams::Txn; std::vector regions_query_info = query_info.regions_query_info; - std::vector regions_query_res; + RegionMap kvstore_region; + std::vector regions_query_res; BlockInputStreams region_block_data; String handle_col_name; size_t region_cnt = 0; @@ -277,10 +278,15 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( if (!select.no_kvstore && regions_query_info.empty()) { - tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](Regions regions) { - for (const auto & region : regions) + tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](std::vector> & regions) { + for (const auto & [id, region]: regions) { - regions_query_info.push_back({region->id(), region->version(), region->confVer(), region->getHandleRangeByTable(data.table_info.id)}); + kvstore_region.emplace(id, region); + if (region == nullptr) + // maybe region is removed. + regions_query_info.push_back({id, InvalidRegionVersion, InvalidRegionVersion, {0, 0}}); + else + regions_query_info.push_back({id, region->version(), region->confVer(), region->getHandleRangeByTable(data.table_info.id)}); } }); } @@ -288,7 +294,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( std::sort(regions_query_info.begin(), regions_query_info.end()); region_cnt = regions_query_info.size(); region_range_parts.assign(region_cnt, {}); - regions_query_res.assign(region_cnt, true); + regions_query_res.assign(region_cnt, RegionTable::OK); region_block_data.assign(region_cnt, nullptr); rows_in_mem.assign(region_cnt, 0); @@ -298,36 +304,42 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( // get data block from region first. - ThreadPool pool(num_streams); + ThreadPool pool(std::min(region_cnt, num_streams)); for (size_t region_begin = 0, size = std::max(region_cnt / num_streams, 1); region_begin < region_cnt; region_begin += size) { pool.schedule([&, region_begin, size] { - for (size_t region_index = region_begin, region_end = std::min(region_begin + size, region_cnt); region_index < region_end; ++region_index) + + std::vector region_ids; + + for (size_t region_index = region_begin, region_end = std::min(region_begin + size, region_cnt); region_index < region_end; ++region_index) { const RegionQueryInfo & region_query_info = regions_query_info[region_index]; - auto [region_input_stream, status, tol] = tmt.region_table.getBlockInputStreamByRegion( - tmt, data.table_info.id, region_query_info.region_id, region_query_info.version, region_query_info.conf_version, - data.table_info, data.getColumns(), column_names_to_read, + auto [region_input_stream, status, tol] = RegionTable::getBlockInputStreamByRegion( + data.table_info.id, kvstore_region[region_query_info.region_id], region_query_info.version, + region_query_info.conf_version, data.table_info, data.getColumns(), column_names_to_read, true, query_info.resolve_locks, query_info.read_tso); + + regions_query_res[region_index] = status; + if (status != RegionTable::OK) { - regions_query_res[region_index] = false; - LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version - << ", handle range [" << region_query_info.range_in_table.first - << ", " << region_query_info.range_in_table.second << ") , status " - << RegionTable::RegionReadStatusString(status)); - std::vector region_ids; - for (size_t region_index = 0; region_index < region_cnt; ++region_index) - { - region_ids.push_back(regions_query_info[region_index].region_id); - } - throw RegionException(region_ids); - } else { + LOG_WARNING(log, "Region " << region_query_info.region_id << ", version " + << region_query_info.version + << ", handle range [" << region_query_info.range_in_table.first + << ", " << region_query_info.range_in_table.second << ") , status " + << RegionTable::RegionReadStatusString(status)); + region_ids.push_back(region_query_info.region_id); + } + else + { region_block_data[region_index] = region_input_stream; rows_in_mem[region_index] = tol; } } + + if (!region_ids.empty()) + throw RegionException(region_ids); }); } @@ -706,32 +718,38 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( else { TMTContext & tmt = context.getTMTContext(); + std::vector region_ids; for (size_t region_index = 0; region_index < region_cnt; ++region_index) { if (select.no_kvstore) continue; - if (!regions_query_res[region_index]) + if (regions_query_res[region_index] != RegionTable::OK) continue; const RegionQueryInfo & region_query_info = regions_query_info[region_index]; - if (tmt.kvstore->getRegion(region_query_info.region_id) == nullptr) { - // Region may be removed. - // If region is in kvstore, even if its state is pending_remove, the new parts with del data are not flushed into ch. - regions_query_res[region_index] = false; - LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version - << ", handle range [" << region_query_info.range_in_table.first - << ", " << region_query_info.range_in_table.second << ") , status " - << RegionTable::RegionReadStatusString(RegionTable::RegionReadStatus::NOT_FOUND)); - std::vector region_ids; - for (size_t region_index = 0; region_index < region_cnt; ++region_index) + auto region = tmt.kvstore->getRegion(region_query_info.region_id); + RegionTable::RegionReadStatus status = RegionTable::OK; + if (region != kvstore_region[region_query_info.region_id]) + status = RegionTable::NOT_FOUND; + else if (region->version() != region_query_info.version) + status = RegionTable::VERSION_ERROR; + + if (status != RegionTable::OK) { - region_ids.push_back(regions_query_info[region_index].region_id); + regions_query_res[region_index] = status; + // ABA problem may cause because one region is removed and inserted back. + // if the version of region is changed, the part may has less data because of compaction. + LOG_WARNING(log, "Region " << region_query_info.region_id << ", version " << region_query_info.version + << ", handle range [" << region_query_info.range_in_table.first + << ", " << region_query_info.range_in_table.second << ") , status " + << RegionTable::RegionReadStatusString(status)); + region_ids.push_back(region_query_info.region_id); + continue; } - throw RegionException(region_ids); } size_t sum_marks = 0; @@ -761,6 +779,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( << region_range_parts[region_index].size() << " parts, " << sum_marks << " marks to read from " << sum_ranges << " ranges, read " << rows_in_mem[region_index] << " rows from memory"); } + + if (!region_ids.empty()) + throw RegionException(region_ids); } if (parts_with_ranges.empty() && !is_txn_engine) @@ -799,7 +820,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( { for (size_t region_index = 0; region_index < region_cnt; ++region_index) { - if (!regions_query_res[region_index]) + if (regions_query_res[region_index] != RegionTable::OK) continue; auto region_input_stream = region_block_data[region_index]; if (region_input_stream) @@ -814,7 +835,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( BlockInputStreams union_regions_stream; for (size_t region_index = region_begin, region_end = std::min(region_begin + size, region_cnt); region_index < region_end; ++region_index) { - if (!regions_query_res[region_index]) + if (regions_query_res[region_index] != RegionTable::OK) continue; const RegionQueryInfo & region_query_info = regions_query_info[region_index]; diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index d09570771f1..caa1e6f68ce 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -53,6 +53,8 @@ void KVStore::onSnapshot(RegionPtr new_region, Context * context) { TMTContext * tmt_ctx = context ? &(context->getTMTContext()) : nullptr; + auto table_ids = RegionTable::getRegionTableIds(new_region); + { std::lock_guard lock(task_mutex); @@ -75,12 +77,18 @@ void KVStore::onSnapshot(RegionPtr new_region, Context * context) std::lock_guard lock(mutex); regions[region_id] = new_region; } + + if (tmt_ctx) + tmt_ctx->region_table.applySnapshotRegion(new_region, table_ids); + + if (new_region->isPendingRemove()) + { + removeRegion(region_id, context); + return; + } } region_persister.persist(new_region); - - if (tmt_ctx) - tmt_ctx->region_table.applySnapshotRegion(new_region); } void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftContext & raft_ctx) diff --git a/dbms/src/Storages/Transaction/PartitionDataMover.cpp b/dbms/src/Storages/Transaction/PartitionDataMover.cpp index 48efa08446a..d65123c18ff 100644 --- a/dbms/src/Storages/Transaction/PartitionDataMover.cpp +++ b/dbms/src/Storages/Transaction/PartitionDataMover.cpp @@ -18,7 +18,7 @@ namespace ErrorCodes namespace PartitionDataMover { - +/* BlockInputStreamPtr createBlockInputStreamFromRange(const Context& context, StorageMergeTree* storage, const HandleID begin, const HandleID excluded_end) { @@ -106,9 +106,9 @@ void increaseVersionInBlock(Block & block, size_t increasement = 1) block.insert(version_col_pos, ColumnWithTypeAndName{std::move(version_new_col), version_type, MutableSupport::version_column_name}); } - +*/ } // namespace PartitionDataMover - +/* void deleteRange(const Context& context, StorageMergeTree* storage, const HandleID begin, const HandleID excluded_end) { @@ -132,7 +132,6 @@ void deleteRange(const Context& context, StorageMergeTree* storage, output.writeSuffix(); } -/* // TODO: use `new_ver = old_ver+1` to delete data is not a good way, may conflict with data in the future void moveRangeBetweenPartitions(const Context & context, StorageMergeTree * storage, UInt64 src_partition_id, UInt64 dest_partition_id, const Field & begin, const Field & excluded_end) diff --git a/dbms/src/Storages/Transaction/PartitionDataMover.h b/dbms/src/Storages/Transaction/PartitionDataMover.h index 8020798698a..8c16df8ef5f 100644 --- a/dbms/src/Storages/Transaction/PartitionDataMover.h +++ b/dbms/src/Storages/Transaction/PartitionDataMover.h @@ -9,13 +9,13 @@ namespace DB { - +/* /// Remove range from this partition. /// Note that [begin, excluded_end) is not necessarily to locate in the range of this partition (or table). void deleteRange(const Context& context, StorageMergeTree* storage, const HandleID begin, const HandleID excluded_end); -/* + /// Move data in [begin, excluded_end) from src_partition_id to dest_partition_id. /// FIXME/TODO: currently this function is not atomic and need to fix. void moveRangeBetweenPartitions(const Context & context, StorageMergeTree * storage, diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index 6f907d1daf6..b9ca1f60a41 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -2,7 +2,6 @@ #include #include -#include #include #include #include @@ -15,6 +14,17 @@ namespace DB std::tuple RegionTable::getBlockInputStreamByRegion(TMTContext & tmt, TableID table_id, const RegionID region_id, + const TiDB::TableInfo & table_info, + const ColumnsDescription & columns, + const Names & ordered_columns, + std::vector * keys) +{ + return getBlockInputStreamByRegion( + table_id, tmt.kvstore->getRegion(region_id), InvalidRegionVersion, InvalidRegionVersion, table_info, columns, ordered_columns, false, false, 0, keys); +} + +std::tuple RegionTable::getBlockInputStreamByRegion(TableID table_id, + RegionPtr region, const RegionVersion region_version, const RegionVersion conf_version, const TiDB::TableInfo & table_info, @@ -25,7 +35,6 @@ std::tuple RegionTab UInt64 start_ts, std::vector * keys) { - auto region = tmt.kvstore->getRegion(region_id); if (!region) return {nullptr, NOT_FOUND, 0}; @@ -42,7 +51,7 @@ std::tuple RegionTab if (region->isPendingRemove()) return {nullptr, PENDING_REMOVE, 0}; - if (region_version != InvalidRegionVersion && region->version() != region_version && region->confVer() != conf_version) + if (region_version != InvalidRegionVersion && (region->version() != region_version || region->confVer() != conf_version)) return {nullptr, VERSION_ERROR, 0}; { diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index ad9778feee6..93a28ff1ba5 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -625,4 +625,6 @@ void Region::reset(Region && new_region) meta.notifyAll(); } +bool Region::isPeerRemoved() const { return meta.isPeerRemoved(); } + } // namespace DB diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index ae09bc44c5c..16a634f1f14 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -169,6 +169,7 @@ class Region : public std::enable_shared_from_this bool isPendingRemove() const; void setPendingRemove(); + bool isPeerRemoved() const; size_t dataSize() const; diff --git a/dbms/src/Storages/Transaction/RegionMeta.cpp b/dbms/src/Storages/Transaction/RegionMeta.cpp index 31d2403a310..10e00c24ac2 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.cpp +++ b/dbms/src/Storages/Transaction/RegionMeta.cpp @@ -72,6 +72,13 @@ metapb::Region RegionMeta::getRegion() const return region; } +pingcap::kv::RegionVerID RegionMeta::getRegionVerID() const +{ + std::lock_guard lock(mutex); + + return pingcap::kv::RegionVerID{region.id(), region.region_epoch().conf_ver(), region.region_epoch().version()}; +} + const raft_serverpb::RaftApplyState & RegionMeta::getApplyState() const { std::lock_guard lock(mutex); @@ -84,10 +91,7 @@ void RegionMeta::setRegion(const metapb::Region & region) doSetRegion(region); } -void RegionMeta::doSetRegion(const metapb::Region & region) -{ - this->region = region; -} +void RegionMeta::doSetRegion(const metapb::Region & region) { this->region = region; } void RegionMeta::setApplied(UInt64 index, UInt64 term) { @@ -101,10 +105,7 @@ void RegionMeta::doSetApplied(UInt64 index, UInt64 term) applied_term = term; } -void RegionMeta::notifyAll() -{ - cv.notify_all(); -} +void RegionMeta::notifyAll() { cv.notify_all(); } UInt64 RegionMeta::appliedIndex() const { @@ -176,17 +177,12 @@ void RegionMeta::setPendingRemove() doSetPendingRemove(); } -void RegionMeta::doSetPendingRemove() -{ - pending_remove = true; -} +void RegionMeta::doSetPendingRemove() { pending_remove = true; } void RegionMeta::waitIndex(UInt64 index) { - std::unique_lock lk(mutex); - cv.wait(lk, [this, index] { - return pending_remove || apply_state.applied_index() >= index; - }); + std::unique_lock lock(mutex); + cv.wait(lock, [this, index] { return pending_remove || apply_state.applied_index() >= index; }); } UInt64 RegionMeta::version() const @@ -226,7 +222,8 @@ void RegionMeta::doRemovePeer(UInt64 store_id) throw Exception("peer with store_id " + DB::toString(store_id) + " not found", ErrorCodes::LOGICAL_ERROR); } -void RegionMeta::execChangePeer(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term) +void RegionMeta::execChangePeer( + const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term) { const auto & change_peer_request = request.change_peer(); const auto & new_region = response.change_peer().region(); @@ -236,7 +233,7 @@ void RegionMeta::execChangePeer(const raft_cmdpb::AdminRequest & request, const case eraftpb::ConfChangeType::AddNode: case eraftpb::ConfChangeType::AddLearnerNode: { - std::lock_guard lk(mutex); + std::lock_guard lock(mutex); // change the peers of region, add conf_ver. doSetRegion(new_region); @@ -248,7 +245,7 @@ void RegionMeta::execChangePeer(const raft_cmdpb::AdminRequest & request, const const auto & peer = change_peer_request.peer(); auto store_id = peer.store_id(); - std::lock_guard lk(mutex); + std::lock_guard lock(mutex); doRemovePeer(store_id); @@ -262,5 +259,18 @@ void RegionMeta::execChangePeer(const raft_cmdpb::AdminRequest & request, const } } +bool RegionMeta::isPeerRemoved() const +{ + std::lock_guard lock(mutex); + + if (pending_remove) + return true; + for (auto region_peer : region.peers()) + { + if (region_peer.id() == peer.id()) + return false; + } + return true; +} } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionMeta.h b/dbms/src/Storages/Transaction/RegionMeta.h index 7874cdfdd34..6caafa88dfa 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.h +++ b/dbms/src/Storages/Transaction/RegionMeta.h @@ -37,13 +37,7 @@ class RegionMeta metapb::Peer getPeer() const; metapb::Region getRegion() const; - pingcap::kv::RegionVerID getRegionVerID() const { - return pingcap::kv::RegionVerID { - region.id(), - confVer(), - version() - }; - } + pingcap::kv::RegionVerID getRegionVerID() const; UInt64 version() const; @@ -77,11 +71,11 @@ class RegionMeta void waitIndex(UInt64 index); + bool isPeerRemoved() const; void execChangePeer(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term); private: - void doRemovePeer(UInt64 store_id); void doSetPendingRemove(); diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index a812edcbf48..8987f41d354 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -1,7 +1,6 @@ #include #include -#include #include #include #include @@ -20,9 +19,9 @@ extern const int UNKNOWN_TABLE; // Static methods. // ============================================================= -auto getRegionTableIds(const RegionPtr & region) +TableIDSet RegionTable::getRegionTableIds(const RegionPtr & region) { - std::unordered_set table_ids; + TableIDSet table_ids; { auto scanner = region->createCommittedScanner(InvalidTableID); while (true) @@ -182,7 +181,7 @@ void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & cac // TODO: confirm names is right Names names = columns.getNamesOfPhysical(); auto [input, status, tol] = getBlockInputStreamByRegion( - tmt, table_id, region_id, InvalidRegionVersion, InvalidRegionVersion, table_info, columns, names, false, false, 0, &keys_to_remove); + tmt, table_id, region_id, table_info, columns, names, &keys_to_remove); if (input == nullptr) return; @@ -245,6 +244,8 @@ RegionTable::RegionTable(Context & context_, const std::string & parent_path_) void RegionTable::restore(std::function region_fetcher) { + std::lock_guard lock(mutex); + Poco::File dir(parent_path + "tables/"); if (!dir.exists()) dir.createDirectories(); @@ -259,16 +260,17 @@ void RegionTable::restore(std::function region_fetcher) auto p = tables.try_emplace(table_id, parent_path + "tables/", table_id); Table & table = p.first->second; - for (auto it = table.regions.get().begin(); it != table.regions.get().end();) + auto & table_regions = table.regions.get(); + for (auto it = table_regions.begin(); it != table_regions.end();) { auto region_id = it->first; auto & region = it->second; auto region_ptr = region_fetcher(region_id); - if (!region_ptr) + if (region_ptr == nullptr) { - // It could happen that process crash after region split or region snapshot apply, - // and region has not been persisted, but region <-> partition mapping does. - it = table.regions.get().erase(it); + // It could happen that process crash after region split or apply snapshot, + // and region has not been persisted, but region <-> table mapping does. + it = table_regions.erase(it); LOG_WARNING(log, "Region " << region_id << " not found from KVStore, dropped."); continue; } @@ -278,14 +280,13 @@ void RegionTable::restore(std::function region_fetcher) region.cache_bytes = region_ptr->dataSize(); if (region.cache_bytes) region.updated = true; + region.range_in_table = region_ptr->getHandleRangeByTable(table_id); // Update region_id -> table_id { - auto it = regions.find(region_id); - if (it == regions.end()) - std::tie(it, std::ignore) = regions.try_emplace(region_id); - RegionInfo & region_info = it->second; - region_info.tables.emplace(table_id); + auto [it, ok] = regions.emplace(region_id, RegionInfo{}); + std::ignore = ok; + it->second.tables.emplace(table_id); } } @@ -314,8 +315,26 @@ void RegionTable::updateRegion(const RegionPtr & region, const TableIDSet & rela void RegionTable::applySnapshotRegion(const RegionPtr & region) { auto table_ids = getRegionTableIds(region); + return applySnapshotRegion(region, table_ids); +} + +void RegionTable::applySnapshotRegion(const RegionPtr & region, const TableIDSet & table_ids) +{ + TableIDSet table_to_persist; + size_t cache_bytes = region->dataSize(); + + std::lock_guard lock(mutex); - updateRegion(region, table_ids); + for (auto table_id : table_ids) + { + auto & internal_region = getOrInsertRegion(table_id, region, table_to_persist); + internal_region.updated = true; + internal_region.cache_bytes = cache_bytes; + internal_region.range_in_table = region->getHandleRangeByTable(table_id); + } + + for (auto table_id : table_to_persist) + tables.find(table_id)->second.persist(); } void RegionTable::splitRegion(const RegionPtr & kvstore_region, const std::vector & split_regions) @@ -439,10 +458,10 @@ void RegionTable::traverseInternalRegionsByTable(const TableID table_id, std::fu callback(region_info.second); } -void RegionTable::traverseRegionsByTable(const TableID table_id, std::function && callback) +void RegionTable::traverseRegionsByTable(const TableID table_id, std::function>&)> && callback) { auto & kvstore = context.getTMTContext().kvstore; - Regions regions; + std::vector> regions; { std::lock_guard lock(mutex); auto & table = getOrCreateTable(table_id); @@ -450,9 +469,7 @@ void RegionTable::traverseRegionsByTable(const TableID table_id, std::functiongetRegion(region_info.second.region_id); - if (region == nullptr) - continue; - regions.push_back(region); + regions.emplace_back(region_info.second.region_id, region); } } callback(regions); diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index a7e93ab0e67..c25726e7489 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -21,7 +21,9 @@ class RegionTable : private boost::noncopyable { InternalRegion() {} InternalRegion(const InternalRegion & p) : region_id(p.region_id), range_in_table(p.range_in_table) {} - InternalRegion(const RegionID region_id_, const HandleRange & range_in_table_) : region_id(region_id_), range_in_table(range_in_table_) {} + InternalRegion(const RegionID region_id_, const HandleRange & range_in_table_) + : region_id(region_id_), range_in_table(range_in_table_) + {} RegionID region_id; HandleRange range_in_table; @@ -170,6 +172,8 @@ class RegionTable : private boost::noncopyable void updateRegion(const RegionPtr & region, const TableIDSet & relative_table_ids); /// A new region arrived by apply snapshot command, this function store the region into selected partitions. void applySnapshotRegion(const RegionPtr & region); + void applySnapshotRegion(const RegionPtr & region, const TableIDSet & table_ids); + /// Manage data after region split into split_regions. /// i.e. split_regions could have assigned to another partitions, we need to move the data belong with them. void splitRegion(const RegionPtr & region, const std::vector & split_regions); @@ -184,11 +188,18 @@ class RegionTable : private boost::noncopyable void traverseInternalRegions(std::function && callback); void traverseInternalRegionsByTable(const TableID table_id, std::function && callback); - void traverseRegionsByTable(const TableID table_id, std::function && callback); + void traverseRegionsByTable(const TableID table_id, std::function>&)> && callback); static std::tuple getBlockInputStreamByRegion(TMTContext & tmt, TableID table_id, const RegionID region_id, + const TiDB::TableInfo & table_info, + const ColumnsDescription & columns, + const Names & ordered_columns, + std::vector * keys); + + static std::tuple getBlockInputStreamByRegion(TableID table_id, + RegionPtr region, const RegionVersion region_version, const RegionVersion conf_version, const TiDB::TableInfo & table_info, @@ -199,6 +210,8 @@ class RegionTable : private boost::noncopyable UInt64 start_ts, std::vector * keys = nullptr); + static TableIDSet getRegionTableIds(const RegionPtr & region); + // For debug void dumpRegionMap(RegionTable::RegionMap & res); void dropRegionsInTable(TableID table_id); diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index f718ad322cb..ee8a450bc84 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -15,9 +15,8 @@ TMTContext::TMTContext(Context & context, std::vector addrs) region_cache(std::make_shared(pd_client)), rpc_client(std::make_shared()) { - kvstore->restore([&](pingcap::kv::RegionVerID id) -> pingcap::kv::RegionClientPtr { - return this->createRegionClient(id); - }, ®ions_to_remove); + kvstore->restore( + [&](pingcap::kv::RegionVerID id) -> pingcap::kv::RegionClientPtr { return this->createRegionClient(id); }, ®ions_to_remove); region_table.restore(std::bind(&KVStore::getRegion, kvstore.get(), std::placeholders::_1)); for (RegionID id : regions_to_remove) kvstore->removeRegion(id, &context); @@ -57,9 +56,6 @@ pingcap::kv::RegionClientPtr TMTContext::createRegionClient(pingcap::kv::RegionV pingcap::kv::RegionCachePtr TMTContext::getRegionCache() const { return region_cache; } -pingcap::kv::RpcClientPtr TMTContext::getRpcClient() -{ - return rpc_client; -} +pingcap::kv::RpcClientPtr TMTContext::getRpcClient() { return rpc_client; } } // namespace DB diff --git a/dbms/src/Storages/Transaction/applySnapshot.cpp b/dbms/src/Storages/Transaction/applySnapshot.cpp index 4d018f3126e..324c0e9a30e 100644 --- a/dbms/src/Storages/Transaction/applySnapshot.cpp +++ b/dbms/src/Storages/Transaction/applySnapshot.cpp @@ -56,6 +56,11 @@ void applySnapshot(KVStorePtr kvstore, RequestReader read, Context * context) } } + { + if (region->isPeerRemoved()) + region->setPendingRemove(); + } + // context may be null in test cases. kvstore->onSnapshot(region, context);