From 4d1f787a887f9384e77425180ed7e53ae278f362 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Tue, 22 Feb 2022 21:55:43 +0800 Subject: [PATCH] lightning: let ignore columns be compatible with tidb backend (#27850) (#30064) --- br/pkg/lightning/backend/tidb/tidb.go | 30 ++- br/pkg/lightning/backend/tidb/tidb_test.go | 46 ++-- br/pkg/lightning/config/config.go | 8 + .../lightning/lightning_server_serial_test.go | 1 + br/pkg/lightning/restore/check_info.go | 7 +- br/pkg/lightning/restore/restore.go | 34 ++- br/pkg/lightning/restore/restore_test.go | 243 ++++++++++++++++-- br/pkg/lightning/restore/table_restore.go | 40 +-- br/tests/lightning_distributed_import/run.sh | 2 +- br/tests/lightning_duplicate_detection/run.sh | 2 +- .../data/rowid.explicit_tidb_rowid-schema.sql | 2 +- .../data/rowid.specific_auto_inc-schema.sql | 2 +- br/tests/run.sh | 2 +- 13 files changed, 339 insertions(+), 80 deletions(-) diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index 1b95fe558ef88..c748da0c76625 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -84,6 +84,9 @@ type tidbEncoder struct { // the index of table columns for each data field. // index == len(table.columns) means this field is `_tidb_rowid` columnIdx []int + // the max index used in this chunk, due to the ignore-columns config, we can't + // directly check the total column count, so we fall back to only check that + // there are enough columns. 
columnCnt int } @@ -284,22 +287,27 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co cols := enc.tbl.Cols() if len(enc.columnIdx) == 0 { - columnCount := 0 + columnMaxIdx := -1 columnIdx := make([]int, len(columnPermutation)) + for i := 0; i < len(columnPermutation); i++ { + columnIdx[i] = -1 + } for i, idx := range columnPermutation { if idx >= 0 { columnIdx[idx] = i - columnCount++ + if idx > columnMaxIdx { + columnMaxIdx = idx + } } } enc.columnIdx = columnIdx - enc.columnCnt = columnCount + enc.columnCnt = columnMaxIdx + 1 } // TODO: since the column count doesn't exactly reflect the real column names, we only check the upper bound currently. // See: tests/generated_columns/data/gencol.various_types.0.sql this sql has no columns, so encodeLoop will fill the // column permutation with default, thus enc.columnCnt > len(row). - if len(row) > enc.columnCnt { + if len(row) < enc.columnCnt { logger.Error("column count mismatch", zap.Ints("column_permutation", columnPermutation), zap.Array("data", kv.RowArrayMarshaler(row))) return emptyTiDBRow, errors.Errorf("column count mismatch, expected %d, got %d", enc.columnCnt, len(row)) @@ -308,8 +316,12 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co var encoded strings.Builder encoded.Grow(8 * len(row)) encoded.WriteByte('(') + cnt := 0 for i, field := range row { - if i != 0 { + if enc.columnIdx[i] < 0 { + continue + } + if cnt > 0 { encoded.WriteByte(',') } datum := field @@ -321,6 +333,7 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co ) return nil, err } + cnt++ } encoded.WriteByte(')') return tidbRow{ @@ -569,7 +582,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st serverInfo := version.ParseServerInfo(versionStr) rows, e := tx.Query(` - SELECT table_name, column_name, column_type, extra + SELECT table_name, column_name, column_type, generation_expression, extra FROM 
information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position; @@ -585,8 +598,8 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st curTable *model.TableInfo ) for rows.Next() { - var tableName, columnName, columnType, columnExtra string - if e := rows.Scan(&tableName, &columnName, &columnType, &columnExtra); e != nil { + var tableName, columnName, columnType, generationExpr, columnExtra string + if e := rows.Scan(&tableName, &columnName, &columnType, &generationExpr, &columnExtra); e != nil { return e } if tableName != curTableName { @@ -615,6 +628,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st FieldType: types.FieldType{ Flag: flag, }, + GeneratedExprString: generationExpr, }) curColOffset++ } diff --git a/br/pkg/lightning/backend/tidb/tidb_test.go b/br/pkg/lightning/backend/tidb/tidb_test.go index e37f53d12b27e..3a824063ccaed 100644 --- a/br/pkg/lightning/backend/tidb/tidb_test.go +++ b/br/pkg/lightning/backend/tidb/tidb_test.go @@ -82,7 +82,7 @@ func (s *mysqlSuite) TearDownTest(c *C) { func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) { s.mockDB. - ExpectExec("\\QREPLACE INTO `foo`.`bar`(`a`,`b`,`c`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(18446744073709551615,-9223372036854775808,0,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E"). + ExpectExec("\\QREPLACE INTO `foo`.`bar`(`b`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(-9223372036854775808,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E"). 
WillReturnResult(sqlmock.NewResult(1, 1)) ctx := context.Background() @@ -102,6 +102,9 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) { perms = append(perms, i) } perms = append(perms, -1) + // skip column a,c due to ignore-columns + perms[0] = -1 + perms[2] = -1 encoder, err := s.backend.NewEncoder(s.tbl, &kv.SessionOptions{SQLMode: 0, Timestamp: 1234567890}) c.Assert(err, IsNil) row, err := encoder.Encode(logger, []types.Datum{ @@ -126,7 +129,7 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) { writer, err := engine.LocalWriter(ctx, nil) c.Assert(err, IsNil) - err = writer.WriteRows(ctx, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows) + err = writer.WriteRows(ctx, []string{"b", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows) c.Assert(err, IsNil) st, err := writer.Close(ctx) c.Assert(err, IsNil) @@ -154,7 +157,7 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) { c.Assert(err, IsNil) row, err := encoder.Encode(logger, []types.Datum{ types.NewIntDatum(1), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "1.csv", 0) + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "1.csv", 0) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) @@ -198,7 +201,7 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) { c.Assert(err, IsNil) row, err := encoder.Encode(logger, []types.Datum{ types.NewIntDatum(1), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "3.csv", 0) + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "3.csv", 0) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) @@ -254,10 +257,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_3_x(c *C) { s.mockDB.ExpectBegin() s.mockDB.ExpectQuery("SELECT version()"). 
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v3.0.18")) - s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). + s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). WithArgs("test"). - WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}). - AddRow("t", "id", "int(10)", "auto_increment")) + WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}). + AddRow("t", "id", "int(10)", "", "auto_increment")) s.mockDB.ExpectCommit() bk := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup, errormanager.New(nil, config.NewConfig())) @@ -286,10 +289,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_0(c *C) { s.mockDB.ExpectBegin() s.mockDB.ExpectQuery("SELECT version()"). WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.0")) - s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). + s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). WithArgs("test"). - WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}). - AddRow("t", "id", "bigint(20) unsigned", "auto_increment")) + WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}). + AddRow("t", "id", "bigint(20) unsigned", "", "auto_increment")) s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID"). 
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID"}). AddRow("test", "t", "id", int64(1))) @@ -321,10 +324,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_increment(c *C) { s.mockDB.ExpectBegin() s.mockDB.ExpectQuery("SELECT version()"). WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7")) - s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). + s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). WithArgs("test"). - WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}). - AddRow("t", "id", "bigint(20)", "")) + WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}). + AddRow("t", "id", "bigint(20)", "", "")) s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID"). WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}). AddRow("test", "t", "id", int64(1), "AUTO_INCREMENT")) @@ -356,10 +359,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_random(c *C) { s.mockDB.ExpectBegin() s.mockDB.ExpectQuery("SELECT version()"). WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7")) - s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). + s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). WithArgs("test"). 
- WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}). - AddRow("t", "id", "bigint(20)", "")) + WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}). + AddRow("t", "id", "bigint(20)", "1 + 2", "")) s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID"). WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}). AddRow("test", "t", "id", int64(1), "AUTO_RANDOM")) @@ -382,6 +385,7 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_random(c *C) { FieldType: types.FieldType{ Flag: mysql.PriKeyFlag, }, + GeneratedExprString: "1 + 2", }, }, }, @@ -447,35 +451,35 @@ func (s *mysqlSuite) TestWriteRowsErrorDowngrading(c *C) { c.Assert(err, IsNil) row, err := encoder.Encode(logger, []types.Datum{ types.NewIntDatum(1), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "7.csv", 0) + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "7.csv", 0) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) row, err = encoder.Encode(logger, []types.Datum{ types.NewIntDatum(2), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "8.csv", 0) + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "8.csv", 0) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) row, err = encoder.Encode(logger, []types.Datum{ types.NewIntDatum(3), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "9.csv", 0) + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "9.csv", 0) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) row, err = encoder.Encode(logger, []types.Datum{ types.NewIntDatum(4), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "10.csv", 0) + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 
-1, -1}, "10.csv", 0) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) row, err = encoder.Encode(logger, []types.Datum{ types.NewIntDatum(5), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "11.csv", 0) + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "11.csv", 0) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index 438ec148b4118..76a62373f292e 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -473,6 +473,14 @@ type IgnoreColumns struct { Columns []string `toml:"columns" json:"columns"` } +func (ic *IgnoreColumns) ColumnsMap() map[string]struct{} { + columnMap := make(map[string]struct{}, len(ic.Columns)) + for _, c := range ic.Columns { + columnMap[c] = struct{}{} + } + return columnMap +} + // GetIgnoreColumns gets Ignore config by schema name/regex and table name/regex. 
func (igCols AllIgnoreColumns) GetIgnoreColumns(db string, table string, caseSensitive bool) (*IgnoreColumns, error) { if !caseSensitive { diff --git a/br/pkg/lightning/lightning_server_serial_test.go b/br/pkg/lightning/lightning_server_serial_test.go index efede7cb9d581..e8a9d86ebbdec 100644 --- a/br/pkg/lightning/lightning_server_serial_test.go +++ b/br/pkg/lightning/lightning_server_serial_test.go @@ -306,6 +306,7 @@ func TestHTTPAPIOutsideServerMode(t *testing.T) { errCh := make(chan error) cfg := config.NewConfig() + cfg.TiDB.DistSQLScanConcurrency = 4 err := cfg.LoadFromGlobal(s.lightning.globalCfg) require.NoError(t, err) go func() { diff --git a/br/pkg/lightning/restore/check_info.go b/br/pkg/lightning/restore/check_info.go index d597b6e2646fb..1ff4a0075c0f8 100644 --- a/br/pkg/lightning/restore/check_info.go +++ b/br/pkg/lightning/restore/check_info.go @@ -629,14 +629,11 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab return msgs, nil } - igCols := make(map[string]struct{}) igCol, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(tableInfo.DB, tableInfo.Name, rc.cfg.Mydumper.CaseSensitive) if err != nil { return nil, errors.Trace(err) } - for _, col := range igCol.Columns { - igCols[col] = struct{}{} - } + igCols := igCol.ColumnsMap() if len(tableInfo.DataFiles) == 0 { log.L().Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name)) @@ -814,7 +811,7 @@ outloop: case nil: if !initializedColumns { if len(columnPermutation) == 0 { - columnPermutation, err = createColumnPermutation(columnNames, igCols.Columns, tableInfo) + columnPermutation, err = createColumnPermutation(columnNames, igCols.ColumnsMap(), tableInfo) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 0ff3260d9875f..f6c1f554eb7cb 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -1471,7 
+1471,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error { if err != nil { return errors.Trace(err) } - tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.Columns) + tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap()) if err != nil { return errors.Trace(err) } @@ -2287,6 +2287,16 @@ func (cr *chunkRestore) encodeLoop( pauser, maxKvPairsCnt := rc.pauser, rc.cfg.TikvImporter.MaxKVPairs initializedColumns, reachEOF := false, false + // filteredColumns is the list of column names with the ignored columns excluded. + // WARN: this might not be correct when different SQL statements contain different fields, + // but since ColumnPermutation also depends on the assumption that the columns in one source file are the same, + // this should be ok. + var filteredColumns []string + ignoreColumns, err1 := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(t.dbInfo.Name, t.tableInfo.Core.Name.O, rc.cfg.Mydumper.CaseSensitive) + if err1 != nil { + err = err1 + return + } for !reachEOF { if err = pauser.Wait(ctx); err != nil { return @@ -2317,6 +2327,26 @@ func (cr *chunkRestore) encodeLoop( return } } + filteredColumns = columnNames + if ignoreColumns != nil && len(ignoreColumns.Columns) > 0 { + filteredColumns = make([]string, 0, len(columnNames)) + ignoreColsMap := ignoreColumns.ColumnsMap() + if len(columnNames) > 0 { + for _, c := range columnNames { + if _, ok := ignoreColsMap[c]; !ok { + filteredColumns = append(filteredColumns, c) + } + } + } else { + // init column names by table schema + // after filtering out some columns, we must explicitly set the columns for TiDB backend + for _, col := range t.tableInfo.Core.Columns { + if _, ok := ignoreColsMap[col.Name.L]; !col.Hidden && !ok { + filteredColumns = append(filteredColumns, col.Name.O) + } + } + } + } initializedColumns = true } case io.EOF: @@ -2350,7 +2380,7 @@ func (cr *chunkRestore) encodeLoop( continue } - kvPacket = append(kvPacket, 
deliveredKVs{kvs: kvs, columns: columnNames, offset: newOffset, rowID: rowID}) + kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: filteredColumns, offset: newOffset, rowID: rowID}) kvSize += kvs.Size() failpoint.Inject("mock-kv-size", func(val failpoint.Value) { kvSize += uint64(val.(int)) diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go index c322a1037df39..97de1a1a78d85 100644 --- a/br/pkg/lightning/restore/restore_test.go +++ b/br/pkg/lightning/restore/restore_test.go @@ -889,30 +889,124 @@ func (s *tableRestoreSuite) TestGetColumnsNames(c *C) { func (s *tableRestoreSuite) TestInitializeColumns(c *C) { ccp := &checkpoints.ChunkCheckpoint{} - c.Assert(s.tr.initializeColumns(nil, ccp), IsNil) - c.Assert(ccp.ColumnPermutation, DeepEquals, []int{0, 1, 2, -1}) - ccp.ColumnPermutation = nil - c.Assert(s.tr.initializeColumns([]string{"b", "c", "a"}, ccp), IsNil) - c.Assert(ccp.ColumnPermutation, DeepEquals, []int{2, 0, 1, -1}) + defer func() { + s.tr.ignoreColumns = nil + }() + + cases := []struct { + columns []string + ignoreColumns map[string]struct{} + expectedPermutation []int + errPat string + }{ + { + nil, + nil, + []int{0, 1, 2, -1}, + "", + }, + { + nil, + map[string]struct{}{"b": {}}, + []int{0, -1, 2, -1}, + "", + }, + { + []string{"b", "c", "a"}, + nil, + []int{2, 0, 1, -1}, + "", + }, + { + []string{"b", "c", "a"}, + map[string]struct{}{"b": {}}, + []int{2, -1, 1, -1}, + "", + }, + { + []string{"b"}, + nil, + []int{-1, 0, -1, -1}, + "", + }, + { + []string{"_tidb_rowid", "b", "a", "c"}, + nil, + []int{2, 1, 3, 0}, + "", + }, + { + []string{"_tidb_rowid", "b", "a", "c"}, + map[string]struct{}{"b": {}, "_tidb_rowid": {}}, + []int{2, -1, 3, -1}, + "", + }, + { + []string{"_tidb_rowid", "b", "a", "c", "d"}, + nil, + nil, + `unknown columns in header \[d\]`, + }, + { + []string{"e", "b", "c", "d"}, + nil, + nil, + `unknown columns in header \[e d\]`, + }, + } + + for _, testCase := range cases { + 
ccp.ColumnPermutation = nil + s.tr.ignoreColumns = testCase.ignoreColumns + err := s.tr.initializeColumns(testCase.columns, ccp) + if len(testCase.errPat) > 0 { + c.Assert(err, NotNil) + c.Assert(err, ErrorMatches, testCase.errPat) + } else { + c.Assert(ccp.ColumnPermutation, DeepEquals, testCase.expectedPermutation) + } + } +} - ccp.ColumnPermutation = nil - c.Assert(s.tr.initializeColumns([]string{"b"}, ccp), IsNil) - c.Assert(ccp.ColumnPermutation, DeepEquals, []int{-1, 0, -1, -1}) +func (s *tableRestoreSuite) TestInitializeColumnsGenerated(c *C) { + p := parser.New() + p.SetSQLMode(mysql.ModeANSIQuotes) + se := tmock.NewContext() - ccp.ColumnPermutation = nil - c.Assert(s.tr.initializeColumns([]string{"_tidb_rowid", "b", "a", "c"}, ccp), IsNil) - c.Assert(ccp.ColumnPermutation, DeepEquals, []int{2, 1, 3, 0}) + cases := []struct { + schema string + columns []string + expectedPermutation []int + }{ + { + "CREATE TABLE `table` (a INT, b INT, C INT, d INT AS (a * 2))", + []string{"b", "c", "a"}, + []int{2, 0, 1, -1, -1}, + }, + // all generated columns and none input columns + { + "CREATE TABLE `table` (a bigint as (1 + 2) stored, b text as (sha1(repeat('x', a))) stored)", + []string{}, + []int{-1, -1, -1}, + }, + } - ccp.ColumnPermutation = nil - err := s.tr.initializeColumns([]string{"_tidb_rowid", "b", "a", "c", "d"}, ccp) - c.Assert(err, NotNil) - c.Assert(err, ErrorMatches, `unknown columns in header \[d\]`) + for _, testCase := range cases { + node, err := p.ParseOneStmt(testCase.schema, "", "") + c.Assert(err, IsNil) + core, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), 0xabcdef) + c.Assert(err, IsNil) + core.State = model.StatePublic + tableInfo := &checkpoints.TidbTableInfo{Name: "table", DB: "db", Core: core} + s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil) + c.Assert(err, IsNil) + ccp := &checkpoints.ChunkCheckpoint{} - ccp.ColumnPermutation = nil - err = 
s.tr.initializeColumns([]string{"e", "b", "c", "d"}, ccp) - c.Assert(err, NotNil) - c.Assert(err, ErrorMatches, `unknown columns in header \[e d\]`) + err = s.tr.initializeColumns(testCase.columns, ccp) + c.Assert(err, IsNil) + c.Assert(ccp.ColumnPermutation, DeepEquals, testCase.expectedPermutation) + } } func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) { @@ -1378,6 +1472,7 @@ func (s *chunkRestoreSuite) TestEncodeLoop(c *C) { c.Assert(kvs, HasLen, 1) c.Assert(kvs[0].rowID, Equals, int64(19)) c.Assert(kvs[0].offset, Equals, int64(36)) + c.Assert(kvs[0].columns, DeepEquals, []string(nil)) kvs = <-kvsCh c.Assert(len(kvs), Equals, 0) @@ -1450,6 +1545,7 @@ func (s *chunkRestoreSuite) TestEncodeLoopDeliverLimit(c *C) { rc := &Controller{pauser: DeliverPauser, cfg: cfg} c.Assert(failpoint.Enable( "github.com/pingcap/tidb/br/pkg/lightning/restore/mock-kv-size", "return(110000000)"), IsNil) + defer failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/restore/mock-kv-size") _, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc) c.Assert(err, IsNil) @@ -1499,7 +1595,7 @@ func (s *chunkRestoreSuite) TestEncodeLoopDeliverErrored(c *C) { func (s *chunkRestoreSuite) TestEncodeLoopColumnsMismatch(c *C) { dir := c.MkDir() fileName := "db.table.000.csv" - err := os.WriteFile(filepath.Join(dir, fileName), []byte("1,2,3,4\r\n4,5,6,7\r\n"), 0o644) + err := os.WriteFile(filepath.Join(dir, fileName), []byte("1,2\r\n4,5,6,7\r\n"), 0o644) c.Assert(err, IsNil) store, err := storage.NewLocalStorage(dir) @@ -1529,12 +1625,117 @@ func (s *chunkRestoreSuite) TestEncodeLoopColumnsMismatch(c *C) { Timestamp: 1234567895, }) c.Assert(err, IsNil) + defer kvEncoder.Close() _, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc) - c.Assert(err, ErrorMatches, "in file db.table.2.sql:0 at offset 8: column count mismatch, expected 3, got 4") + c.Assert(err, ErrorMatches, "in file db.table.2.sql:0 at offset 4: 
column count mismatch, expected 3, got 2") c.Assert(kvsCh, HasLen, 0) } +func (s *chunkRestoreSuite) TestEncodeLoopIgnoreColumnsCSV(c *C) { + log.InitLogger(&log.Config{}, "error") + cases := []struct { + s string + ignoreColumns []*config.IgnoreColumns + kvs deliveredKVs + header bool + }{ + { + "1,2,3\r\n4,5,6\r\n", + []*config.IgnoreColumns{ + { + DB: "db", + Table: "table", + Columns: []string{"a"}, + }, + }, + deliveredKVs{ + rowID: 1, + offset: 6, + columns: []string{"b", "c"}, + }, + false, + }, + { + "b,c\r\n2,3\r\n5,6\r\n", + []*config.IgnoreColumns{ + { + TableFilter: []string{"db*.tab*"}, + Columns: []string{"b"}, + }, + }, + deliveredKVs{ + rowID: 1, + offset: 9, + columns: []string{"c"}, + }, + true, + }, + } + + for _, cs := range cases { + // reset test + s.SetUpTest(c) + s.testEncodeLoopIgnoreColumnsCSV(c, cs.s, cs.ignoreColumns, cs.kvs, cs.header) + } +} + +func (s *chunkRestoreSuite) testEncodeLoopIgnoreColumnsCSV( + c *C, + f string, + ignoreColumns []*config.IgnoreColumns, + deliverKV deliveredKVs, + header bool, +) { + dir := c.MkDir() + fileName := "db.table.000.csv" + err := os.WriteFile(filepath.Join(dir, fileName), []byte(f), 0o644) + c.Assert(err, IsNil) + + store, err := storage.NewLocalStorage(dir) + c.Assert(err, IsNil) + + ctx := context.Background() + cfg := config.NewConfig() + cfg.Mydumper.IgnoreColumns = ignoreColumns + cfg.Mydumper.CSV.Header = header + rc := &Controller{pauser: DeliverPauser, cfg: cfg} + + reader, err := store.Open(ctx, fileName) + c.Assert(err, IsNil) + w := worker.NewPool(ctx, 5, "io") + p, err := mydump.NewCSVParser(&cfg.Mydumper.CSV, reader, 111, w, cfg.Mydumper.CSV.Header, nil) + c.Assert(err, IsNil) + + err = s.cr.parser.Close() + c.Assert(err, IsNil) + s.cr.parser = p + + kvsCh := make(chan []deliveredKVs, 2) + deliverCompleteCh := make(chan deliverResult) + kvEncoder, err := tidb.NewTiDBBackend(nil, config.ReplaceOnDup, errormanager.New(nil, config.NewConfig())).NewEncoder( + s.tr.encTable, + 
&kv.SessionOptions{ + SQLMode: s.cfg.TiDB.SQLMode, + Timestamp: 1234567895, + }) + c.Assert(err, IsNil) + defer kvEncoder.Close() + + _, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc) + c.Assert(err, IsNil) + c.Assert(kvsCh, HasLen, 2) + + kvs := <-kvsCh + c.Assert(kvs, HasLen, 2) + c.Assert(kvs[0].rowID, Equals, deliverKV.rowID) + c.Assert(kvs[0].offset, Equals, deliverKV.offset) + c.Assert(kvs[0].columns, DeepEquals, deliverKV.columns) + + kvs = <-kvsCh + c.Assert(len(kvs), Equals, 0) +} + func (s *chunkRestoreSuite) TestRestore(c *C) { ctx := context.Background() diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 37b842187d42d..24f7b003894a4 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -53,7 +53,7 @@ type TableRestore struct { alloc autoid.Allocators logger log.Logger - ignoreColumns []string + ignoreColumns map[string]struct{} } func NewTableRestore( @@ -62,7 +62,7 @@ func NewTableRestore( dbInfo *checkpoints.TidbDBInfo, tableInfo *checkpoints.TidbTableInfo, cp *checkpoints.TableCheckpoint, - ignoreColumns []string, + ignoreColumns map[string]struct{}, ) (*TableRestore, error) { idAlloc := kv.NewPanickingAllocators(cp.AllocBase) tbl, err := tables.TableFromMeta(idAlloc, tableInfo.Core) @@ -167,15 +167,21 @@ func (tr *TableRestore) initializeColumns(columns []string, ccp *checkpoints.Chu return nil } -func createColumnPermutation(columns []string, ignoreColumns []string, tableInfo *model.TableInfo) ([]int, error) { +func createColumnPermutation(columns []string, ignoreColumns map[string]struct{}, tableInfo *model.TableInfo) ([]int, error) { var colPerm []int if len(columns) == 0 { colPerm = make([]int, 0, len(tableInfo.Columns)+1) shouldIncludeRowID := common.TableHasAutoRowID(tableInfo) // no provided columns, so use identity permutation. 
- for i := range tableInfo.Columns { - colPerm = append(colPerm, i) + for i, col := range tableInfo.Columns { + idx := i + if _, ok := ignoreColumns[col.Name.L]; ok { + idx = -1 + } else if col.IsGenerated() { + idx = -1 + } + colPerm = append(colPerm, idx) } if shouldIncludeRowID { colPerm = append(colPerm, -1) @@ -834,7 +840,7 @@ func (tr *TableRestore) postProcess( return true, nil } -func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignoreColumns []string) ([]int, error) { +func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignoreColumns map[string]struct{}) ([]int, error) { colPerm := make([]int, 0, len(tableInfo.Columns)+1) columnMap := make(map[string]int) @@ -842,13 +848,6 @@ func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignor columnMap[column] = i } - ignoreMap := make(map[string]int) - for _, column := range ignoreColumns { - if i, ok := columnMap[column]; ok { - ignoreMap[column] = i - } - } - tableColumnMap := make(map[string]int) for i, col := range tableInfo.Columns { tableColumnMap[col.Name.L] = i @@ -858,7 +857,7 @@ func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignor var unknownCols []string for _, c := range columns { if _, ok := tableColumnMap[c]; !ok && c != model.ExtraHandleName.L { - if _, ignore := ignoreMap[c]; !ignore { + if _, ignore := ignoreColumns[c]; !ignore { unknownCols = append(unknownCols, c) } } @@ -870,7 +869,7 @@ func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignor for _, colInfo := range tableInfo.Columns { if i, ok := columnMap[colInfo.Name.L]; ok { - if _, ignore := ignoreMap[colInfo.Name.L]; !ignore { + if _, ignore := ignoreColumns[colInfo.Name.L]; !ignore { colPerm = append(colPerm, i) } else { log.L().Debug("column ignored by user requirements", @@ -891,11 +890,16 @@ func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignor colPerm = append(colPerm, -1) } } + // 
append _tidb_rowid column + rowIDIdx := -1 if i, ok := columnMap[model.ExtraHandleName.L]; ok { - colPerm = append(colPerm, i) - } else if common.TableHasAutoRowID(tableInfo) { - colPerm = append(colPerm, -1) + if _, ignored := ignoreColumns[model.ExtraHandleName.L]; !ignored { + rowIDIdx = i + } } + // FIXME: the schema info for tidb backend is not complete, so always add the _tidb_rowid field. + // Other logic should ignore this extra field if not needed. + colPerm = append(colPerm, rowIDIdx) return colPerm, nil } diff --git a/br/tests/lightning_distributed_import/run.sh b/br/tests/lightning_distributed_import/run.sh index f640ec3159c75..d21bf356b1568 100644 --- a/br/tests/lightning_distributed_import/run.sh +++ b/br/tests/lightning_distributed_import/run.sh @@ -20,7 +20,7 @@ LOG_FILE1="$TEST_DIR/lightning-distributed-import1.log" LOG_FILE2="$TEST_DIR/lightning-distributed-import2.log" # let lightning run a bit slow to avoid some table in the first lightning finish too fast. -export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownImport=sleep(50)" +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownImport=sleep(250)" run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_distributed_import.sorted1" \ -d "tests/$TEST_NAME/data1" --log-file "$LOG_FILE1" --config "tests/$TEST_NAME/config.toml" & diff --git a/br/tests/lightning_duplicate_detection/run.sh b/br/tests/lightning_duplicate_detection/run.sh index cfbc95d6ef4e7..6aace9b8ae5e4 100644 --- a/br/tests/lightning_duplicate_detection/run.sh +++ b/br/tests/lightning_duplicate_detection/run.sh @@ -22,7 +22,7 @@ LOG_FILE1="$TEST_DIR/lightning-duplicate-detection1.log" LOG_FILE2="$TEST_DIR/lightning-duplicate-detection2.log" # let lightning run a bit slow to avoid some table in the first lightning finish too fast. 
-export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownImport=sleep(50)" +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownImport=sleep(250)" run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_duplicate_detection.sorted1" \ --enable-checkpoint=1 --log-file "$LOG_FILE1" --config "tests/$TEST_NAME/config1.toml" & diff --git a/br/tests/lightning_tidb_rowid/data/rowid.explicit_tidb_rowid-schema.sql b/br/tests/lightning_tidb_rowid/data/rowid.explicit_tidb_rowid-schema.sql index 4f1d634485cca..55232f2ff6081 100644 --- a/br/tests/lightning_tidb_rowid/data/rowid.explicit_tidb_rowid-schema.sql +++ b/br/tests/lightning_tidb_rowid/data/rowid.explicit_tidb_rowid-schema.sql @@ -1 +1 @@ -create table explicit_tidb_rowid (pk varchar(6) primary key); \ No newline at end of file +create table explicit_tidb_rowid (pk varchar(6) primary key /*T![clustered_index] NONCLUSTERED */); diff --git a/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql b/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql index f6962e15a0072..a69f5bf4350eb 100644 --- a/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql +++ b/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql @@ -1 +1 @@ -create table specific_auto_inc (a varchar(6) primary key, b int unique auto_increment) auto_increment=80000; \ No newline at end of file +create table specific_auto_inc (a varchar(6) primary key /*T![clustered_index] NONCLUSTERED */, b int unique auto_increment) auto_increment=80000; diff --git a/br/tests/run.sh b/br/tests/run.sh index 140491caddc90..bbf17deb3e715 100755 --- a/br/tests/run.sh +++ b/br/tests/run.sh @@ -28,7 +28,7 @@ SELECTED_TEST_NAME="${TEST_NAME-$(find tests -mindepth 2 -maxdepth 2 -name run.s source tests/_utils/run_services trap stop_services EXIT -start_services +start_services $@ # Intermediate file needed because read can be used as a pipe target. 
# https://stackoverflow.com/q/2746553/