Skip to content

Commit

Permalink
Merge branch 'master' into cop
Browse files Browse the repository at this point in the history
  • Loading branch information
zanmato1984 committed Aug 9, 2019
2 parents 62ced38 + a1956bc commit b346a24
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 85 deletions.
102 changes: 65 additions & 37 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <Debug/MockTiDB.h>

#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDecimal.h>
Expand All @@ -7,10 +9,10 @@
#include <DataTypes/DataTypeSet.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>

#include <Functions/FunctionHelpers.h>

#include <Debug/MockTiDB.h>
#include <Interpreters/Context.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/TMTContext.h>

namespace DB
{
Expand All @@ -28,8 +30,57 @@ Table::Table(const String & database_name_, const String & table_name_, TableInf

MockTiDB::MockTiDB() { databases["default"] = 0; }

void MockTiDB::dropDB(const String & database_name)
TablePtr MockTiDB::dropTableInternal(Context & context, const String & database_name, const String & table_name, bool drop_regions)
{
String qualified_name = database_name + "." + table_name;
auto it_by_name = tables_by_name.find(qualified_name);
if (it_by_name == tables_by_name.end())
return nullptr;

auto & kvstore = context.getTMTContext().getKVStore();
auto & region_table = context.getTMTContext().getRegionTable();

auto table = it_by_name->second;
if (table->isPartitionTable())
{
for (const auto & partition : table->table_info.partition.definitions)
{
tables_by_id.erase(partition.id);
if (drop_regions)
{
for (auto & e : region_table.getRegionsByTable(partition.id))
kvstore->removeRegion(e.first, &region_table);
region_table.mockDropRegionsInTable(partition.id);
}
}
}
tables_by_id.erase(table->id());

tables_by_name.erase(it_by_name);

if (drop_regions)
{
for (auto & e : region_table.getRegionsByTable(table->id()))
kvstore->removeRegion(e.first, &region_table);
region_table.mockDropRegionsInTable(table->id());
}

return table;
}

void MockTiDB::dropDB(Context & context, const String & database_name, bool drop_regions)
{
std::lock_guard lock(tables_mutex);

std::vector<String> table_names;
std::for_each(tables_by_id.begin(), tables_by_id.end(), [&](const auto & pair) {
if (pair.second->table_info.db_name == database_name)
table_names.emplace_back(pair.second->table_info.name);
});

for (const auto & table_name : table_names)
dropTableInternal(context, database_name, table_name, drop_regions);

version++;

SchemaDiff diff;
Expand All @@ -44,38 +95,22 @@ void MockTiDB::dropDB(const String & database_name)
databases.erase(database_name);
}

void MockTiDB::dropTable(const String & database_name, const String & table_name, bool is_drop_db)
void MockTiDB::dropTable(Context & context, const String & database_name, const String & table_name, bool drop_regions)
{
std::lock_guard lock(tables_mutex);

String qualified_name = database_name + "." + table_name;
auto it_by_name = tables_by_name.find(qualified_name);
if (it_by_name == tables_by_name.end())
auto table = dropTableInternal(context, database_name, table_name, drop_regions);
if (!table)
return;

const auto & table = it_by_name->second;
if (table->isPartitionTable())
{
for (const auto & partition : table->table_info.partition.definitions)
{
tables_by_id.erase(partition.id);
}
}
tables_by_id.erase(table->id());

tables_by_name.erase(it_by_name);
version++;

if (!is_drop_db)
{
version++;

SchemaDiff diff;
diff.type = SchemaActionDropTable;
diff.schema_id = table->table_info.db_id;
diff.table_id = table->id();
diff.version = version;
version_diff[version] = diff;
}
SchemaDiff diff;
diff.type = SchemaActionDropTable;
diff.schema_id = table->table_info.db_id;
diff.table_id = table->id();
diff.version = version;
version_diff[version] = diff;
}

ColumnInfo getColumnInfoFromColumn(const NameAndTypePair & column, ColumnID id)
Expand Down Expand Up @@ -361,13 +396,6 @@ TablePtr MockTiDB::getTableByName(const String & database_name, const String & t
return getTableByNameInternal(database_name, table_name);
}

void MockTiDB::traverseTables(std::function<void(TablePtr)> f)
{
std::lock_guard lock(tables_mutex);

std::for_each(tables_by_id.begin(), tables_by_id.end(), [&](const auto & pair) { f(pair.second); });
}

TablePtr MockTiDB::getTableByNameInternal(const String & database_name, const String & table_name)
{
String qualified_name = database_name + "." + table_name;
Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class MockTiDB : public ext::singleton<MockTiDB>

TableID newPartition(const String & database_name, const String & table_name, const String & partition_name, Timestamp tso);

void dropTable(const String & database_name, const String & table_name, bool is_drop_db);
void dropTable(Context & context, const String & database_name, const String & table_name, bool drop_regions);

void dropDB(const String & database_name);
void dropDB(Context & context, const String & database_name, bool drop_regions);

void addColumnToTable(const String & database_name, const String & table_name, const NameAndTypePair & column);

Expand All @@ -86,8 +86,6 @@ class MockTiDB : public ext::singleton<MockTiDB>

TablePtr getTableByName(const String & database_name, const String & table_name);

void traverseTables(std::function<void(TablePtr)> f);

TiDB::TableInfoPtr getTableInfoByID(TableID table_id);

TiDB::DBInfoPtr getDBInfoByID(DatabaseID db_id);
Expand All @@ -101,6 +99,7 @@ class MockTiDB : public ext::singleton<MockTiDB>
Int64 getVersion() { return version; }

private:
TablePtr dropTableInternal(Context & context, const String & database_name, const String & table_name, bool drop_regions);
TablePtr getTableByNameInternal(const String & database_name, const String & table_name);

private:
Expand Down
42 changes: 6 additions & 36 deletions dbms/src/Debug/dbgFuncMockTiDBTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,11 @@ void MockTiDBTable::dbgFuncDropTiDBDB(Context & context, const ASTs & args, DBGI
if (args.size() == 3)
drop_regions = typeid_cast<const ASTIdentifier &>(*args[1]).name == "true";

std::vector<String> table_names;
MockTiDB::instance().traverseTables([&](MockTiDB::TablePtr table) {
if (table->table_info.db_name == database_name)
table_names.push_back(table->table_info.name);
});
for (auto table_name : table_names)
dbgFuncDropTiDBTableImpl(context, database_name, table_name, drop_regions, true, output);
MockTiDB::instance().dropDB(database_name);
MockTiDB::instance().dropDB(context, database_name, drop_regions);

std::stringstream ss;
ss << "dropped db #" << database_name;
output(ss.str());
}

void MockTiDBTable::dbgFuncDropTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output)
Expand All @@ -140,12 +137,7 @@ void MockTiDBTable::dbgFuncDropTiDBTable(Context & context, const ASTs & args, D
bool drop_regions = true;
if (args.size() == 3)
drop_regions = typeid_cast<const ASTIdentifier &>(*args[1]).name == "true";
dbgFuncDropTiDBTableImpl(context, database_name, table_name, drop_regions, false, output);
}

void MockTiDBTable::dbgFuncDropTiDBTableImpl(
Context & context, String database_name, String table_name, bool drop_regions, bool is_drop_db, DBGInvoker::Printer output)
{
MockTiDB::TablePtr table = nullptr;
TableID table_id = InvalidTableID;
try
Expand All @@ -161,29 +153,7 @@ void MockTiDBTable::dbgFuncDropTiDBTableImpl(
return;
}

TMTContext & tmt = context.getTMTContext();
auto & kvstore = tmt.getKVStore();
auto & region_table = tmt.getRegionTable();

if (table->isPartitionTable() && drop_regions)
{
auto partition_ids = table->getPartitionIDs();
std::for_each(partition_ids.begin(), partition_ids.end(), [&](TableID partition_id) {
for (auto & e : region_table.getRegionsByTable(partition_id))
kvstore->removeRegion(e.first, &region_table);

region_table.mockDropRegionsInTable(partition_id);
});
}

if (drop_regions)
{
for (auto & e : region_table.getRegionsByTable(table_id))
kvstore->removeRegion(e.first, &region_table);
region_table.mockDropRegionsInTable(table_id);
}

MockTiDB::instance().dropTable(database_name, table_name, is_drop_db);
MockTiDB::instance().dropTable(context, database_name, table_name, drop_regions);

std::stringstream ss;
ss << "dropped table #" << table_id;
Expand Down
30 changes: 22 additions & 8 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,13 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names)
}


void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & database_name, const String & table_name, Int64 schema_version)
void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & database_name, const String & table_name, Int64 query_schema_version)
{
String qualified_name = database_name + "." + table_name;

/// Get current schema version in schema syncer for a chance to shortcut.
auto global_schema_version = context.getTMTContext().getSchemaSyncer()->getCurrentVersion();

/// Lambda for get storage, then align schema version under the read lock.
auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple<StoragePtr, TableStructureReadLockPtr, Int64, bool> {
/// Get storage in case it's dropped then re-created.
Expand All @@ -214,10 +217,15 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d

/// Check schema version.
auto storage_schema_version = merge_tree->getTableInfo().schema_version;
if (storage_schema_version > schema_version)
throw Exception("Table " + qualified_name + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(schema_version), ErrorCodes::SCHEMA_VERSION_ERROR);

if ((schema_synced && storage_schema_version <= schema_version) || (!schema_synced && storage_schema_version == schema_version))
// Not allow storage schema version greater than query schema version in any case.
if (storage_schema_version > query_schema_version)
throw Exception("Table " + qualified_name + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(query_schema_version), ErrorCodes::SCHEMA_VERSION_ERROR);

// If schema synced, we must be very recent so we are good as long as storage schema version is no greater than query schema version.
// If schema not synced, we are good if storage schema version is right on query schema version.
// Otherwise we are at the risk of out-of-date schema, but we still have a chance to be sure that we are good, if global schema version is greater than query schema version.
if ((schema_synced && storage_schema_version <= query_schema_version)
|| (!schema_synced && (storage_schema_version == query_schema_version || global_schema_version > query_schema_version)))
return std::make_tuple(storage_, lock, storage_schema_version, true);

return std::make_tuple(nullptr, nullptr, storage_schema_version, false);
Expand All @@ -227,12 +235,18 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d
StoragePtr storage_;
TableStructureReadLockPtr lock;
Int64 storage_schema_version;
auto log_schema_version = [&](const String & result) {
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema " << result
<< " Schema version [storage, global, query]: "
<< "[" << storage_schema_version << ", " << global_schema_version << ", " << query_schema_version
<< "].");
};
bool ok;
{
std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(false);
if (ok)
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", OK, no syncing required.");
log_schema_version("OK, no syncing required.");
storage = storage_;
table_lock = lock;
return;
Expand All @@ -241,7 +255,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d

/// If first try failed, sync schema and try again.
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", not OK, syncing schemas.");
log_schema_version("not OK, syncing schemas.");
auto start_time = Clock::now();
context.getTMTContext().getSchemaSyncer()->syncSchemas(context);
auto schema_sync_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
Expand All @@ -250,7 +264,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d
std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(true);
if (ok)
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", OK after syncing.");
log_schema_version("OK after syncing.");
storage = storage_;
table_lock = lock;
return;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class KVStore final : private boost::noncopyable
void updateRegionTableBySnapshot(RegionTable & region_table);

private:
friend class MockTiDB;
friend struct MockTiDBTable;
void removeRegion(const RegionID region_id, RegionTable * region_table);

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class RegionTable : private boost::noncopyable
void flushRegion(TableID table_id, RegionID partition_id, size_t & cache_size, const bool try_persist = true);

// For debug
friend class MockTiDB;
friend struct MockTiDBTable;

void mockDropRegionsInTable(TableID table_id);
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/Transaction/SchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ class SchemaSyncer
public:
virtual ~SchemaSyncer() = default;

/**
* Get current version of CH schema.
*/
virtual Int64 getCurrentVersion() = 0;

/**
* Synchronize all schemas between TiDB and CH.
* @param context
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/Transaction/TiDBSchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ struct TiDBSchemaSyncer : public SchemaSyncer
cur_version = 0;
}

Int64 getCurrentVersion() override
{
std::lock_guard<std::mutex> lock(schema_mutex);
return cur_version;
}

bool syncSchemas(Context & context) override
{
std::lock_guard<std::mutex> lock(schema_mutex);
Expand Down

0 comments on commit b346a24

Please sign in to comment.