Skip to content

Commit

Permalink
ddl: Fix exchange partition across databases (release-6.5) (#9001) (#…
Browse files Browse the repository at this point in the history
…9066)

close #7296
  • Loading branch information
ti-chi-bot authored May 22, 2024
1 parent f665dec commit 7f6cd1b
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 10 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ namespace DB
M(unblock_query_init_after_write) \
M(exception_in_merged_task_init) \
M(force_fail_in_flush_region_data) \
M(force_schema_sync_too_old_schema) \
M(exception_after_large_write_exceed) \
M(delta_tree_create_node_fail)

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1122,9 +1122,9 @@ inline OptionTableInfoConstRef getTableInfoForCreateStatement(

void StorageDeltaMerge::alterImpl(
const AlterCommands & commands,
const String & database_name,
const String & database_name_,
const String & table_name_,
const OptionTableInfoConstRef table_info,
OptionTableInfoConstRef table_info,
const Context & context)
try
{
Expand Down Expand Up @@ -1229,7 +1229,7 @@ try
// after update `new_columns` and store's table columns, we need to update create table statement,
// so that we can restore table next time.
updateDeltaMergeTableCreateStatement(
database_name,
database_name_,
table_name_,
pk_desc,
getColumns(),
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,9 @@ class StorageDeltaMerge

void alterImpl(
const AlterCommands & commands,
const String & database_name,
const String & table_name,
const DB::DM::OptionTableInfoConstRef table_info_,
const String & database_name_,
const String & table_name_,
DB::DM::OptionTableInfoConstRef table_info_,
const Context & context);

DataTypePtr getPKTypeImpl() const override;
Expand Down
42 changes: 40 additions & 2 deletions dbms/src/TiDB/Schema/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,36 @@ void SchemaBuilder<Getter, NameMapper>::applyPartitionDiff(const TiDB::DBInfoPtr
applyPartitionDiff(db_info, table_info, storage, /*drop_part_if_not_exist*/ true);
}

template <typename Getter, typename NameMapper>
TiDB::DBInfoPtr SchemaBuilder<Getter, NameMapper>::tryFindDatabaseByPartitionTable(const TiDB::DBInfoPtr & db_info, const String & part_table_name)
{
bool ok = context.getDatabase(name_mapper.mapDatabaseName(*db_info))->isTableExist(context, part_table_name);
if (ok)
{
return db_info;
}

auto local_dbs = context.getDatabases();
for (const auto & [db_name, local_db] : local_dbs)
{
if (!local_db->isTableExist(context, part_table_name))
{
continue;
}
auto db_tiflash = std::dynamic_pointer_cast<DatabaseTiFlash>(local_db);
if (!db_tiflash)
{
LOG_WARNING(log, "tryFindDatabaseByPartitionTable, find part table in another database, but can not cast to DatabaseTiFlash, db_name={} part_table_name={}", db_name, part_table_name);
break;
}
LOG_INFO(log, "tryFindDatabaseByPartitionTable, find part table in another database, EXCHANGE PARTITION may be executed, db_name={} part_table_name={}", db_name, part_table_name);
return std::make_shared<TiDB::DBInfo>(db_tiflash->getDatabaseInfo());
}

LOG_INFO(log, "tryFindDatabaseByPartitionTable, can not find part table in all database, part_table_name={}", part_table_name);
return db_info;
}

template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyPartitionDiff(const TiDB::DBInfoPtr & db_info, const TableInfoPtr & table_info, const ManageableStoragePtr & storage, bool drop_part_if_not_exist)
{
Expand Down Expand Up @@ -655,7 +685,11 @@ void SchemaBuilder<Getter, NameMapper>::applyPartitionDiff(const TiDB::DBInfoPtr
{
if (new_part_id_set.count(orig_def.id) == 0)
{
applyDropPhysicalTable(name_mapper.mapDatabaseName(*db_info), orig_def.id);
const auto part_table_name = name_mapper.mapTableNameByID(orig_def.id);
// When `tryLoadSchemaDiffs` fails, we may run into `SchemaBuilder::syncAllSchem` -> `applyPartitionDiff` without `applyExchangeTablePartition`
// The physical table maybe `EXCHANGE` to another database, try to find the partition from all database
auto part_db_info = tryFindDatabaseByPartitionTable(db_info, part_table_name);
applyDropPhysicalTable(name_mapper.mapDatabaseName(*part_db_info), orig_def.id);
}
}
}
Expand All @@ -665,7 +699,11 @@ void SchemaBuilder<Getter, NameMapper>::applyPartitionDiff(const TiDB::DBInfoPtr
if (orig_part_id_set.count(new_def.id) == 0)
{
auto part_table_info = updated_table_info.producePartitionTableInfo(new_def.id, name_mapper);
applyCreatePhysicalTable(db_info, part_table_info);
const auto part_table_name = name_mapper.mapTableName(*part_table_info);
// When `tryLoadSchemaDiffs` fails, we may run into `SchemaBuilder::syncAllSchem` -> `applyPartitionDiff` without `applyExchangeTablePartition`
// The physical table maybe `EXCHANGE` from another database, try to find the partition from all database
auto part_db_info = tryFindDatabaseByPartitionTable(db_info, part_table_name);
applyCreatePhysicalTable(part_db_info, part_table_info);
}
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/TiDB/Schema/SchemaBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ struct SchemaBuilder
void applyPartitionDiff(const TiDB::DBInfoPtr & db_info, TableID table_id);

void applyPartitionDiff(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info, const ManageableStoragePtr & storage, bool drop_part_if_not_exist);
TiDB::DBInfoPtr tryFindDatabaseByPartitionTable(const TiDB::DBInfoPtr & db_info, const String & part_table_name);

void applyAlterTable(const TiDB::DBInfoPtr & db_info, TableID table_id);

Expand Down
6 changes: 5 additions & 1 deletion dbms/src/TiDB/Schema/SchemaNameMapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ struct SchemaNameMapper

virtual String mapDatabaseName(const TiDB::DBInfo & db_info) const { return DATABASE_PREFIX + std::to_string(db_info.id); }
virtual String displayDatabaseName(const TiDB::DBInfo & db_info) const { return db_info.name; }
virtual String mapTableName(const TiDB::TableInfo & table_info) const { return TABLE_PREFIX + std::to_string(table_info.id); }
virtual String mapTableName(const TiDB::TableInfo & table_info) const { return mapTableNameByID(table_info.id); }
virtual String displayTableName(const TiDB::TableInfo & table_info) const { return table_info.name; }
virtual String mapTableNameByID(const TiDB::TableID table_id) const
{
return fmt::format("{}{}", TABLE_PREFIX, table_id);
}
virtual String mapPartitionName(const TiDB::TableInfo & table_info) const { return mapTableName(table_info); }

// Only use for logging / debugging
Expand Down
12 changes: 11 additions & 1 deletion dbms/src/TiDB/Schema/TiDBSchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ namespace ErrorCodes
{
extern const int FAIL_POINT_ERROR;
};
namespace FailPoints
{
extern const char force_schema_sync_too_old_schema[];
}

template <bool mock_getter, bool mock_mapper>
struct TiDBSchemaSyncer : public SchemaSyncer
Expand All @@ -59,7 +63,13 @@ struct TiDBSchemaSyncer : public SchemaSyncer
, log(Logger::get())
{}

bool isTooOldSchema(Int64 cur_ver, Int64 new_version) { return cur_ver == 0 || new_version - cur_ver > maxNumberOfDiffs; }
bool isTooOldSchema(Int64 cur_ver, Int64 new_version)
{
fiu_do_on(FailPoints::force_schema_sync_too_old_schema, {
return true;
});
return cur_ver == 0 || new_version - cur_ver > maxNumberOfDiffs;
}

Getter createSchemaGetter()
{
Expand Down
53 changes: 53 additions & 0 deletions tests/fullstack-test2/ddl/alter_exchange_partition.test
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,56 @@ mysql> drop table if exists test.e2;
mysql> drop table if exists test_new.e2;
mysql> drop database if exists test_new;
>> DBGInvoke __enable_schema_sync_service('true')

mysql> create table test.e(id INT NOT NULL,fname VARCHAR(30),lname VARCHAR(30)) PARTITION BY RANGE (id) ( PARTITION p0 VALUES LESS THAN (50),PARTITION p1 VALUES LESS THAN (100),PARTITION p2 VALUES LESS THAN (150), PARTITION p3 VALUES LESS THAN (MAXVALUE));
mysql> alter table test.e set tiflash replica 1;

mysql> create table test.e2(id int not null, fname varchar(30), lname varchar(30));
mysql> alter table test.e2 set tiflash replica 1;

mysql> create database test_new;
mysql> create table test_new.e2(id int not null, fname varchar(30), lname varchar(30));
mysql> alter table test_new.e2 set tiflash replica 1;

func> wait_table test e e2
func> wait_table test_new e2

mysql> insert into test.e values (1, 'a', 'b'),(108, 'a', 'b');
mysql> insert into test.e2 values (2, 'a', 'b');
mysql> insert into test_new.e2 values (3, 'a', 'b');

# disable schema sync service
>> DBGInvoke __enable_schema_sync_service('false')
>> DBGInvoke __refresh_schemas()

# case 13, exchagne partition across databases, syncAllSchema without applying the diff
>> DBGInvoke __enable_fail_point(force_schema_sync_too_old_schema)
mysql> set @@tidb_enable_exchange_partition=1; alter table test.e exchange partition p0 with table test_new.e2
>> DBGInvoke __enable_fail_point(exception_after_step_1_in_exchange_partition)
>> DBGInvoke __refresh_schemas()
mysql> alter table test.e add column c1 int;
>> DBGInvoke __refresh_schemas()
mysql> set session tidb_isolation_read_engines='tiflash'; select * from test.e order by id;
+-----+-------+-------+------+
| id | fname | lname | c1 |
+-----+-------+-------+------+
| 3 | a | b | NULL |
| 108 | a | b | NULL |
+-----+-------+-------+------+
mysql> set session tidb_isolation_read_engines='tiflash'; select * from test_new.e2;
+----+-------+-------+
| id | fname | lname |
+----+-------+-------+
| 1 | a | b |
+----+-------+-------+
mysql> alter table test.e drop column c1;
>> DBGInvoke __refresh_schemas()

>> DBGInvoke __disable_fail_point(force_schema_sync_too_old_schema)

# cleanup
mysql> drop table if exists test.e;
mysql> drop table if exists test.e2;
mysql> drop table if exists test_new.e2;
mysql> drop database if exists test_new;
>> DBGInvoke __enable_schema_sync_service('true')

0 comments on commit 7f6cd1b

Please sign in to comment.