Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: let ignore columns be compatible with tidb backend (#27850) #30064

Merged
merged 9 commits into from
Feb 22, 2022
30 changes: 22 additions & 8 deletions br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type tidbEncoder struct {
// the index of table columns for each data field.
// index == len(table.columns) means this field is `_tidb_rowid`
columnIdx []int
// the max index used in this chunk, due to the ignore-columns config, we can't
// directly check the total column count, so we fall back to only check that
// the there are enough columns.
columnCnt int
}

Expand Down Expand Up @@ -284,22 +287,27 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
cols := enc.tbl.Cols()

if len(enc.columnIdx) == 0 {
columnCount := 0
columnMaxIdx := -1
columnIdx := make([]int, len(columnPermutation))
for i := 0; i < len(columnPermutation); i++ {
columnIdx[i] = -1
}
for i, idx := range columnPermutation {
if idx >= 0 {
columnIdx[idx] = i
columnCount++
if idx > columnMaxIdx {
columnMaxIdx = idx
}
}
}
enc.columnIdx = columnIdx
enc.columnCnt = columnCount
enc.columnCnt = columnMaxIdx + 1
}

// TODO: since the column count doesn't exactly reflect the real column names, we only check the upper bound currently.
// See: tests/generated_columns/data/gencol.various_types.0.sql this sql has no columns, so encodeLoop will fill the
// column permutation with default, thus enc.columnCnt > len(row).
if len(row) > enc.columnCnt {
if len(row) < enc.columnCnt {
logger.Error("column count mismatch", zap.Ints("column_permutation", columnPermutation),
zap.Array("data", kv.RowArrayMarshaler(row)))
return emptyTiDBRow, errors.Errorf("column count mismatch, expected %d, got %d", enc.columnCnt, len(row))
Expand All @@ -308,8 +316,12 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
var encoded strings.Builder
encoded.Grow(8 * len(row))
encoded.WriteByte('(')
cnt := 0
for i, field := range row {
if i != 0 {
if enc.columnIdx[i] < 0 {
continue
}
if cnt > 0 {
encoded.WriteByte(',')
}
datum := field
Expand All @@ -321,6 +333,7 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
)
return nil, err
}
cnt++
}
encoded.WriteByte(')')
return tidbRow{
Expand Down Expand Up @@ -569,7 +582,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
serverInfo := version.ParseServerInfo(versionStr)

rows, e := tx.Query(`
SELECT table_name, column_name, column_type, extra
SELECT table_name, column_name, column_type, generation_expression, extra
FROM information_schema.columns
WHERE table_schema = ?
ORDER BY table_name, ordinal_position;
Expand All @@ -585,8 +598,8 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
curTable *model.TableInfo
)
for rows.Next() {
var tableName, columnName, columnType, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &columnExtra); e != nil {
var tableName, columnName, columnType, generationExpr, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &generationExpr, &columnExtra); e != nil {
return e
}
if tableName != curTableName {
Expand Down Expand Up @@ -615,6 +628,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
FieldType: types.FieldType{
Flag: flag,
},
GeneratedExprString: generationExpr,
})
curColOffset++
}
Expand Down
71 changes: 58 additions & 13 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *mysqlSuite) TearDownTest(c *C) {

func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
s.mockDB.
ExpectExec("\\QREPLACE INTO `foo`.`bar`(`a`,`b`,`c`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(18446744073709551615,-9223372036854775808,0,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E").
ExpectExec("\\QREPLACE INTO `foo`.`bar`(`b`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(-9223372036854775808,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E").
WillReturnResult(sqlmock.NewResult(1, 1))

ctx := context.Background()
Expand All @@ -102,6 +102,9 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
perms = append(perms, i)
}
perms = append(perms, -1)
// skip column a,c due to ignore-columns
perms[0] = -1
perms[2] = -1
encoder, err := s.backend.NewEncoder(s.tbl, &kv.SessionOptions{SQLMode: 0, Timestamp: 1234567890})
c.Assert(err, IsNil)
row, err := encoder.Encode(logger, []types.Datum{
Expand All @@ -125,9 +128,15 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

writer, err := engine.LocalWriter(ctx, nil)
<<<<<<< HEAD
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows)
c.Assert(err, IsNil)
=======
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"b", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows)
require.NoError(t, err)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)
st, err := writer.Close(ctx)
c.Assert(err, IsNil)
c.Assert(st, IsNil)
Expand All @@ -154,8 +163,13 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) {
c.Assert(err, IsNil)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
<<<<<<< HEAD
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "1.csv", 0)
c.Assert(err, IsNil)
=======
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "1.csv", 0)
require.NoError(t, err)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)
kennytm marked this conversation as resolved.
Show resolved Hide resolved
row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

writer, err := engine.LocalWriter(ctx, nil)
Expand Down Expand Up @@ -198,8 +212,13 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) {
c.Assert(err, IsNil)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
<<<<<<< HEAD
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "3.csv", 0)
c.Assert(err, IsNil)
=======
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "3.csv", 0)
require.NoError(t, err)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)
kennytm marked this conversation as resolved.
Show resolved Hide resolved

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

Expand Down Expand Up @@ -254,10 +273,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_3_x(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v3.0.18"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "int(10)", "auto_increment"))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "int(10)", "", "auto_increment"))
s.mockDB.ExpectCommit()

bk := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup, errormanager.New(nil, config.NewConfig()))
Expand Down Expand Up @@ -286,10 +305,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_0(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.0"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "bigint(20) unsigned", "auto_increment"))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "bigint(20) unsigned", "", "auto_increment"))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID"}).
AddRow("test", "t", "id", int64(1)))
Expand Down Expand Up @@ -321,10 +340,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_increment(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "bigint(20)", ""))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "bigint(20)", "", ""))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
AddRow("test", "t", "id", int64(1), "AUTO_INCREMENT"))
Expand Down Expand Up @@ -356,10 +375,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_random(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "bigint(20)", ""))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "bigint(20)", "1 + 2", ""))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
AddRow("test", "t", "id", int64(1), "AUTO_RANDOM"))
Expand All @@ -382,6 +401,7 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_random(c *C) {
FieldType: types.FieldType{
Flag: mysql.PriKeyFlag,
},
GeneratedExprString: "1 + 2",
},
},
},
Expand Down Expand Up @@ -447,36 +467,61 @@ func (s *mysqlSuite) TestWriteRowsErrorDowngrading(c *C) {
c.Assert(err, IsNil)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
<<<<<<< HEAD
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "7.csv", 0)
c.Assert(err, IsNil)
=======
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "7.csv", 0)
require.NoError(t, err)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)
kennytm marked this conversation as resolved.
Show resolved Hide resolved

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(2),
<<<<<<< HEAD
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "8.csv", 0)
c.Assert(err, IsNil)
=======
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "8.csv", 0)
require.NoError(t, err)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(3),
<<<<<<< HEAD
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "9.csv", 0)
c.Assert(err, IsNil)
=======
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "9.csv", 0)
require.NoError(t, err)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(4),
<<<<<<< HEAD
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "10.csv", 0)
c.Assert(err, IsNil)
=======
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "10.csv", 0)
require.NoError(t, err)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(5),
<<<<<<< HEAD
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "11.csv", 0)
c.Assert(err, IsNil)
=======
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "11.csv", 0)
require.NoError(t, err)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

Expand Down
8 changes: 8 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,14 @@ type IgnoreColumns struct {
Columns []string `toml:"columns" json:"columns"`
}

func (ic *IgnoreColumns) ColumnsMap() map[string]struct{} {
columnMap := make(map[string]struct{}, len(ic.Columns))
for _, c := range ic.Columns {
columnMap[c] = struct{}{}
}
return columnMap
}

// GetIgnoreColumns gets Ignore config by schema name/regex and table name/regex.
func (igCols AllIgnoreColumns) GetIgnoreColumns(db string, table string, caseSensitive bool) (*IgnoreColumns, error) {
if !caseSensitive {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/lightning_server_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func TestHTTPAPIOutsideServerMode(t *testing.T) {

errCh := make(chan error)
cfg := config.NewConfig()
cfg.TiDB.DistSQLScanConcurrency = 4
err := cfg.LoadFromGlobal(s.lightning.globalCfg)
require.NoError(t, err)
go func() {
Expand Down
7 changes: 2 additions & 5 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,14 +631,11 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
return msgs, nil
}

igCols := make(map[string]struct{})
igCol, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(tableInfo.DB, tableInfo.Name, rc.cfg.Mydumper.CaseSensitive)
if err != nil {
return nil, errors.Trace(err)
}
for _, col := range igCol.Columns {
igCols[col] = struct{}{}
}
igCols := igCol.ColumnsMap()

if len(tableInfo.DataFiles) == 0 {
log.L().Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name))
Expand Down Expand Up @@ -816,7 +813,7 @@ outloop:
case nil:
if !initializedColumns {
if len(columnPermutation) == 0 {
columnPermutation, err = createColumnPermutation(columnNames, igCols.Columns, tableInfo)
columnPermutation, err = createColumnPermutation(columnNames, igCols.ColumnsMap(), tableInfo)
if err != nil {
return errors.Trace(err)
}
Expand Down
34 changes: 32 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1490,7 +1490,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.Columns)
tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -2301,6 +2301,16 @@ func (cr *chunkRestore) encodeLoop(

pauser, maxKvPairsCnt := rc.pauser, rc.cfg.TikvImporter.MaxKVPairs
initializedColumns, reachEOF := false, false
// filteredColumns is column names that excluded ignored columns
// WARN: this might be not correct when different SQL statements contains different fields,
// but since ColumnPermutation also depends on the hypothesis that the columns in one source file is the same
// so this should be ok.
var filteredColumns []string
ignoreColumns, err1 := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(t.dbInfo.Name, t.tableInfo.Core.Name.O, rc.cfg.Mydumper.CaseSensitive)
if err1 != nil {
err = err1
return
}
for !reachEOF {
if err = pauser.Wait(ctx); err != nil {
return
Expand Down Expand Up @@ -2331,6 +2341,26 @@ func (cr *chunkRestore) encodeLoop(
return
}
}
filteredColumns = columnNames
if ignoreColumns != nil && len(ignoreColumns.Columns) > 0 {
filteredColumns = make([]string, 0, len(columnNames))
ignoreColsMap := ignoreColumns.ColumnsMap()
if len(columnNames) > 0 {
for _, c := range columnNames {
if _, ok := ignoreColsMap[c]; !ok {
filteredColumns = append(filteredColumns, c)
}
}
} else {
// init column names by table schema
// after filtered out some columns, we must explicitly set the columns for TiDB backend
for _, col := range t.tableInfo.Core.Columns {
if _, ok := ignoreColsMap[col.Name.L]; !col.Hidden && !ok {
filteredColumns = append(filteredColumns, col.Name.O)
}
}
}
}
initializedColumns = true
}
case io.EOF:
Expand Down Expand Up @@ -2364,7 +2394,7 @@ func (cr *chunkRestore) encodeLoop(
continue
}

kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: columnNames, offset: newOffset, rowID: rowID})
kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: filteredColumns, offset: newOffset, rowID: rowID})
kvSize += kvs.Size()
failpoint.Inject("mock-kv-size", func(val failpoint.Value) {
kvSize += uint64(val.(int))
Expand Down
Loading