Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: Fix potential data loss of alter_partition_by #8337

Merged
merged 12 commits into from
Nov 10, 2023
157 changes: 111 additions & 46 deletions dbms/src/TiDB/Schema/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateTable(DatabaseID database_id,
// If table is partition table, we will create the logical table here.
// Because we get the table_info, so we can ensure new_db_info will not be nullptr.
auto new_db_info = getter.getDatabase(database_id);
applyCreatePhysicalTable(new_db_info, table_info);
applyCreateStorageInstance(new_db_info, table_info);

for (const auto & part_def : table_info->partition.definitions)
{
Expand Down Expand Up @@ -252,9 +252,14 @@ void SchemaBuilder<Getter, NameMapper>::applyDiff(const SchemaDiff & diff)
}
else
{
/// The new non-partitioned table will have a new id
// Create the new table.
// If the new table is a partition table, this will also overwrite
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be better to separate the logic for ActionAlterTablePartitioning and ActionRemovePartitioning.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For ActionRemovePartitioning, it will

  • Receive a schema diff, and add the new non-partition table as a table_id in partition.adding_definitions. -- TiFlash should add the new table id to the mapping and handle applying snapshots
  • Then receive a schema diff that turns the new non-partition table into a normal table and removes the old partition table

So it is the same logic as ActionAlterTablePartitioning in tiflash

// the partition id mapping to the new logical table and renew the
// partition info.
applyPartitionAlter(diff.schema_id, diff.table_id);
// Drop the old table. if the previous partitions of the old table are
// not mapping to the old logical table now, they will not be removed.
applyDropTable(diff.schema_id, diff.old_table_id);
applyCreateTable(diff.schema_id, diff.table_id);
}
break;
}
Expand Down Expand Up @@ -373,6 +378,51 @@ void SchemaBuilder<Getter, NameMapper>::applySetTiFlashReplica(DatabaseID databa
}
}

template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyPartitionAlter(DatabaseID database_id, TableID table_id)
{
auto table_info = getter.getTableInfo(database_id, table_id);
if (table_info == nullptr) // the database maybe dropped
{
LOG_DEBUG(log, "table is not exist in TiKV, may have been dropped, table_id={}", table_id);
return;
}

table_id_map.emplaceTableID(table_id, database_id);
LOG_DEBUG(log, "register table to table_id_map, database_id={} table_id={}", database_id, table_id);

if (!table_info->isLogicalPartitionTable())
{
return;
}

// If table is partition table, we will create the logical table here.
// Because we get the table_info, so we can ensure new_db_info will not be nullptr.
auto new_db_info = getter.getDatabase(database_id);
applyCreateStorageInstance(new_db_info, table_info);

for (const auto & part_def : table_info->partition.definitions)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we call applyPartitionDiff(new_db_info, table_info, storage), we will also emplacePartitionTableID based on the definitions, so why do we also add it here?

This comment was marked as off-topic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get what you mean, let me check it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we create the Storage instance from table_info at line 402 and then try to execute applyPartitionDiff. At that point nothing can differ between table_info and storage.table_info, so calling applyPartitionDiff is redundant.
That makes this function simply the same as applyCreateTable, so I have removed the function applyPartitionAlter.

case SchemaActionType::ActionRemovePartitioning:
{
if (diff.table_id == diff.old_table_id)
{
/// Only internal additions of new partitions
applyPartitionDiff(diff.schema_id, diff.table_id);
}
else
{
// Create the new table.
// If the new table is a partition table, this will also overwrite
// the partition id mapping to the new logical table
applyCreateTable(diff.schema_id, diff.table_id);

{
LOG_DEBUG(
log,
"register table to table_id_map for partition table, logical_table_id={} physical_table_id={}",
table_id,
part_def.id);
table_id_map.emplacePartitionTableID(part_def.id, table_id);
}

auto & tmt_context = context.getTMTContext();
auto storage = tmt_context.getStorages().get(keyspace_id, table_info->id);
if (storage == nullptr)
{
LOG_ERROR(log, "table is not exist in TiFlash, table_id={}", table_id);
return;
}

// Try to renew the partition info for the new table
applyPartitionDiff(new_db_info, table_info, storage);
}

template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyPartitionDiff(DatabaseID database_id, TableID table_id)
{
Expand Down Expand Up @@ -410,70 +460,70 @@ void SchemaBuilder<Getter, NameMapper>::applyPartitionDiff(
const TableInfoPtr & table_info,
const ManageableStoragePtr & storage)
{
const auto & orig_table_info = storage->getTableInfo();
if (!orig_table_info.isLogicalPartitionTable())
const auto & local_table_info = storage->getTableInfo();
// ALTER TABLE t PARTITION BY ... may turn a non-partition table into partition table
// with some partition ids in `partition.adding_definitions`/`partition.definitions`
// and `partition.dropping_definitions`. We need to create those partitions.
if (!local_table_info.isLogicalPartitionTable())
{
LOG_ERROR(
LOG_INFO(
log,
"old table in TiFlash not partition table {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, orig_table_info),
"Altering non-partition table to be a partition table {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, local_table_info),
db_info->id,
orig_table_info.id);
return;
local_table_info.id);
}
Comment on lines +425 to 433
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When fetching the latest table schema from TiKV, TiFlash may see a non-partition table turned into a table with a "partition" field like this.
We need to create the physical tables with the ids in partition.adding_definitions/partition.definitions/partition.dropping_definitions.

"partition": {
        "type": 0,
        "expr": "",
        "columns": null,
        "enable": true,
        "definitions": [{
            "id": 164,
            "name": {
                "O": "pFullTable",
                "L": "pfulltable"
            },
            "less_than": null,
            "in_values": null,
            "policy_ref_info": null,
            "comment": "Intermediate partition during ALTER TABLE ... PARTITION BY ..."
        }],
        "adding_definitions": [{
            "id": 168,
            "name": {
                "O": "p0",
                "L": "p0"
            },
            "less_than": null,
            "in_values": null,
            "policy_ref_info": null
        }, {
            "id": 169,
            "name": {
                "O": "p1",
                "L": "p1"
            },
            "less_than": null,
            "in_values": null,
            "policy_ref_info": null
        }, {
            "id": 170,
            "name": {
                "O": "p2",
                "L": "p2"
            },
            "less_than": null,
            "in_values": null,
            "policy_ref_info": null
        }],
        "dropping_definitions": [{
            "id": 164,
            "name": {
                "O": "pFullTable",
                "L": "pfulltable"
            },
            "less_than": null,
            "in_values": null,
            "policy_ref_info": null,
            "comment": "Intermediate partition during ALTER TABLE ... PARTITION BY ..."
        }],
        "NewPartitionIDs": null,
        "states": null,
        "num": 1,
        "ddl_state": 3,
        "new_table_id": 171,
        "ddl_type": 2,
        "ddl_expr": "`a`",
        "ddl_columns": null
    },


const auto & orig_defs = orig_table_info.partition.definitions;
const auto & local_defs = local_table_info.partition.definitions;
const auto & new_defs = table_info->partition.definitions;

std::unordered_set<TableID> orig_part_id_set, new_part_id_set;
std::vector<String> orig_part_ids, new_part_ids;
std::for_each(orig_defs.begin(), orig_defs.end(), [&orig_part_id_set, &orig_part_ids](const auto & def) {
orig_part_id_set.emplace(def.id);
orig_part_ids.emplace_back(std::to_string(def.id));
std::unordered_set<TableID> local_part_id_set, new_part_id_set;
std::for_each(local_defs.begin(), local_defs.end(), [&local_part_id_set](const auto & def) {
local_part_id_set.emplace(def.id);
});
std::for_each(new_defs.begin(), new_defs.end(), [&new_part_id_set, &new_part_ids](const auto & def) {
std::for_each(new_defs.begin(), new_defs.end(), [&new_part_id_set](const auto & def) {
new_part_id_set.emplace(def.id);
new_part_ids.emplace_back(std::to_string(def.id));
});

auto orig_part_ids_str = boost::algorithm::join(orig_part_ids, ", ");
auto new_part_ids_str = boost::algorithm::join(new_part_ids, ", ");

LOG_INFO(
log,
"Applying partition changes {} with database_id={}, table_id={}, old: {}, new: {}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id,
orig_part_ids_str,
new_part_ids_str);
local_part_id_set,
new_part_id_set);

if (orig_part_id_set == new_part_id_set)
if (local_part_id_set == new_part_id_set)
{
LOG_INFO(
log,
"No partition changes {} with database_id={}, table_id={}",
"No partition changes, paritions_size={} {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, *table_info),
new_part_id_set.size(),
db_info->id,
table_info->id);
return;
}

auto updated_table_info = orig_table_info;
// Copy the local table info and update fields on the copy
auto updated_table_info = local_table_info;
updated_table_info.is_partition_table = true;
updated_table_info.belonging_table_id = table_info->belonging_table_id;
updated_table_info.partition = table_info->partition;

/// Apply changes to physical tables.
for (const auto & orig_def : orig_defs)
for (const auto & local_def : local_defs)
{
if (new_part_id_set.count(orig_def.id) == 0)
if (!new_part_id_set.contains(local_def.id))
{
applyDropPhysicalTable(name_mapper.mapDatabaseName(*db_info), orig_def.id);
applyDropPhysicalTable(name_mapper.mapDatabaseName(*db_info), local_def.id);
}
}

for (const auto & new_def : new_defs)
{
if (orig_part_id_set.count(new_def.id) == 0)
if (!local_part_id_set.contains(new_def.id))
{
table_id_map.emplacePartitionTableID(new_def.id, updated_table_info.id);
}
Expand Down Expand Up @@ -733,7 +783,7 @@ template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyCreateSchema(const TiDB::DBInfoPtr & db_info)
{
GET_METRIC(tiflash_schema_internal_ddl_count, type_create_db).Increment();
LOG_INFO(log, "Creating database {} with database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);
LOG_INFO(log, "Create database {} begin, database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);

auto statement = createDatabaseStmt(context, *db_info, name_mapper);

Expand All @@ -749,7 +799,7 @@ void SchemaBuilder<Getter, NameMapper>::applyCreateSchema(const TiDB::DBInfoPtr
databases.emplace(db_info->id, db_info);
}

LOG_INFO(log, "Created database {} with database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);
LOG_INFO(log, "Create database {} end, database_id={}", name_mapper.debugDatabaseName(*db_info), db_info->id);
}

template <typename Getter, typename NameMapper>
Expand Down Expand Up @@ -786,11 +836,11 @@ template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyDropSchema(const String & db_name)
{
GET_METRIC(tiflash_schema_internal_ddl_count, type_drop_db).Increment();
LOG_INFO(log, "Tombstoning database {}", db_name);
LOG_INFO(log, "Tombstone database begin, db_name={}", db_name);
auto db = context.tryGetDatabase(db_name);
if (db == nullptr)
{
LOG_INFO(log, "Database {} does not exists", db_name);
LOG_INFO(log, "Database does not exist, db_name={}", db_name);
return;
}

Expand All @@ -807,7 +857,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropSchema(const String & db_name)
auto tombstone = tmt_context.getPDClient()->getTS();
db->alterTombstone(context, tombstone);

LOG_INFO(log, "Tombstoned database {}", db_name);
LOG_INFO(log, "Tombstone database end, db_name={}", db_name);
}

std::tuple<NamesAndTypes, Strings> parseColumnsFromTableInfo(const TiDB::TableInfo & table_info)
Expand Down Expand Up @@ -888,7 +938,7 @@ String createTableStmt(
}

template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyCreatePhysicalTable(
void SchemaBuilder<Getter, NameMapper>::applyCreateStorageInstance(
const TiDB::DBInfoPtr & db_info,
const TableInfoPtr & table_info)
{
Expand Down Expand Up @@ -992,13 +1042,13 @@ void SchemaBuilder<Getter, NameMapper>::applyDropPhysicalTable(const String & db
auto storage = tmt_context.getStorages().get(keyspace_id, table_id);
if (storage == nullptr)
{
LOG_DEBUG(log, "table {} does not exist.", table_id);
LOG_DEBUG(log, "table does not exist, table_id={}", table_id);
return;
}
GET_METRIC(tiflash_schema_internal_ddl_count, type_drop_table).Increment();
LOG_INFO(
log,
"Tombstoning table {}.{}, table_id={}",
"Tombstone table {}.{} begin, table_id={}",
db_name,
name_mapper.debugTableName(storage->getTableInfo()),
table_id);
Expand All @@ -1020,7 +1070,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropPhysicalTable(const String & db
storage->updateTombstone(alter_lock, commands, db_name, storage->getTableInfo(), name_mapper, context);
LOG_INFO(
log,
"Tombstoned table {}.{}, table_id={}",
"Tombstone table {}.{} end, table_id={}",
db_name,
name_mapper.debugTableName(storage->getTableInfo()),
table_id);
Expand All @@ -1033,14 +1083,29 @@ void SchemaBuilder<Getter, NameMapper>::applyDropTable(DatabaseID database_id, T
auto * storage = tmt_context.getStorages().get(keyspace_id, table_id).get();
if (storage == nullptr)
{
LOG_DEBUG(log, "table {} does not exist.", table_id);
LOG_DEBUG(log, "table does not exist, table_id={}", table_id);
return;
}
const auto & table_info = storage->getTableInfo();
if (table_info.isLogicalPartitionTable())
{
for (const auto & part_def : table_info.partition.definitions)
{
if (TableID latest_logical_table_id = table_id_map.findTableIDInPartitionMap(part_def.id);
latest_logical_table_id == -1 || latest_logical_table_id != table_info.id)
{
// The partition is managed by another logical table now (caused by `alter table X partition by ...`),
// skip dropping this partition when dropping the old logical table
LOG_INFO(
log,
"The partition is not managed by current logical table, skip, partition_table_id={} "
"new_logical_table_id={} current_logical_table_id={}",
part_def.id,
latest_logical_table_id,
table_info.id);
continue;
}

applyDropPhysicalTable(name_mapper.mapDatabaseName(database_id, keyspace_id), part_def.id);
}
}
Expand All @@ -1055,7 +1120,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropTable(DatabaseID database_id, T
template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
{
LOG_INFO(log, "Syncing all schemas.");
LOG_INFO(log, "Sync all schemas begin");

/// Create all databases.
std::vector<DBInfoPtr> all_schemas = getter.listDBs();
Expand Down Expand Up @@ -1116,7 +1181,7 @@ void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
table_id_map.emplaceTableID(table->id, db->id);
LOG_DEBUG(log, "register table to table_id_map, database_id={} table_id={}", db->id, table->id);

applyCreatePhysicalTable(db, table);
applyCreateStorageInstance(db, table);
if (table->isLogicalPartitionTable())
{
for (const auto & part_def : table->partition.definitions)
Expand Down Expand Up @@ -1173,7 +1238,7 @@ void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
}
}

LOG_INFO(log, "Loaded all schemas.");
LOG_INFO(log, "Sync all schemas end");
}

template <typename Getter, typename NameMapper>
Expand Down Expand Up @@ -1230,7 +1295,7 @@ void SchemaBuilder<Getter, NameMapper>::applyTable(
return;
}

applyCreatePhysicalTable(db_info, table_info);
applyCreateStorageInstance(db_info, table_info);
}
else
{
Expand All @@ -1255,7 +1320,7 @@ void SchemaBuilder<Getter, NameMapper>::applyTable(
template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::dropAllSchema()
{
LOG_INFO(log, "Dropping all schemas.");
LOG_INFO(log, "Drop all schemas begin");

auto & tmt_context = context.getTMTContext();

Expand Down Expand Up @@ -1290,7 +1355,7 @@ void SchemaBuilder<Getter, NameMapper>::dropAllSchema()
LOG_DEBUG(log, "DB {} dropped during drop all schemas", db.first);
}

LOG_INFO(log, "Dropped all schemas.");
LOG_INFO(log, "Drop all schemas end");
}

// product env
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/TiDB/Schema/SchemaBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ struct SchemaBuilder

void applyCreateSchema(const TiDB::DBInfoPtr & db_info);

void applyCreatePhysicalTable(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info);
void applyCreateStorageInstance(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info);

void applyDropTable(DatabaseID database_id, TableID table_id);

Expand All @@ -206,8 +206,9 @@ struct SchemaBuilder
/// Parameter schema_name should be mapped.
void applyDropPhysicalTable(const String & db_name, TableID table_id);

void applyPartitionDiff(DatabaseID database_id, TableID table_id);
void applyPartitionAlter(DatabaseID database_id, TableID table_id);

void applyPartitionDiff(DatabaseID database_id, TableID table_id);
void applyPartitionDiff(
const TiDB::DBInfoPtr & db_info,
const TiDB::TableInfoPtr & table_info,
Expand Down
Loading