Skip to content

Commit c21a5cf

Browse files
authored
br: add integration test for pitr (#47740)
ref #47738
1 parent 515899f commit c21a5cf

File tree

6 files changed

+291
-34
lines changed

6 files changed

+291
-34
lines changed

br/pkg/stream/rewrite_meta_rawkv.go

+37-33
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
backuppb "github.com/pingcap/kvproto/pkg/brpb"
2424
"github.com/pingcap/log"
2525
berrors "github.com/pingcap/tidb/br/pkg/errors"
26+
"github.com/pingcap/tidb/br/pkg/logutil"
2627
"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
2728
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
2829
"github.com/pingcap/tidb/pkg/kv"
@@ -705,10 +706,11 @@ func (sr *SchemasReplace) restoreFromHistory(job *model.Job, isSubJob bool) erro
705706
}
706707

707708
func (sr *SchemasReplace) deleteRange(job *model.Job) error {
709+
lctx := logutil.ContextWithField(context.Background(), logutil.RedactAny("category", "ddl: rewrite delete range"))
708710
dbReplace, exist := sr.DbMap[job.SchemaID]
709711
if !exist {
710712
// skip this mddljob, the same below
711-
log.Debug("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID))
713+
logutil.CL(lctx).Warn("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID))
712714
return nil
713715
}
714716

@@ -744,14 +746,14 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
744746
newTableIDs := make([]int64, 0, len(tableIDs))
745747
for tableID, tableReplace := range dbReplace.TableMap {
746748
if _, exist := argsSet[tableID]; !exist {
747-
log.Debug("DropSchema: record a table, but it doesn't exist in job args",
749+
logutil.CL(lctx).Warn("DropSchema: record a table, but it doesn't exist in job args",
748750
zap.Int64("oldTableID", tableID))
749751
continue
750752
}
751753
newTableIDs = append(newTableIDs, tableReplace.TableID)
752754
for partitionID, newPartitionID := range tableReplace.PartitionMap {
753755
if _, exist := argsSet[partitionID]; !exist {
754-
log.Debug("DropSchema: record a partition, but it doesn't exist in job args",
756+
logutil.CL(lctx).Warn("DropSchema: record a partition, but it doesn't exist in job args",
755757
zap.Int64("oldPartitionID", partitionID))
756758
continue
757759
}
@@ -760,7 +762,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
760762
}
761763

762764
if len(newTableIDs) != len(tableIDs) {
763-
log.Debug(
765+
logutil.CL(lctx).Warn(
764766
"DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace")
765767
// only drop newTableIDs' ranges
766768
}
@@ -774,7 +776,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
774776
case model.ActionDropTable, model.ActionTruncateTable:
775777
tableReplace, exist := dbReplace.TableMap[job.TableID]
776778
if !exist {
777-
log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID",
779+
logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID",
778780
zap.Int64("oldTableID", job.TableID))
779781
return nil
780782
}
@@ -787,18 +789,19 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
787789
return errors.Trace(err)
788790
}
789791
if len(physicalTableIDs) > 0 {
790-
// delete partition id instead of table id
791-
for i := 0; i < len(physicalTableIDs); i++ {
792-
newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]]
792+
newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs))
793+
// delete partition id
794+
for _, oldPid := range physicalTableIDs {
795+
newPid, exist := tableReplace.PartitionMap[oldPid]
793796
if !exist {
794-
log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID",
795-
zap.Int64("oldPartitionID", physicalTableIDs[i]))
797+
logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID",
798+
zap.Int64("oldPartitionID", oldPid))
796799
continue
797800
}
798-
physicalTableIDs[i] = newPid
801+
newPhysicalTableIDs = append(newPhysicalTableIDs, newPid)
799802
}
800-
if len(physicalTableIDs) > 0 {
801-
sr.insertDeleteRangeForTable(newJobID, physicalTableIDs)
803+
if len(newPhysicalTableIDs) > 0 {
804+
sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs)
802805
}
803806
return nil
804807
}
@@ -808,7 +811,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
808811
case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
809812
tableReplace, exist := dbReplace.TableMap[job.TableID]
810813
if !exist {
811-
log.Debug(
814+
logutil.CL(lctx).Warn(
812815
"DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID",
813816
zap.Int64("oldTableID", job.TableID))
814817
return nil
@@ -818,26 +821,27 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
818821
return errors.Trace(err)
819822
}
820823

821-
for i := 0; i < len(physicalTableIDs); i++ {
822-
newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]]
824+
newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs))
825+
for _, oldPid := range physicalTableIDs {
826+
newPid, exist := tableReplace.PartitionMap[oldPid]
823827
if !exist {
824-
log.Debug(
828+
logutil.CL(lctx).Warn(
825829
"DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID",
826-
zap.Int64("oldPartitionID", physicalTableIDs[i]))
830+
zap.Int64("oldPartitionID", oldPid))
827831
continue
828832
}
829-
physicalTableIDs[i] = newPid
833+
newPhysicalTableIDs = append(newPhysicalTableIDs, newPid)
830834
}
831-
if len(physicalTableIDs) > 0 {
832-
sr.insertDeleteRangeForTable(newJobID, physicalTableIDs)
835+
if len(newPhysicalTableIDs) > 0 {
836+
sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs)
833837
}
834838
return nil
835839
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
836840
case model.ActionAddIndex, model.ActionAddPrimaryKey:
837841
// iff job.State = model.JobStateRollbackDone
838842
tableReplace, exist := dbReplace.TableMap[job.TableID]
839843
if !exist {
840-
log.Debug("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID",
844+
logutil.CL(lctx).Warn("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID",
841845
zap.Int64("oldTableID", job.TableID))
842846
return nil
843847
}
@@ -856,7 +860,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
856860
for _, oldPid := range partitionIDs {
857861
newPid, exist := tableReplace.PartitionMap[oldPid]
858862
if !exist {
859-
log.Debug(
863+
logutil.CL(lctx).Warn(
860864
"AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID",
861865
zap.Int64("oldPartitionID", oldPid))
862866
continue
@@ -871,7 +875,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
871875
case model.ActionDropIndex, model.ActionDropPrimaryKey:
872876
tableReplace, exist := dbReplace.TableMap[job.TableID]
873877
if !exist {
874-
log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
878+
logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
875879
return nil
876880
}
877881

@@ -890,7 +894,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
890894
for _, oldPid := range partitionIDs {
891895
newPid, exist := tableReplace.PartitionMap[oldPid]
892896
if !exist {
893-
log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
897+
logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
894898
continue
895899
}
896900
// len(indexIDs) = 1
@@ -913,7 +917,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
913917

914918
tableReplace, exist := dbReplace.TableMap[job.TableID]
915919
if !exist {
916-
log.Debug("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
920+
logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
917921
return nil
918922
}
919923

@@ -922,7 +926,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
922926
for _, oldPid := range partitionIDs {
923927
newPid, exist := tableReplace.PartitionMap[oldPid]
924928
if !exist {
925-
log.Debug("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
929+
logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
926930
continue
927931
}
928932
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
@@ -942,7 +946,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
942946
if len(indexIDs) > 0 {
943947
tableReplace, exist := dbReplace.TableMap[job.TableID]
944948
if !exist {
945-
log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
949+
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
946950
return nil
947951
}
948952

@@ -951,7 +955,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
951955
for _, oldPid := range partitionIDs {
952956
newPid, exist := tableReplace.PartitionMap[oldPid]
953957
if !exist {
954-
log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
958+
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
955959
continue
956960
}
957961
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
@@ -972,7 +976,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
972976
if len(indexIDs) > 0 {
973977
tableReplace, exist := dbReplace.TableMap[job.TableID]
974978
if !exist {
975-
log.Debug("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
979+
logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
976980
return nil
977981
}
978982

@@ -981,7 +985,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
981985
for _, oldPid := range partitionIDs {
982986
newPid, exist := tableReplace.PartitionMap[oldPid]
983987
if !exist {
984-
log.Debug("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
988+
logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
985989
continue
986990
}
987991
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
@@ -1001,7 +1005,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
10011005
}
10021006
tableReplace, exist := dbReplace.TableMap[job.TableID]
10031007
if !exist {
1004-
log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
1008+
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
10051009
return nil
10061010
}
10071011

@@ -1010,7 +1014,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
10101014
for _, oldPid := range partitionIDs {
10111015
newPid, exist := tableReplace.PartitionMap[oldPid]
10121016
if !exist {
1013-
log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
1017+
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
10141018
continue
10151019
}
10161020
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)

br/pkg/stream/util_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ func TestDateFormat(t *testing.T) {
2323
434605479096221697,
2424
"2022-07-15 20:32:12.734 +0800",
2525
},
26+
{
27+
434605478903808000,
28+
"2022-07-15 20:32:12 +0800",
29+
},
2630
}
2731

2832
timeZone, _ := time.LoadLocation("Asia/Shanghai")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
-- 1. Drop Schema
2+
drop database db_to_be_dropped;
3+
-- 2. Drop/Truncate Table
4+
drop table table_to_be_dropped_or_truncated.t0_dropped;
5+
drop table table_to_be_dropped_or_truncated.t1_dropped;
6+
truncate table table_to_be_dropped_or_truncated.t0_truncated;
7+
truncate table table_to_be_dropped_or_truncated.t1_truncated;
8+
-- 3. Drop/Truncate Table Partition
9+
alter table partition_to_be_dropped_or_truncated.t1_dropped drop partition p0;
10+
alter table partition_to_be_dropped_or_truncated.t1_truncated truncate partition p0;
11+
-- 4. Drop Table Index/PrimaryKey
12+
alter table index_or_primarykey_to_be_dropped.t0 drop index k1;
13+
alter table index_or_primarykey_to_be_dropped.t1 drop index k1;
14+
alter table index_or_primarykey_to_be_dropped.t0 drop primary key;
15+
alter table index_or_primarykey_to_be_dropped.t1 drop primary key;
16+
-- 5. Drop Table Indexes
17+
alter table indexes_to_be_dropped.t0 drop index k1, drop index k2;
18+
alter table indexes_to_be_dropped.t1 drop index k1, drop index k2;
19+
-- 6. Drop Table Column/Columns
20+
alter table column_s_to_be_dropped.t0_column drop column name;
21+
alter table column_s_to_be_dropped.t1_column drop column name;
22+
alter table column_s_to_be_dropped.t0_columns drop column name, drop column c;
23+
alter table column_s_to_be_dropped.t1_columns drop column name, drop column c;
24+
-- 7. Modify Table Column
25+
alter table column_to_be_modified.t0 modify column name varchar(25);

0 commit comments

Comments
 (0)