Skip to content

Commit

Permalink
Refine some comments on DAGStorageInterpreter
Browse files Browse the repository at this point in the history
Signed-off-by: JaySon-Huang <jayson.hjs@gmail.com>
  • Loading branch information
JaySon-Huang committed May 5, 2022
1 parent e10c6ed commit 35e6c18
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 22 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_slow_page_storage_snapshot_release)

#define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
M(pause_with_alter_locks_acquired) \
M(hang_in_execution) \
M(pause_before_dt_background_delta_merge) \
M(pause_until_dt_background_delta_merge) \
Expand Down
65 changes: 46 additions & 19 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/TiFlashException.h>
#include <Common/TiFlashMetrics.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
Expand All @@ -37,7 +38,7 @@ namespace FailPoints
{
extern const char region_exception_after_read_from_storage_some_error[];
extern const char region_exception_after_read_from_storage_all_error[];
extern const char pause_after_learner_read[];
extern const char pause_with_alter_locks_acquired[];
extern const char force_remote_read_for_batch_cop[];
extern const char pause_after_copr_streams_acquired[];
} // namespace FailPoints
Expand Down Expand Up @@ -223,6 +224,9 @@ DAGStorageInterpreter::DAGStorageInterpreter(
}
}

// Apply learner read to ensure we can get strong consistent with TiKV Region
// leaders. If the local Regions do not match the requested Regions, then build
// request to retry fetching data from other nodes.
void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
{
prepare();
Expand All @@ -233,13 +237,19 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
{
if (!mvcc_query_info->regions_query_info.empty())
doLocalRead(pipeline, settings.max_block_size);
buildLocalRead(pipeline, settings.max_block_size);

null_stream_if_empty = std::make_shared<NullBlockInputStream>(storage_for_logical_table->getSampleBlockForColumns(required_columns));

// Should build these vars under protect of `table_structure_lock`.
buildRemoteRequests();

// A failpoint to test pause before alter lock released
FAIL_POINT_PAUSE(FailPoints::pause_with_alter_locks_acquired);
// Release alter locks
// The DeltaTree engine ensures that once input streams are created, the caller can get a consistent result
// from those streams even if DDL operations are applied. Release the alter lock so that reading does not
// block DDL operations, keep the drop lock so that the storage not to be dropped during reading.
releaseAlterLocks();

// It is impossible to have no joined stream.
Expand Down Expand Up @@ -290,12 +300,29 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)

void DAGStorageInterpreter::prepare()
{
// About why we do learner read before acquiring structure lock on Storage(s).
// Assume that:
// 1. Read threads do learner read and wait for the Raft applied index with holding a read lock
// on "alter lock" of an IStorage X
// 2. Raft threads try to decode data for Region in the same IStorage X, and find it need to
// apply DDL operations which acquire write lock on "alter locks"
// Under this situation, all Raft threads will be stuck by the read threads, but read threads
// wait for Raft threads to push forward the applied index. Deadlocks happens!!
// So we must do learner read without structure lock on IStorage. After learner read, acquire the
// structure lock of IStorage(s) (to avoid concurrent issues between read threads and DDL
// operations) and build the requested inputstreams. Once the inputstreams build, we should release
// the alter lock to avoid blocking DDL operations.
// TODO: If we can acquire a read-only view on the IStorage structure (both `ITableDeclaration`
// and `TiDB::TableInfo`) we may get this process more simplified. (tiflash/issues/1853)

// Do learner read
const DAGContext & dag_context = *context.getDAGContext();
if (dag_context.isBatchCop() || dag_context.isMPPTask())
learner_read_snapshot = doBatchCopLearnerRead();
else
learner_read_snapshot = doCopLearnerRead();

// Acquire read lock on `alter lock` and build the requested inputstreams
storages_with_structure_lock = getAndLockStorages(settings.schema_version);
assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end());
storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage;
Expand All @@ -304,7 +331,6 @@ void DAGStorageInterpreter::prepare()

analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);

FAIL_POINT_PAUSE(FailPoints::pause_after_learner_read);
}

void DAGStorageInterpreter::executePushedDownFilter(
Expand Down Expand Up @@ -464,8 +490,9 @@ LearnerReadSnapshot DAGStorageInterpreter::doCopLearnerRead()
{
if (table_scan.isPartitionTableScan())
{
throw Exception("Cop request does not support partition table scan");
throw TiFlashException("Cop request does not support partition table scan", DB::Errors::Coprocessor::BadRequest);
}

TablesRegionInfoMap regions_for_local_read;
for (const auto physical_table_id : table_scan.getPhysicalTableIDs())
{
Expand All @@ -481,7 +508,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doCopLearnerRead()
if (info_retry)
throw RegionException({info_retry->begin()->get().region_id}, status);

return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, false, context, log);
return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, /*for_batch_cop=*/false, context, log);
}

/// Will assign region_retry_from_local_region
Expand Down Expand Up @@ -517,7 +544,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead()
}
if (mvcc_query_info->regions_query_info.empty())
return {};
return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, true, context, log);
return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, /*for_batch_cop=*/true, context, log);
}
catch (const LockException & e)
{
Expand Down Expand Up @@ -584,18 +611,18 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
return ret;
}

void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block_size)
void DAGStorageInterpreter::buildLocalRead(DAGPipeline & pipeline, size_t max_block_size)
{
const DAGContext & dag_context = *context.getDAGContext();
size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
if (total_local_region_num == 0)
return;
auto table_query_infos = generateSelectQueryInfos();
for (auto & table_query_info : table_query_infos)
const auto table_query_infos = generateSelectQueryInfos();
for (const auto & table_query_info : table_query_infos)
{
DAGPipeline current_pipeline;
TableID table_id = table_query_info.first;
SelectQueryInfo & query_info = table_query_info.second;
const TableID table_id = table_query_info.first;
const SelectQueryInfo & query_info = table_query_info.second;
size_t region_num = query_info.mvcc_query_info->regions_query_info.size();
if (region_num == 0)
continue;
Expand All @@ -613,11 +640,11 @@ void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block
{
current_pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, current_max_streams);

// After getting streams from storage, we need to validate whether regions have changed or not after learner read.
// In case the versions of regions have changed, those `streams` may contain different data other than expected.
// Like after region merge/split.
// After getting streams from storage, we need to validate whether Regions have changed or not after learner read.
// (by calling `validateQueryInfo`). In case the key ranges of Regions have changed (Region merge/split), those `streams`
// may contain different data other than expected.

// Inject failpoint to throw RegionException
// Inject failpoint to throw RegionException for testing
fiu_do_on(FailPoints::region_exception_after_read_from_storage_some_error, {
const auto & regions_info = query_info.mvcc_query_info->regions_query_info;
RegionException::UnavailableRegions region_ids;
Expand Down Expand Up @@ -781,7 +808,7 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
return {{}, {}, {}, false};
}

if (table_store->engineType() != ::TiDB::StorageEngine::TMT && table_store->engineType() != ::TiDB::StorageEngine::DT)
if (unlikely(table_store->engineType() != ::TiDB::StorageEngine::DT))
{
throw TiFlashException(
fmt::format(
Expand Down Expand Up @@ -954,6 +981,7 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
return {required_columns_tmp, source_columns_tmp, need_cast_column};
}

// Build remote requests from `region_retry_from_local_region`
void DAGStorageInterpreter::buildRemoteRequests()
{
std::unordered_map<Int64, Int64> region_id_to_table_id_map;
Expand All @@ -978,6 +1006,8 @@ void DAGStorageInterpreter::buildRemoteRequests()
if (retry_regions.empty())
continue;

// Append the region into DAGContext to return them to the upper layer.
// The upper layer should refresh its cache about these regions.
for (const auto & r : retry_regions)
context.getDAGContext()->retry_regions.push_back(r.get());

Expand All @@ -993,9 +1023,6 @@ void DAGStorageInterpreter::buildRemoteRequests()

void DAGStorageInterpreter::releaseAlterLocks()
{
// The DeltaTree engine ensures that once input streams are created, the caller can get a consistent result
// from those streams even if DDL operations are applied. Release the alter lock so that reading does not
// block DDL operations, keep the drop lock so that the storage not to be dropped during reading.
for (auto storage_with_lock : storages_with_structure_lock)
{
drop_locks.emplace_back(std::get<1>(std::move(storage_with_lock.second.lock).release()));
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class DAGStorageInterpreter

void execute(DAGPipeline & pipeline);

/// Members will be transfered to DAGQueryBlockInterpreter after execute
/// Members will be transferred to DAGQueryBlockInterpreter after execute

std::unique_ptr<DAGExpressionAnalyzer> analyzer;

Expand All @@ -72,7 +72,7 @@ class DAGStorageInterpreter

LearnerReadSnapshot doBatchCopLearnerRead();

void doLocalRead(DAGPipeline & pipeline, size_t max_block_size);
void buildLocalRead(DAGPipeline & pipeline, size_t max_block_size);

std::unordered_map<TableID, StorageWithStructureLock> getAndLockStorages(Int64 query_schema_version);

Expand Down

0 comments on commit 35e6c18

Please sign in to comment.