diff --git a/dbms/src/TiDB/Schema/SchemaSyncService.cpp b/dbms/src/TiDB/Schema/SchemaSyncService.cpp index 3071a9b162a..3a85b66b45d 100644 --- a/dbms/src/TiDB/Schema/SchemaSyncService.cpp +++ b/dbms/src/TiDB/Schema/SchemaSyncService.cpp @@ -42,15 +42,26 @@ SchemaSyncService::SchemaSyncService(DB::Context & context_) , log(Logger::get()) { // Add task for adding and removing keyspace sync schema tasks. - handle = background_pool.addTask( - [&, this] { - addKeyspaceGCTasks(); - removeKeyspaceGCTasks(); - - return false; - }, - false, - context.getSettingsRef().ddl_sync_interval_seconds * 1000); + auto interval_ms = context.getSettingsRef().ddl_sync_interval_seconds * 1000; + if (interval_ms == 0) + { + LOG_WARNING( + log, + "The background task of SchemaSyncService is disabled, please check the ddl_sync_interval_seconds " + "settings"); + } + else + { + handle = background_pool.addTask( + [&, this] { + addKeyspaceGCTasks(); + removeKeyspaceGCTasks(); + + return false; + }, + false, + interval_ms); + } } void SchemaSyncService::addKeyspaceGCTasks() diff --git a/dbms/src/TiDB/Schema/TiDBSchemaManager.h b/dbms/src/TiDB/Schema/TiDBSchemaManager.h index 8e9d4289936..6d4bac44e7a 100644 --- a/dbms/src/TiDB/Schema/TiDBSchemaManager.h +++ b/dbms/src/TiDB/Schema/TiDBSchemaManager.h @@ -29,31 +29,6 @@ class TiDBSchemaSyncerManager , log(Logger::get("TiDBSchemaSyncerManager")) {} - SchemaSyncerPtr createSchemaSyncer(KeyspaceID keyspace_id) - { - if (!mock_getter and !mock_mapper) - { - auto schema_syncer = std::static_pointer_cast( - std::make_shared>(cluster, keyspace_id)); - schema_syncers[keyspace_id] = schema_syncer; - return schema_syncer; - } - else if (mock_getter and mock_mapper) - { - // for mock test - auto schema_syncer = std::static_pointer_cast( - std::make_shared>(cluster, keyspace_id)); - schema_syncers[keyspace_id] = schema_syncer; - return schema_syncer; - } - - // for unit test - auto schema_syncer = std::static_pointer_cast( - std::make_shared>(cluster, keyspace_id)); - schema_syncers[keyspace_id] = schema_syncer; - return schema_syncer; - } - bool syncSchemas(Context & context, KeyspaceID keyspace_id) { auto schema_syncer = getOrCreateSchemaSyncer(keyspace_id); @@ -68,8 +43,8 @@ class TiDBSchemaSyncerManager void reset(KeyspaceID keyspace_id) { - std::shared_lock read_lock(schema_syncers_mutex); - auto schema_syncer = getSchemaSyncer(keyspace_id); + std::shared_lock read_lock(schema_syncers_mutex); + auto schema_syncer = getSchemaSyncer(keyspace_id, read_lock); if (schema_syncer == nullptr) { LOG_ERROR(log, "SchemaSyncer not found, keyspace={}", keyspace_id); @@ -80,8 +55,8 @@ class TiDBSchemaSyncerManager TiDB::DBInfoPtr getDBInfoByName(KeyspaceID keyspace_id, const String & database_name) { - std::shared_lock read_lock(schema_syncers_mutex); - auto schema_syncer = getSchemaSyncer(keyspace_id); + std::shared_lock read_lock(schema_syncers_mutex); + auto schema_syncer = getSchemaSyncer(keyspace_id, read_lock); if (schema_syncer == nullptr) { LOG_ERROR(log, "SchemaSyncer not found, keyspace={}", keyspace_id); @@ -92,8 +67,8 @@ class TiDBSchemaSyncerManager bool removeSchemaSyncer(KeyspaceID keyspace_id) { - std::unique_lock lock(schema_syncers_mutex); - auto schema_syncer = getSchemaSyncer(keyspace_id); + std::unique_lock lock(schema_syncers_mutex); + auto schema_syncer = getSchemaSyncer(keyspace_id, lock); if (schema_syncer == nullptr) { LOG_ERROR(log, "SchemaSyncer not found, keyspace={}", keyspace_id); @@ -105,8 +80,8 @@ class TiDBSchemaSyncerManager void removeTableID(KeyspaceID keyspace_id, TableID table_id) { - std::shared_lock read_lock(schema_syncers_mutex); - auto schema_syncer = getSchemaSyncer(keyspace_id); + std::shared_lock read_lock(schema_syncers_mutex); + auto schema_syncer = getSchemaSyncer(keyspace_id, read_lock); if (schema_syncer == nullptr) { LOG_ERROR(log, "SchemaSyncer not found, keyspace={}", keyspace_id); @@ -126,30 +101,59 @@ class TiDBSchemaSyncerManager std::unordered_map schema_syncers; - /// the function is not thread safe, should be called with a lock - SchemaSyncerPtr getSchemaSyncer(KeyspaceID keyspace_id) +private: + /// Try to get the SchemaSyncer for the `keyspace_id`. Returns nullptr + /// if there is not exist. + /// Note: the function is not thread safe, should be called with a lock + template + SchemaSyncerPtr getSchemaSyncer(KeyspaceID keyspace_id, Lock & /*lock*/) { auto syncer = schema_syncers.find(keyspace_id); return syncer == schema_syncers.end() ? nullptr : syncer->second; } + /// Try to get the SchemaSyncer for the `keyspace_id`. Create a SchemaSyncer + /// if there is not exist. SchemaSyncerPtr getOrCreateSchemaSyncer(KeyspaceID keyspace_id) { - std::shared_lock read_lock(schema_syncers_mutex); - auto syncer = schema_syncers.find(keyspace_id); - if (syncer == schema_syncers.end()) { - read_lock.unlock(); - std::unique_lock write_lock(schema_syncers_mutex); - - syncer = schema_syncers.find(keyspace_id); - if (syncer == schema_syncers.end()) + std::shared_lock read_lock(schema_syncers_mutex); + if (auto iter = schema_syncers.find(keyspace_id); iter != schema_syncers.end()) { - return createSchemaSyncer(keyspace_id); + return iter->second; } - return syncer->second; } - return syncer->second; + + // release the read_lock and acquire a write_lock + std::unique_lock write_lock(schema_syncers_mutex); + // check again whether other thread has created for the keyspace_id + // after `write_lock` acquired + if (auto iter = schema_syncers.find(keyspace_id); iter != schema_syncers.end()) + { + return iter->second; + } + auto syncer = createSchemaSyncer(keyspace_id); + schema_syncers[keyspace_id] = syncer; // register to the syncers + return syncer; + } + + SchemaSyncerPtr createSchemaSyncer(KeyspaceID keyspace_id) + { + if (!mock_getter && !mock_mapper) + { + return std::static_pointer_cast( + std::make_shared>(cluster, keyspace_id)); + } + else if (mock_getter && mock_mapper) + { + // for mock test + return std::static_pointer_cast( + std::make_shared>(cluster, keyspace_id)); + } + + // for unit test + return std::static_pointer_cast( + std::make_shared>(cluster, keyspace_id)); } }; } // namespace DB diff --git a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp index ba24eab316f..5771ad6c778 100644 --- a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp +++ b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp @@ -72,6 +72,14 @@ class SchemaSyncTest : public ::testing::Test void SetUp() override { + // unit test. + // Get DBInfo/TableInfo from MockTiDB, but create table with names `t_${table_id}` + auto cluster = std::make_shared(); + schema_sync_manager = std::make_unique( + cluster, + /*mock_getter*/ true, + /*mock_mapper*/ false); + // disable schema sync timer global_ctx.getSchemaSyncService().reset(); recreateMetadataPath(); @@ -97,11 +105,9 @@ class SchemaSyncTest : public ::testing::Test // Sync schema info from TiDB/MockTiDB to TiFlash void refreshSchema() { - auto & flash_ctx = global_ctx.getTMTContext(); - auto schema_syncer = flash_ctx.getSchemaSyncerManager(); try { - schema_syncer->syncSchemas(global_ctx, NullspaceID); + schema_sync_manager->syncSchemas(global_ctx, NullspaceID); } catch (Exception & e) { @@ -118,11 +124,9 @@ class SchemaSyncTest : public ::testing::Test void refreshTableSchema(TableID table_id) { - auto & flash_ctx = global_ctx.getTMTContext(); - auto schema_syncer = flash_ctx.getSchemaSyncerManager(); try { - schema_syncer->syncTableSchema(global_ctx, NullspaceID, table_id); + schema_sync_manager->syncTableSchema(global_ctx, NullspaceID, table_id); } catch (Exception & e) { @@ -138,11 +142,7 @@ class SchemaSyncTest : public ::testing::Test } // Reset the schema syncer to mock TiFlash shutdown - void resetSchemas() - { - auto & flash_ctx = global_ctx.getTMTContext(); - flash_ctx.getSchemaSyncerManager()->reset(NullspaceID); - } + void resetSchemas() { schema_sync_manager->reset(NullspaceID); } // Get the TiFlash synced table ManageableStoragePtr mustGetSyncedTable(TableID table_id) @@ -207,6 +207,8 @@ class SchemaSyncTest : public ::testing::Test protected: Context & global_ctx; + + std::unique_ptr schema_sync_manager; }; TEST_F(SchemaSyncTest, SchemaDiff) @@ -300,7 +302,12 @@ try refreshTableSchema(table_id); } - auto sync_service = std::make_shared(global_ctx); + // Create a temporary context with ddl sync task disabled + auto ctx = DB::tests::TiFlashTestEnv::getContext(); + ctx->getSettingsRef().ddl_sync_interval_seconds = 0; + auto sync_service = std::make_shared(*ctx); + sync_service->shutdown(); // shutdown the background tasks + // run gc with safepoint == 0, will be skip ASSERT_FALSE(sync_service->gc(0, NullspaceID)); ASSERT_TRUE(sync_service->gc(10000000, NullspaceID)); @@ -312,8 +319,6 @@ try ASSERT_TRUE(sync_service->gc(20000000, 1024)); // run gc with the same safepoint ASSERT_FALSE(sync_service->gc(20000000, 1024)); - - sync_service->shutdown(); } CATCH @@ -357,7 +362,12 @@ try std::vector{1001, 1002, 1003}); SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::force_set_num_regions_for_table); }); - auto sync_service = std::make_shared(global_ctx); + // Create a temporary context with ddl sync task disabled + auto ctx = DB::tests::TiFlashTestEnv::getContext(); + ctx->getSettingsRef().ddl_sync_interval_seconds = 0; + auto sync_service = std::make_shared(*ctx); + sync_service->shutdown(); // shutdown the background tasks + { // ensure gc_safe_point cache is empty auto last_gc_safe_point = lastGcSafePoint(sync_service, NullspaceID); @@ -381,8 +391,6 @@ try ++num_remain_tables; } ASSERT_EQ(num_remain_tables, 1); - - sync_service->shutdown(); } CATCH