diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 6da54e74e69..8e8b6117def 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -83,7 +83,7 @@ std::unordered_map> FailPointHelper::f M(force_slow_page_storage_snapshot_release) #define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \ - M(pause_after_learner_read) \ + M(pause_with_alter_locks_acquired) \ M(hang_in_execution) \ M(pause_before_dt_background_delta_merge) \ M(pause_until_dt_background_delta_merge) \ diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index f514293e7d6..ebf9e9eaed3 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -37,7 +38,7 @@ namespace FailPoints { extern const char region_exception_after_read_from_storage_some_error[]; extern const char region_exception_after_read_from_storage_all_error[]; -extern const char pause_after_learner_read[]; +extern const char pause_with_alter_locks_acquired[]; extern const char force_remote_read_for_batch_cop[]; extern const char pause_after_copr_streams_acquired[]; } // namespace FailPoints @@ -223,6 +224,9 @@ DAGStorageInterpreter::DAGStorageInterpreter( } } +// Apply learner read to ensure reads are strongly consistent with TiKV Region +// leaders. If the local Regions do not match the requested Regions, then build +// requests to retry fetching data from other nodes. 
void DAGStorageInterpreter::execute(DAGPipeline & pipeline) { prepare(); @@ -233,13 +237,19 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline) void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline) { if (!mvcc_query_info->regions_query_info.empty()) - doLocalRead(pipeline, settings.max_block_size); + buildLocalRead(pipeline, settings.max_block_size); null_stream_if_empty = std::make_shared(storage_for_logical_table->getSampleBlockForColumns(required_columns)); // Should build these vars under protect of `table_structure_lock`. buildRemoteRequests(); + // A failpoint to pause the query before the alter locks are released + FAIL_POINT_PAUSE(FailPoints::pause_with_alter_locks_acquired); + // Release alter locks + // The DeltaTree engine ensures that once input streams are created, the caller can get a consistent result + // from those streams even if DDL operations are applied. Release the alter lock so that reading does not + // block DDL operations, but keep the drop lock so that the storage is not dropped during reading. releaseAlterLocks(); // It is impossible to have no joined stream. @@ -290,12 +300,29 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline) void DAGStorageInterpreter::prepare() { + // About why we do learner read before acquiring structure lock on Storage(s). + // Assume that: + // 1. Read threads do learner read and wait for the Raft applied index while holding a read lock + // on "alter lock" of an IStorage X + // 2. Raft threads try to decode data for Region in the same IStorage X, and find that they need to + // apply DDL operations which acquire write lock on "alter locks" + // Under this situation, all Raft threads will be blocked by the read threads, while the read threads + // wait for Raft threads to push forward the applied index. A deadlock happens! + // So we must do learner read without structure lock on IStorage. 
After learner read, acquire the + // structure lock of IStorage(s) (to avoid concurrency issues between read threads and DDL + // operations) and build the requested inputstreams. Once the inputstreams are built, we should release + // the alter lock to avoid blocking DDL operations. + // TODO: If we can acquire a read-only view on the IStorage structure (both `ITableDeclaration` + // and `TiDB::TableInfo`) we may be able to simplify this process. (tiflash/issues/1853) + + // Do learner read const DAGContext & dag_context = *context.getDAGContext(); if (dag_context.isBatchCop() || dag_context.isMPPTask()) learner_read_snapshot = doBatchCopLearnerRead(); else learner_read_snapshot = doCopLearnerRead(); + // Acquire read lock on `alter lock` and build the requested inputstreams storages_with_structure_lock = getAndLockStorages(settings.schema_version); assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end()); storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage; @@ -303,8 +330,6 @@ void DAGStorageInterpreter::prepare() std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan(settings.max_columns_to_read); analyzer = std::make_unique(std::move(source_columns), context); - - FAIL_POINT_PAUSE(FailPoints::pause_after_learner_read); } void DAGStorageInterpreter::executePushedDownFilter( @@ -464,8 +489,9 @@ LearnerReadSnapshot DAGStorageInterpreter::doCopLearnerRead() { if (table_scan.isPartitionTableScan()) { - throw Exception("Cop request does not support partition table scan"); + throw TiFlashException("Cop request does not support partition table scan", DB::Errors::Coprocessor::BadRequest); } + TablesRegionInfoMap regions_for_local_read; for (const auto physical_table_id : table_scan.getPhysicalTableIDs()) { @@ -481,7 +507,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doCopLearnerRead() if (info_retry) throw RegionException({info_retry->begin()->get().region_id}, 
status); - return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, false, context, log); + return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, /*for_batch_cop=*/false, context, log); } /// Will assign region_retry_from_local_region @@ -517,7 +543,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead() } if (mvcc_query_info->regions_query_info.empty()) return {}; - return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, true, context, log); + return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, /*for_batch_cop=*/true, context, log); } catch (const LockException & e) { @@ -584,18 +610,18 @@ std::unordered_map DAGStorageInterpreter::generateSele return ret; } -void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block_size) +void DAGStorageInterpreter::buildLocalRead(DAGPipeline & pipeline, size_t max_block_size) { const DAGContext & dag_context = *context.getDAGContext(); size_t total_local_region_num = mvcc_query_info->regions_query_info.size(); if (total_local_region_num == 0) return; - auto table_query_infos = generateSelectQueryInfos(); - for (auto & table_query_info : table_query_infos) + const auto table_query_infos = generateSelectQueryInfos(); + for (const auto & table_query_info : table_query_infos) { DAGPipeline current_pipeline; - TableID table_id = table_query_info.first; - SelectQueryInfo & query_info = table_query_info.second; + const TableID table_id = table_query_info.first; + const SelectQueryInfo & query_info = table_query_info.second; size_t region_num = query_info.mvcc_query_info->regions_query_info.size(); if (region_num == 0) continue; @@ -613,11 +639,11 @@ void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block { current_pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, current_max_streams); - // After getting streams from storage, we need to validate whether regions have 
changed or not after learner read. - // In case the versions of regions have changed, those `streams` may contain different data other than expected. - // Like after region merge/split. + // After getting streams from storage, we need to validate whether Regions have changed or not after learner read + // (by calling `validateQueryInfo`). In case the key ranges of Regions have changed (Region merge/split), those `streams` + // may contain data other than expected. - // Inject failpoint to throw RegionException + // Inject failpoint to throw RegionException for testing fiu_do_on(FailPoints::region_exception_after_read_from_storage_some_error, { const auto & regions_info = query_info.mvcc_query_info->regions_query_info; RegionException::UnavailableRegions region_ids; @@ -781,7 +807,7 @@ std::unordered_map DAG return {{}, {}, {}, false}; } - if (table_store->engineType() != ::TiDB::StorageEngine::TMT && table_store->engineType() != ::TiDB::StorageEngine::DT) + if (unlikely(table_store->engineType() != ::TiDB::StorageEngine::DT)) { throw TiFlashException( fmt::format( @@ -954,6 +980,7 @@ std::tuple> DAGStorageIn return {required_columns_tmp, source_columns_tmp, need_cast_column}; } +// Build remote requests from `region_retry_from_local_region` void DAGStorageInterpreter::buildRemoteRequests() { std::unordered_map region_id_to_table_id_map; @@ -978,6 +1005,8 @@ void DAGStorageInterpreter::buildRemoteRequests() if (retry_regions.empty()) continue; + // Append the regions into DAGContext to return them to the upper layer. + // The upper layer should refresh its cache about these regions. 
for (const auto & r : retry_regions) context.getDAGContext()->retry_regions.push_back(r.get()); @@ -993,9 +1022,6 @@ void DAGStorageInterpreter::buildRemoteRequests() void DAGStorageInterpreter::releaseAlterLocks() { - // The DeltaTree engine ensures that once input streams are created, the caller can get a consistent result - // from those streams even if DDL operations are applied. Release the alter lock so that reading does not - // block DDL operations, keep the drop lock so that the storage not to be dropped during reading. for (auto storage_with_lock : storages_with_structure_lock) { drop_locks.emplace_back(std::get<1>(std::move(storage_with_lock.second.lock).release())); diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h index a1d88083468..64e90f62d25 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h @@ -58,7 +58,7 @@ class DAGStorageInterpreter void execute(DAGPipeline & pipeline); - /// Members will be transfered to DAGQueryBlockInterpreter after execute + /// Members will be transferred to DAGQueryBlockInterpreter after execute std::unique_ptr analyzer; @@ -72,7 +72,7 @@ class DAGStorageInterpreter LearnerReadSnapshot doBatchCopLearnerRead(); - void doLocalRead(DAGPipeline & pipeline, size_t max_block_size); + void buildLocalRead(DAGPipeline & pipeline, size_t max_block_size); std::unordered_map getAndLockStorages(Int64 query_schema_version);