Skip to content

Commit

Permalink
Add DatabaseInfoCache.h
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang committed Nov 23, 2023
1 parent 86e899a commit 0f1cb0e
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 70 deletions.
92 changes: 92 additions & 0 deletions dbms/src/TiDB/Schema/DatabaseInfoCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/KVStore/Types.h>
#include <TiDB/Schema/TiDB.h>

#include <shared_mutex>
#include <unordered_map>

namespace DB
{

class DatabaseInfoCache
{
public:
TiDB::DBInfoPtr getDBInfoByName(const String & database_name) const
{
std::shared_lock lock(mtx_databases);

auto it = std::find_if(databases.begin(), databases.end(), [&](const auto & pair) {
return pair.second->name == database_name;
});
if (it == databases.end())
return nullptr;
return it->second;
}

template <typename NameMapper>
TiDB::DBInfoPtr getDBInfoByMappedName(const String & mapped_database_name) const
{
std::shared_lock lock(mtx_databases);

auto it = std::find_if(databases.begin(), databases.end(), [&](const auto & pair) {
return NameMapper().mapDatabaseName(*pair.second) == mapped_database_name;
});
if (it == databases.end())
return nullptr;
return it->second;
}

void addDatabaseInfo(const TiDB::DBInfoPtr & db_info)
{
std::unique_lock lock(mtx_databases);
databases.emplace(db_info->id, db_info);
}

TiDB::DBInfoPtr getDBInfo(DatabaseID database_id) const
{
std::shared_lock shared_lock(mtx_databases);
if (auto it = databases.find(database_id); likely(it == databases.end()))
{
return it->second;
}
return nullptr;
}

bool exists(DatabaseID database_id) const
{
std::shared_lock shared_lock(mtx_databases);
return databases.contains(database_id);
}

void eraseDBInfo(DatabaseID database_id)
{
std::unique_lock shared_lock(mtx_databases);
databases.erase(database_id);
}

void clear()
{
std::unique_lock lock(mtx_databases);
databases.clear();
}

private:
mutable std::shared_mutex mtx_databases;
std::unordered_map<DB::DatabaseID, TiDB::DBInfoPtr> databases;
};
} // namespace DB
34 changes: 10 additions & 24 deletions dbms/src/TiDB/Schema/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -871,27 +871,19 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateSchema(const TiDB::DBInfoPtr
interpreter.setForceRestoreData(false);
interpreter.execute();

{
std::unique_lock<std::shared_mutex> lock(shared_mutex_for_databases);
databases.emplace(db_info->id, db_info);
}
databases.addDatabaseInfo(db_info);

LOG_INFO(log, "Create database {} end, database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);
}

template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyDropSchema(DatabaseID schema_id)
{
TiDB::DBInfoPtr db_info;
TiDB::DBInfoPtr db_info = databases.getDBInfo(schema_id);
if (unlikely(db_info == nullptr))
{
std::shared_lock<std::shared_mutex> shared_lock(shared_mutex_for_databases);
auto it = databases.find(schema_id);
if (unlikely(it == databases.end()))
{
LOG_INFO(log, "Try to drop database but not found, may has been dropped, database_id={}", schema_id);
return;
}
db_info = it->second;
LOG_INFO(log, "Try to drop database but not found, may has been dropped, database_id={}", schema_id);
return;
}

{
Expand All @@ -903,10 +895,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropSchema(DatabaseID schema_id)

applyDropSchema(name_mapper.mapDatabaseName(*db_info));

{
std::unique_lock<std::shared_mutex> lock(shared_mutex_for_databases);
databases.erase(schema_id);
}
databases.eraseDBInfo(schema_id);
}

template <typename Getter, typename NameMapper>
Expand Down Expand Up @@ -1176,13 +1165,10 @@ void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
auto task = [this, db, &created_db_set, &created_db_set_mutex] {
do
{
if (databases.exists(db->id))
{
std::shared_lock<std::shared_mutex> shared_lock(shared_mutex_for_databases);
if (databases.find(db->id) != databases.end())
{
break;
}
} // release lock on `databases`
break;
}
applyCreateSchema(db);
{
std::unique_lock<std::mutex> created_db_set_lock(created_db_set_mutex);
Expand All @@ -1194,7 +1180,7 @@ void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
"Database {} created during sync all schemas, database_id={}",
name_mapper.debugDatabaseName(*db),
db->id);
} while (false);
} while (false); // Ensure database existing

std::vector<TableInfoPtr> tables = getter.listTables(db->id);
for (auto & table : tables)
Expand Down
14 changes: 4 additions & 10 deletions dbms/src/TiDB/Schema/SchemaBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

#include <Interpreters/Context_fwd.h>
#include <Storages/KVStore/TMTStorages.h>
#include <TiDB/Schema/DatabaseInfoCache.h>
#include <TiDB/Schema/SchemaGetter.h>
#include <TiDB/Schema/TableIDMap.h>

namespace DB
{


template <typename Getter, typename NameMapper>
struct SchemaBuilder
{
Expand All @@ -32,9 +34,7 @@ struct SchemaBuilder

Context & context;

std::shared_mutex & shared_mutex_for_databases;

std::unordered_map<DB::DatabaseID, TiDB::DBInfoPtr> & databases;
DatabaseInfoCache & databases;

TableIDMap & table_id_map;

Expand All @@ -43,15 +43,9 @@ struct SchemaBuilder
LoggerPtr log;

public:
SchemaBuilder(
Getter & getter_,
Context & context_,
std::unordered_map<DB::DatabaseID, TiDB::DBInfoPtr> & dbs_,
TableIDMap & table_id_map_,
std::shared_mutex & shared_mutex_for_databases_)
SchemaBuilder(Getter & getter_, Context & context_, DatabaseInfoCache & dbs_, TableIDMap & table_id_map_)
: getter(getter_)
, context(context_)
, shared_mutex_for_databases(shared_mutex_for_databases_)
, databases(dbs_)
, table_id_map(table_id_map_)
, keyspace_id(getter_.getKeyspaceID())
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/TiDB/Schema/SchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

#pragma once

#include <Interpreters/Context_fwd.h>
#include <Storages/KVStore/Types.h>
#include <common/logger_useful.h>
#include <Interpreters/Context_fwd.h>

#include <memory>
#include <vector>
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/TiDB/Schema/TiDBSchemaSyncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Int64 TiDBSchemaSyncer<mock_getter, mock_mapper>::syncSchemaDiffs(
return -1;
}

SchemaBuilder<Getter, NameMapper> builder(getter, context, databases, table_id_map, shared_mutex_for_databases);
SchemaBuilder<Getter, NameMapper> builder(getter, context, databases, table_id_map);
builder.applyDiff(*diff);
}
return used_version;
Expand All @@ -146,7 +146,7 @@ Int64 TiDBSchemaSyncer<mock_getter, mock_mapper>::syncAllSchemas(Context & conte
{
--version;
}
SchemaBuilder<Getter, NameMapper> builder(getter, context, databases, table_id_map, shared_mutex_for_databases);
SchemaBuilder<Getter, NameMapper> builder(getter, context, databases, table_id_map);
builder.syncAllSchema();

return version;
Expand Down Expand Up @@ -176,7 +176,7 @@ std::tuple<bool, String> TiDBSchemaSyncer<mock_getter, mock_mapper>::trySyncTabl
// Try to fetch the latest table info from TiKV.
// If the table schema apply is failed, then we need to update the table-id-mapping
// and retry.
SchemaBuilder<Getter, NameMapper> builder(getter, context, databases, table_id_map, shared_mutex_for_databases);
SchemaBuilder<Getter, NameMapper> builder(getter, context, databases, table_id_map);
if (!builder.applyTable(database_id, logical_table_id, physical_table_id, force))
{
String message = fmt::format(
Expand Down Expand Up @@ -231,6 +231,14 @@ bool TiDBSchemaSyncer<mock_getter, mock_mapper>::syncTableSchema(Context & conte
return false;
}

template <bool mock_getter, bool mock_mapper>
void TiDBSchemaSyncer<mock_getter, mock_mapper>::dropAllSchema(Context & context)
{
auto getter = createSchemaGetter(keyspace_id);
SchemaBuilder<Getter, NameMapper> builder(getter, context, databases, table_id_map);
builder.dropAllSchema();
}

template class TiDBSchemaSyncer<false, false>;
template class TiDBSchemaSyncer<true, false>;
template class TiDBSchemaSyncer<true, true>;
Expand Down
39 changes: 7 additions & 32 deletions dbms/src/TiDB/Schema/TiDBSchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Common/Stopwatch.h>
#include <Debug/MockSchemaGetter.h>
#include <Debug/MockSchemaNameMapper.h>
#include <TiDB/Schema/DatabaseInfoCache.h>
#include <TiDB/Schema/SchemaBuilder.h>
#include <TiDB/Schema/TableIDMap.h>
#include <TiDB/Schema/TiDB.h>
Expand Down Expand Up @@ -52,13 +53,10 @@ class TiDBSchemaSyncer : public SchemaSyncer
// Ensure `syncSchemas` will only executed by one thread.
std::mutex mutex_for_sync_schema;

// mutex for databases
std::shared_mutex shared_mutex_for_databases;
std::unordered_map<DB::DatabaseID, TiDB::DBInfoPtr> databases;

LoggerPtr log;

TableIDMap table_id_map;
DatabaseInfoCache databases;

Getter createSchemaGetter(KeyspaceID keyspace_id)
{
Expand Down Expand Up @@ -111,46 +109,23 @@ class TiDBSchemaSyncer : public SchemaSyncer
bool force,
const char * next_action);

void dropAllSchema(Context & context) override;

TiDB::DBInfoPtr getDBInfoByName(const String & database_name) override
{
std::shared_lock<std::shared_mutex> lock(shared_mutex_for_databases);

auto it = std::find_if(databases.begin(), databases.end(), [&](const auto & pair) {
return pair.second->name == database_name;
});
if (it == databases.end())
return nullptr;
return it->second;
return databases.getDBInfoByName(database_name);
}

TiDB::DBInfoPtr getDBInfoByMappedName(const String & mapped_database_name) override
{
std::shared_lock<std::shared_mutex> lock(shared_mutex_for_databases);

auto it = std::find_if(databases.begin(), databases.end(), [&](const auto & pair) {
return NameMapper().mapDatabaseName(*pair.second) == mapped_database_name;
});
if (it == databases.end())
return nullptr;
return it->second;
}

void dropAllSchema(Context & context) override
{
auto getter = createSchemaGetter(keyspace_id);
SchemaBuilder<Getter, NameMapper> builder(getter, context, databases, table_id_map, shared_mutex_for_databases);
builder.dropAllSchema();
return databases.getDBInfoByMappedName<NameMapper>(mapped_database_name);
}

// clear all states.
// just for testing restart
void reset() override
{
{
std::unique_lock<std::shared_mutex> lock(shared_mutex_for_databases);
databases.clear();
}

databases.clear();
table_id_map.clear();
cur_version = 0;
}
Expand Down

0 comments on commit 0f1cb0e

Please sign in to comment.