Skip to content

Commit

Permalink
ddl: Fix potential data loss of alter_partition_by (#8337)
Browse files Browse the repository at this point in the history
close #8206
  • Loading branch information
JaySon-Huang authored Nov 10, 2023
1 parent 9ad4fd4 commit 27de3d3
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 116 deletions.
145 changes: 84 additions & 61 deletions dbms/src/TiDB/Schema/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,26 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateTable(DatabaseID database_id,
table_id_map.emplaceTableID(table_id, database_id);
LOG_DEBUG(log, "register table to table_id_map, database_id={} table_id={}", database_id, table_id);

if (table_info->isLogicalPartitionTable())
// non partition table, done
if (!table_info->isLogicalPartitionTable())
{
// If table is partition table, we will create the logical table here.
// Because we get the table_info, so we can ensure new_db_info will not be nullptr.
auto new_db_info = getter.getDatabase(database_id);
applyCreatePhysicalTable(new_db_info, table_info);
return;
}

for (const auto & part_def : table_info->partition.definitions)
{
LOG_DEBUG(
log,
"register table to table_id_map for partition table, logical_table_id={} physical_table_id={}",
table_id,
part_def.id);
table_id_map.emplacePartitionTableID(part_def.id, table_id);
}
// The table is a partition table, so we create the logical table here.
// Because we have already got the table_info, we can ensure new_db_info will not be nullptr.
auto new_db_info = getter.getDatabase(database_id);
applyCreateStorageInstance(new_db_info, table_info);

// Register the partition_id -> logical_table_id mapping
for (const auto & part_def : table_info->partition.definitions)
{
LOG_DEBUG(
log,
"register table to table_id_map for partition table, logical_table_id={} physical_table_id={}",
table_id,
part_def.id);
table_id_map.emplacePartitionTableID(part_def.id, table_id);
}
}

Expand Down Expand Up @@ -252,9 +256,13 @@ void SchemaBuilder<Getter, NameMapper>::applyDiff(const SchemaDiff & diff)
}
else
{
/// The new non-partitioned table will have a new id
applyDropTable(diff.schema_id, diff.old_table_id);
// Create the new table.
// If the new table is a partition table, this will also overwrite
// the partition id mapping to the new logical table
applyCreateTable(diff.schema_id, diff.table_id);
// Drop the old table. If the previous partitions of the old table are
// no longer mapped to the old logical table, they will not be removed.
applyDropTable(diff.schema_id, diff.old_table_id);
}
break;
}
Expand Down Expand Up @@ -410,70 +418,70 @@ void SchemaBuilder<Getter, NameMapper>::applyPartitionDiff(
const TableInfoPtr & table_info,
const ManageableStoragePtr & storage)
{
const auto & orig_table_info = storage->getTableInfo();
if (!orig_table_info.isLogicalPartitionTable())
const auto & local_table_info = storage->getTableInfo();
// ALTER TABLE t PARTITION BY ... may turn a non-partition table into a partition table
// with some partition ids in `partition.adding_definitions`/`partition.definitions`
// and `partition.dropping_definitions`. We need to create those partitions.
if (!local_table_info.isLogicalPartitionTable())
{
LOG_ERROR(
LOG_INFO(
log,
"old table in TiFlash not partition table {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, orig_table_info),
"Altering non-partition table to be a partition table {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, local_table_info),
db_info->id,
orig_table_info.id);
return;
local_table_info.id);
}

const auto & orig_defs = orig_table_info.partition.definitions;
const auto & local_defs = local_table_info.partition.definitions;
const auto & new_defs = table_info->partition.definitions;

std::unordered_set<TableID> orig_part_id_set, new_part_id_set;
std::vector<String> orig_part_ids, new_part_ids;
std::for_each(orig_defs.begin(), orig_defs.end(), [&orig_part_id_set, &orig_part_ids](const auto & def) {
orig_part_id_set.emplace(def.id);
orig_part_ids.emplace_back(std::to_string(def.id));
std::unordered_set<TableID> local_part_id_set, new_part_id_set;
std::for_each(local_defs.begin(), local_defs.end(), [&local_part_id_set](const auto & def) {
local_part_id_set.emplace(def.id);
});
std::for_each(new_defs.begin(), new_defs.end(), [&new_part_id_set, &new_part_ids](const auto & def) {
std::for_each(new_defs.begin(), new_defs.end(), [&new_part_id_set](const auto & def) {
new_part_id_set.emplace(def.id);
new_part_ids.emplace_back(std::to_string(def.id));
});

auto orig_part_ids_str = boost::algorithm::join(orig_part_ids, ", ");
auto new_part_ids_str = boost::algorithm::join(new_part_ids, ", ");

LOG_INFO(
log,
"Applying partition changes {} with database_id={}, table_id={}, old: {}, new: {}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id,
orig_part_ids_str,
new_part_ids_str);
local_part_id_set,
new_part_id_set);

if (orig_part_id_set == new_part_id_set)
if (local_part_id_set == new_part_id_set)
{
LOG_INFO(
log,
"No partition changes {} with database_id={}, table_id={}",
"No partition changes, paritions_size={} {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, *table_info),
new_part_id_set.size(),
db_info->id,
table_info->id);
return;
}

auto updated_table_info = orig_table_info;
// Copy the local table info and update fields on the copy
auto updated_table_info = local_table_info;
updated_table_info.is_partition_table = true;
updated_table_info.belonging_table_id = table_info->belonging_table_id;
updated_table_info.partition = table_info->partition;

/// Apply changes to physical tables.
for (const auto & orig_def : orig_defs)
for (const auto & local_def : local_defs)
{
if (new_part_id_set.count(orig_def.id) == 0)
if (!new_part_id_set.contains(local_def.id))
{
applyDropPhysicalTable(name_mapper.mapDatabaseName(*db_info), orig_def.id);
applyDropPhysicalTable(name_mapper.mapDatabaseName(*db_info), local_def.id);
}
}

for (const auto & new_def : new_defs)
{
if (orig_part_id_set.count(new_def.id) == 0)
if (!local_part_id_set.contains(new_def.id))
{
table_id_map.emplacePartitionTableID(new_def.id, updated_table_info.id);
}
Expand Down Expand Up @@ -733,7 +741,7 @@ template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyCreateSchema(const TiDB::DBInfoPtr & db_info)
{
GET_METRIC(tiflash_schema_internal_ddl_count, type_create_db).Increment();
LOG_INFO(log, "Creating database {} with database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);
LOG_INFO(log, "Create database {} begin, database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);

auto statement = createDatabaseStmt(context, *db_info, name_mapper);

Expand All @@ -749,7 +757,7 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateSchema(const TiDB::DBInfoPtr
databases.emplace(db_info->id, db_info);
}

LOG_INFO(log, "Created database {} with database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);
LOG_INFO(log, "Create database {} end, database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);
}

template <typename Getter, typename NameMapper>
Expand Down Expand Up @@ -786,11 +794,11 @@ template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyDropSchema(const String & db_name)
{
GET_METRIC(tiflash_schema_internal_ddl_count, type_drop_db).Increment();
LOG_INFO(log, "Tombstoning database {}", db_name);
LOG_INFO(log, "Tombstone database begin, db_name={}", db_name);
auto db = context.tryGetDatabase(db_name);
if (db == nullptr)
{
LOG_INFO(log, "Database {} does not exists", db_name);
LOG_INFO(log, "Database does not exist, db_name={}", db_name);
return;
}

Expand All @@ -807,7 +815,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropSchema(const String & db_name)
auto tombstone = tmt_context.getPDClient()->getTS();
db->alterTombstone(context, tombstone);

LOG_INFO(log, "Tombstoned database {}", db_name);
LOG_INFO(log, "Tombstone database end, db_name={}", db_name);
}

std::tuple<NamesAndTypes, Strings> parseColumnsFromTableInfo(const TiDB::TableInfo & table_info)
Expand Down Expand Up @@ -888,14 +896,14 @@ String createTableStmt(
}

template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyCreatePhysicalTable(
void SchemaBuilder<Getter, NameMapper>::applyCreateStorageInstance(
const TiDB::DBInfoPtr & db_info,
const TableInfoPtr & table_info)
{
GET_METRIC(tiflash_schema_internal_ddl_count, type_create_table).Increment();
LOG_INFO(
log,
"Creating table {} with database_id={}, table_id={}",
"Create table {} begin, database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id);
Expand Down Expand Up @@ -978,7 +986,7 @@ void SchemaBuilder<Getter, NameMapper>::applyCreatePhysicalTable(
interpreter.execute();
LOG_INFO(
log,
"Created table {}, database_id={} table_id={}",
"Creat table {} end, database_id={} table_id={}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id);
Expand All @@ -992,13 +1000,13 @@ void SchemaBuilder<Getter, NameMapper>::applyDropPhysicalTable(const String & db
auto storage = tmt_context.getStorages().get(keyspace_id, table_id);
if (storage == nullptr)
{
LOG_DEBUG(log, "table {} does not exist.", table_id);
LOG_DEBUG(log, "table does not exist, table_id={}", table_id);
return;
}
GET_METRIC(tiflash_schema_internal_ddl_count, type_drop_table).Increment();
LOG_INFO(
log,
"Tombstoning table {}.{}, table_id={}",
"Tombstone table {}.{} begin, table_id={}",
db_name,
name_mapper.debugTableName(storage->getTableInfo()),
table_id);
Expand All @@ -1020,7 +1028,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropPhysicalTable(const String & db
storage->updateTombstone(alter_lock, commands, db_name, storage->getTableInfo(), name_mapper, context);
LOG_INFO(
log,
"Tombstoned table {}.{}, table_id={}",
"Tombstone table {}.{} end, table_id={}",
db_name,
name_mapper.debugTableName(storage->getTableInfo()),
table_id);
Expand All @@ -1033,14 +1041,29 @@ void SchemaBuilder<Getter, NameMapper>::applyDropTable(DatabaseID database_id, T
auto * storage = tmt_context.getStorages().get(keyspace_id, table_id).get();
if (storage == nullptr)
{
LOG_DEBUG(log, "table {} does not exist.", table_id);
LOG_DEBUG(log, "table does not exist, table_id={}", table_id);
return;
}
const auto & table_info = storage->getTableInfo();
if (table_info.isLogicalPartitionTable())
{
for (const auto & part_def : table_info.partition.definitions)
{
if (TableID latest_logical_table_id = table_id_map.findTableIDInPartitionMap(part_def.id);
latest_logical_table_id == -1 || latest_logical_table_id != table_info.id)
{
// The partition is managed by another logical table now (caused by `alter table X partition by ...`),
// skip dropping this partition when dropping the old logical table
LOG_INFO(
log,
"The partition is not managed by current logical table, skip, partition_table_id={} "
"new_logical_table_id={} current_logical_table_id={}",
part_def.id,
latest_logical_table_id,
table_info.id);
continue;
}

applyDropPhysicalTable(name_mapper.mapDatabaseName(database_id, keyspace_id), part_def.id);
}
}
Expand All @@ -1055,7 +1078,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropTable(DatabaseID database_id, T
template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
{
LOG_INFO(log, "Syncing all schemas.");
LOG_INFO(log, "Sync all schemas begin");

/// Create all databases.
std::vector<DBInfoPtr> all_schemas = getter.listDBs();
Expand Down Expand Up @@ -1116,7 +1139,7 @@ void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
table_id_map.emplaceTableID(table->id, db->id);
LOG_DEBUG(log, "register table to table_id_map, database_id={} table_id={}", db->id, table->id);

applyCreatePhysicalTable(db, table);
applyCreateStorageInstance(db, table);
if (table->isLogicalPartitionTable())
{
for (const auto & part_def : table->partition.definitions)
Expand Down Expand Up @@ -1173,7 +1196,7 @@ void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
}
}

LOG_INFO(log, "Loaded all schemas.");
LOG_INFO(log, "Sync all schemas end");
}

template <typename Getter, typename NameMapper>
Expand Down Expand Up @@ -1230,7 +1253,7 @@ void SchemaBuilder<Getter, NameMapper>::applyTable(
return;
}

applyCreatePhysicalTable(db_info, table_info);
applyCreateStorageInstance(db_info, table_info);
}
else
{
Expand All @@ -1255,7 +1278,7 @@ void SchemaBuilder<Getter, NameMapper>::applyTable(
template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::dropAllSchema()
{
LOG_INFO(log, "Dropping all schemas.");
LOG_INFO(log, "Drop all schemas begin");

auto & tmt_context = context.getTMTContext();

Expand Down Expand Up @@ -1290,7 +1313,7 @@ void SchemaBuilder<Getter, NameMapper>::dropAllSchema()
LOG_DEBUG(log, "DB {} dropped during drop all schemas", db.first);
}

LOG_INFO(log, "Dropped all schemas.");
LOG_INFO(log, "Drop all schemas end");
}

// product env
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/TiDB/Schema/SchemaBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ struct SchemaBuilder

void applyCreateSchema(const TiDB::DBInfoPtr & db_info);

void applyCreatePhysicalTable(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info);
void applyCreateStorageInstance(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info);

void applyDropTable(DatabaseID database_id, TableID table_id);

Expand All @@ -207,7 +207,6 @@ struct SchemaBuilder
void applyDropPhysicalTable(const String & db_name, TableID table_id);

void applyPartitionDiff(DatabaseID database_id, TableID table_id);

void applyPartitionDiff(
const TiDB::DBInfoPtr & db_info,
const TiDB::TableInfoPtr & table_info,
Expand Down
Loading

0 comments on commit 27de3d3

Please sign in to comment.