From a1956bc0e824604580d9d0b7791eff3cb74f45da Mon Sep 17 00:00:00 2001 From: ruoxi Date: Fri, 9 Aug 2019 15:06:10 +0800 Subject: [PATCH] Optimize schema sync on read and refine mock TiDB (#170) * Add get current version in schema syncer * Add global schema version check * Refine mock tidb and dbg funcs * Fix reference to shared ptr --- dbms/src/Debug/MockTiDB.cpp | 102 +++++++++++------- dbms/src/Debug/MockTiDB.h | 7 +- dbms/src/Debug/dbgFuncMockTiDBTable.cpp | 42 ++------ .../Interpreters/InterpreterSelectQuery.cpp | 30 ++++-- dbms/src/Storages/Transaction/KVStore.h | 1 + dbms/src/Storages/Transaction/RegionTable.h | 1 + dbms/src/Storages/Transaction/SchemaSyncer.h | 5 + .../Storages/Transaction/TiDBSchemaSyncer.h | 6 ++ 8 files changed, 109 insertions(+), 85 deletions(-) diff --git a/dbms/src/Debug/MockTiDB.cpp b/dbms/src/Debug/MockTiDB.cpp index 70c25cec551..c02a2cf76f2 100644 --- a/dbms/src/Debug/MockTiDB.cpp +++ b/dbms/src/Debug/MockTiDB.cpp @@ -1,3 +1,5 @@ +#include + #include #include #include @@ -7,10 +9,10 @@ #include #include #include - #include - -#include +#include +#include +#include namespace DB { @@ -28,8 +30,57 @@ Table::Table(const String & database_name_, const String & table_name_, TableInf MockTiDB::MockTiDB() { databases["default"] = 0; } -void MockTiDB::dropDB(const String & database_name) +TablePtr MockTiDB::dropTableInternal(Context & context, const String & database_name, const String & table_name, bool drop_regions) { + String qualified_name = database_name + "." + table_name; + auto it_by_name = tables_by_name.find(qualified_name); + if (it_by_name == tables_by_name.end()) + return nullptr; + + auto & kvstore = context.getTMTContext().getKVStore(); + auto & region_table = context.getTMTContext().getRegionTable(); + + auto table = it_by_name->second; + if (table->isPartitionTable()) + { + for (const auto & partition : table->table_info.partition.definitions) + { + tables_by_id.erase(partition.id); + if (drop_regions) + { + for (auto & e : region_table.getRegionsByTable(partition.id)) + kvstore->removeRegion(e.first, ®ion_table); + region_table.mockDropRegionsInTable(partition.id); + } + } + } + tables_by_id.erase(table->id()); + + tables_by_name.erase(it_by_name); + + if (drop_regions) + { + for (auto & e : region_table.getRegionsByTable(table->id())) + kvstore->removeRegion(e.first, ®ion_table); + region_table.mockDropRegionsInTable(table->id()); + } + + return table; +} + +void MockTiDB::dropDB(Context & context, const String & database_name, bool drop_regions) +{ + std::lock_guard lock(tables_mutex); + + std::vector table_names; + std::for_each(tables_by_id.begin(), tables_by_id.end(), [&](const auto & pair) { + if (pair.second->table_info.db_name == database_name) + table_names.emplace_back(pair.second->table_info.name); + }); + + for (const auto & table_name : table_names) + dropTableInternal(context, database_name, table_name, drop_regions); + version++; SchemaDiff diff; @@ -44,38 +95,22 @@ void MockTiDB::dropDB(const String & database_name) databases.erase(database_name); } -void MockTiDB::dropTable(const String & database_name, const String & table_name, bool is_drop_db) +void MockTiDB::dropTable(Context & context, const String & database_name, const String & table_name, bool drop_regions) { std::lock_guard lock(tables_mutex); - String qualified_name = database_name + "." + table_name; - auto it_by_name = tables_by_name.find(qualified_name); - if (it_by_name == tables_by_name.end()) + auto table = dropTableInternal(context, database_name, table_name, drop_regions); + if (!table) return; - const auto & table = it_by_name->second; - if (table->isPartitionTable()) - { - for (const auto & partition : table->table_info.partition.definitions) - { - tables_by_id.erase(partition.id); - } - } - tables_by_id.erase(table->id()); - - tables_by_name.erase(it_by_name); + version++; - if (!is_drop_db) - { - version++; - - SchemaDiff diff; - diff.type = SchemaActionDropTable; - diff.schema_id = table->table_info.db_id; - diff.table_id = table->id(); - diff.version = version; - version_diff[version] = diff; - } + SchemaDiff diff; + diff.type = SchemaActionDropTable; + diff.schema_id = table->table_info.db_id; + diff.table_id = table->id(); + diff.version = version; + version_diff[version] = diff; } ColumnInfo getColumnInfoFromColumn(const NameAndTypePair & column, ColumnID id) @@ -361,13 +396,6 @@ TablePtr MockTiDB::getTableByName(const String & database_name, const String & t return getTableByNameInternal(database_name, table_name); } -void MockTiDB::traverseTables(std::function f) -{ - std::lock_guard lock(tables_mutex); - - std::for_each(tables_by_id.begin(), tables_by_id.end(), [&](const auto & pair) { f(pair.second); }); -} - TablePtr MockTiDB::getTableByNameInternal(const String & database_name, const String & table_name) { String qualified_name = database_name + "." + table_name; diff --git a/dbms/src/Debug/MockTiDB.h b/dbms/src/Debug/MockTiDB.h index de2eb517111..23e6978a7b3 100644 --- a/dbms/src/Debug/MockTiDB.h +++ b/dbms/src/Debug/MockTiDB.h @@ -70,9 +70,9 @@ class MockTiDB : public ext::singleton TableID newPartition(const String & database_name, const String & table_name, const String & partition_name, Timestamp tso); - void dropTable(const String & database_name, const String & table_name, bool is_drop_db); + void dropTable(Context & context, const String & database_name, const String & table_name, bool drop_regions); - void dropDB(const String & database_name); + void dropDB(Context & context, const String & database_name, bool drop_regions); void addColumnToTable(const String & database_name, const String & table_name, const NameAndTypePair & column); @@ -86,8 +86,6 @@ class MockTiDB : public ext::singleton TablePtr getTableByName(const String & database_name, const String & table_name); - void traverseTables(std::function f); - TiDB::TableInfoPtr getTableInfoByID(TableID table_id); TiDB::DBInfoPtr getDBInfoByID(DatabaseID db_id); @@ -101,6 +99,7 @@ class MockTiDB : public ext::singleton Int64 getVersion() { return version; } private: + TablePtr dropTableInternal(Context & context, const String & database_name, const String & table_name, bool drop_regions); TablePtr getTableByNameInternal(const String & database_name, const String & table_name); private: diff --git a/dbms/src/Debug/dbgFuncMockTiDBTable.cpp b/dbms/src/Debug/dbgFuncMockTiDBTable.cpp index e787a8f6cab..ec84fbdf9dc 100644 --- a/dbms/src/Debug/dbgFuncMockTiDBTable.cpp +++ b/dbms/src/Debug/dbgFuncMockTiDBTable.cpp @@ -120,14 +120,11 @@ void MockTiDBTable::dbgFuncDropTiDBDB(Context & context, const ASTs & args, DBGI if (args.size() == 3) drop_regions = typeid_cast(*args[1]).name == "true"; - std::vector table_names; - MockTiDB::instance().traverseTables([&](MockTiDB::TablePtr table) { - if (table->table_info.db_name == database_name) - table_names.push_back(table->table_info.name); - }); - for (auto table_name : table_names) - dbgFuncDropTiDBTableImpl(context, database_name, table_name, drop_regions, true, output); - MockTiDB::instance().dropDB(database_name); + MockTiDB::instance().dropDB(context, database_name, drop_regions); + + std::stringstream ss; + ss << "dropped db #" << database_name; + output(ss.str()); } void MockTiDBTable::dbgFuncDropTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output) @@ -140,12 +137,7 @@ void MockTiDBTable::dbgFuncDropTiDBTable(Context & context, const ASTs & args, D bool drop_regions = true; if (args.size() == 3) drop_regions = typeid_cast(*args[1]).name == "true"; - dbgFuncDropTiDBTableImpl(context, database_name, table_name, drop_regions, false, output); -} -void MockTiDBTable::dbgFuncDropTiDBTableImpl( - Context & context, String database_name, String table_name, bool drop_regions, bool is_drop_db, DBGInvoker::Printer output) -{ MockTiDB::TablePtr table = nullptr; TableID table_id = InvalidTableID; try @@ -161,29 +153,7 @@ void MockTiDBTable::dbgFuncDropTiDBTableImpl( return; } - TMTContext & tmt = context.getTMTContext(); - auto & kvstore = tmt.getKVStore(); - auto & region_table = tmt.getRegionTable(); - - if (table->isPartitionTable() && drop_regions) - { - auto partition_ids = table->getPartitionIDs(); - std::for_each(partition_ids.begin(), partition_ids.end(), [&](TableID partition_id) { - for (auto & e : region_table.getRegionsByTable(partition_id)) - kvstore->removeRegion(e.first, ®ion_table); - - region_table.mockDropRegionsInTable(partition_id); - }); - } - - if (drop_regions) - { - for (auto & e : region_table.getRegionsByTable(table_id)) - kvstore->removeRegion(e.first, ®ion_table); - region_table.mockDropRegionsInTable(table_id); - } - - MockTiDB::instance().dropTable(database_name, table_name, is_drop_db); + MockTiDB::instance().dropTable(context, database_name, table_name, drop_regions); std::stringstream ss; ss << "dropped table #" << table_id; diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 2d7093cd365..5c8949b40ed 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -193,10 +193,13 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names) } -void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & database_name, const String & table_name, Int64 schema_version) +void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & database_name, const String & table_name, Int64 query_schema_version) { String qualified_name = database_name + "." + table_name; + /// Get current schema version in schema syncer for a chance to shortcut. + auto global_schema_version = context.getTMTContext().getSchemaSyncer()->getCurrentVersion(); + /// Lambda for get storage, then align schema version under the read lock. auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple { /// Get storage in case it's dropped then re-created. @@ -214,10 +217,15 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d /// Check schema version. auto storage_schema_version = merge_tree->getTableInfo().schema_version; - if (storage_schema_version > schema_version) - throw Exception("Table " + qualified_name + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(schema_version), ErrorCodes::SCHEMA_VERSION_ERROR); - - if ((schema_synced && storage_schema_version <= schema_version) || (!schema_synced && storage_schema_version == schema_version)) + // Not allow storage schema version greater than query schema version in any case. + if (storage_schema_version > query_schema_version) + throw Exception("Table " + qualified_name + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(query_schema_version), ErrorCodes::SCHEMA_VERSION_ERROR); + + // If schema synced, we must be very recent so we are good as long as storage schema version is no greater than query schema version. + // If schema not synced, we are good if storage schema version is right on query schema version. + // Otherwise we are at the risk of out-of-date schema, but we still have a chance to be sure that we are good, if global schema version is greater than query schema version. + if ((schema_synced && storage_schema_version <= query_schema_version) + || (!schema_synced && (storage_schema_version == query_schema_version || global_schema_version > query_schema_version))) return std::make_tuple(storage_, lock, storage_schema_version, true); return std::make_tuple(nullptr, nullptr, storage_schema_version, false); @@ -227,12 +235,18 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d StoragePtr storage_; TableStructureReadLockPtr lock; Int64 storage_schema_version; + auto log_schema_version = [&](const String & result) { + LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema " << result + << " Schema version [storage, global, query]: " + << "[" << storage_schema_version << ", " << global_schema_version << ", " << query_schema_version + << "]."); + }; bool ok; { std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(false); if (ok) { - LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", OK, no syncing required."); + log_schema_version("OK, no syncing required."); storage = storage_; table_lock = lock; return; @@ -241,7 +255,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d /// If first try failed, sync schema and try again. { - LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", not OK, syncing schemas."); + log_schema_version("not OK, syncing schemas."); auto start_time = Clock::now(); context.getTMTContext().getSchemaSyncer()->syncSchemas(context); auto schema_sync_cost = std::chrono::duration_cast(Clock::now() - start_time).count(); @@ -250,7 +264,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(true); if (ok) { - LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", OK after syncing."); + log_schema_version("OK after syncing."); storage = storage_; table_lock = lock; return; diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 17a8d67e7f7..282e06935b2 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -52,6 +52,7 @@ class KVStore final : private boost::noncopyable void updateRegionTableBySnapshot(RegionTable & region_table); private: + friend class MockTiDB; friend struct MockTiDBTable; void removeRegion(const RegionID region_id, RegionTable * region_table); diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index fb08f718fb6..b281a9adf0e 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -142,6 +142,7 @@ class RegionTable : private boost::noncopyable void flushRegion(TableID table_id, RegionID partition_id, size_t & cache_size, const bool try_persist = true); // For debug + friend class MockTiDB; friend struct MockTiDBTable; void mockDropRegionsInTable(TableID table_id); diff --git a/dbms/src/Storages/Transaction/SchemaSyncer.h b/dbms/src/Storages/Transaction/SchemaSyncer.h index 629b0ed07f8..6a6d0d2f4e6 100644 --- a/dbms/src/Storages/Transaction/SchemaSyncer.h +++ b/dbms/src/Storages/Transaction/SchemaSyncer.h @@ -15,6 +15,11 @@ class SchemaSyncer public: virtual ~SchemaSyncer() = default; + /** + * Get current version of CH schema. + */ + virtual Int64 getCurrentVersion() = 0; + /** * Synchronize all schemas between TiDB and CH. * @param context diff --git a/dbms/src/Storages/Transaction/TiDBSchemaSyncer.h b/dbms/src/Storages/Transaction/TiDBSchemaSyncer.h index 10945898f5c..134009ab53b 100644 --- a/dbms/src/Storages/Transaction/TiDBSchemaSyncer.h +++ b/dbms/src/Storages/Transaction/TiDBSchemaSyncer.h @@ -55,6 +55,12 @@ struct TiDBSchemaSyncer : public SchemaSyncer cur_version = 0; } + Int64 getCurrentVersion() override + { + std::lock_guard lock(schema_mutex); + return cur_version; + } + bool syncSchemas(Context & context) override { std::lock_guard lock(schema_mutex);