Skip to content

Commit

Permalink
Fix DAG get and lock storage
Browse files Browse the repository at this point in the history
  • Loading branch information
zanmato1984 committed Aug 9, 2019
1 parent b346a24 commit 57cd382
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,11 @@ void InterpreterDAG::executeExpression(Pipeline & pipeline, const ExpressionActi
}
}

void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64 schema_version)
void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64 query_schema_version)
{
/// Get current schema version in schema syncer for a chance to shortcut.
auto global_schema_version = context.getTMTContext().getSchemaSyncer()->getCurrentVersion();

/// Lambda for get storage, then align schema version under the read lock.
auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple<TMTStoragePtr, TableStructureReadLockPtr, Int64, bool> {
/// Get storage in case it's dropped then re-created.
Expand All @@ -321,12 +324,17 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64

/// Check schema version.
auto storage_schema_version = storage_->getTableInfo().schema_version;
if (storage_schema_version > schema_version)
// Not allow storage schema version greater than query schema version in any case.
if (storage_schema_version > query_schema_version)
throw Exception("Table " + std::to_string(table_id) + " schema version " + std::to_string(storage_schema_version)
+ " newer than query schema version " + std::to_string(schema_version),
+ " newer than query schema version " + std::to_string(query_schema_version),
ErrorCodes::SCHEMA_VERSION_ERROR);

if ((schema_synced && storage_schema_version <= schema_version) || (!schema_synced && storage_schema_version == schema_version))
// If schema synced, we must be very recent so we are good as long as storage schema version is no greater than query schema version.
// If schema not synced, we are good if storage schema version is right on query schema version.
// Otherwise we are at the risk of out-of-date schema, but we still have a chance to be sure that we are good, if global schema version is greater than query schema version.
if ((schema_synced && storage_schema_version <= query_schema_version)
|| (!schema_synced && (storage_schema_version == query_schema_version || global_schema_version > query_schema_version)))
return std::make_tuple(storage_, lock, storage_schema_version, true);

return std::make_tuple(nullptr, nullptr, storage_schema_version, false);
Expand All @@ -336,14 +344,17 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64
TMTStoragePtr storage_;
TableStructureReadLockPtr lock;
Int64 storage_schema_version;
auto log_schema_version = [&](const String & result) {
LOG_DEBUG(log,
__PRETTY_FUNCTION__ << " Table " << table_id << " schema " << result << " Schema version [storage, global, query]: "
<< "[" << storage_schema_version << ", " << global_schema_version << ", " << query_schema_version << "].");
};
bool ok;
{
std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(false);
if (ok)
{
LOG_DEBUG(log,
__PRETTY_FUNCTION__ << " Table " << table_id << " schema version: " << storage_schema_version
<< ", query schema version: " << schema_version << ", OK, no syncing required.");
log_schema_version("OK, no syncing required.");
storage = storage_;
table_lock = lock;
return;
Expand All @@ -352,9 +363,7 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64

/// If first try failed, sync schema and try again.
{
LOG_DEBUG(log,
__PRETTY_FUNCTION__ << " Table " << table_id << " schema version: " << storage_schema_version
<< ", query schema version: " << schema_version << ", not OK, syncing schemas.");
log_schema_version("not OK, syncing schemas.");
auto start_time = Clock::now();
context.getTMTContext().getSchemaSyncer()->syncSchemas(context);
auto schema_sync_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
Expand All @@ -363,9 +372,7 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64
std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(true);
if (ok)
{
LOG_DEBUG(log,
__PRETTY_FUNCTION__ << " Table " << table_id << " schema version: " << storage_schema_version
<< ", query schema version: " << schema_version << ", OK after syncing.");
log_schema_version("OK after syncing.");
storage = storage_;
table_lock = lock;
return;
Expand Down

0 comments on commit 57cd382

Please sign in to comment.