This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

fix issue related to '_tidb_rowid' and move column count to tidb encoder
glorv committed Dec 24, 2020
1 parent 72de282 commit e104101
Showing 5 changed files with 111 additions and 71 deletions.
36 changes: 32 additions & 4 deletions lightning/backend/tidb.go
@@ -38,6 +38,14 @@ import (
"github.com/pingcap/tidb-lightning/lightning/verification"
)

var (
extraHandleTableColumn = &table.Column{
ColumnInfo: extraHandleColumnInfo,
GeneratedExpr: nil,
DefaultExpr: nil,
}
)

type tidbRow string

type tidbRows []tidbRow
@@ -51,10 +59,13 @@ func (row tidbRows) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
}

type tidbEncoder struct {
mode mysql.SQLMode
tbl table.Table
se *session
mode mysql.SQLMode
tbl table.Table
se *session
// the index of table columns for each data field.
// index == len(table.columns) means this field is `_tidb_rowid`
columnIdx []int
columnCnt int
}

type tidbBackend struct {
@@ -228,17 +239,34 @@ func (enc *tidbEncoder) appendSQL(sb *strings.Builder, datum *types.Datum, col *

func (*tidbEncoder) Close() {}

func getColumnByIndex(cols []*table.Column, index int) *table.Column {
if index == len(cols) {
return extraHandleTableColumn
} else {
return cols[index]
}
}

func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, columnPermutation []int) (Row, error) {
cols := enc.tbl.Cols()

if len(enc.columnIdx) == 0 {
columnCount := 0
columnIdx := make([]int, len(columnPermutation))
for i, idx := range columnPermutation {
if idx >= 0 {
columnIdx[idx] = i
columnCount++
}
}
enc.columnIdx = columnIdx
enc.columnCnt = columnCount
}

if len(row) != enc.columnCnt {
log.L().Error("column count mismatch", zap.Ints("column_permutation", columnPermutation),
zap.Array("data", rowArrayMarshaler(row)))
return nil, errors.Errorf("column count mismatch, expected %d, got %d", enc.columnCnt, len(row))
}

var encoded strings.Builder
@@ -248,7 +276,7 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
if i != 0 {
encoded.WriteByte(',')
}
if err := enc.appendSQL(&encoded, &field, cols[enc.columnIdx[i]]); err != nil {
if err := enc.appendSQL(&encoded, &field, getColumnByIndex(cols, enc.columnIdx[i])); err != nil {
logger.Error("tidb encode failed",
zap.Array("original", rowArrayMarshaler(row)),
zap.Int("originalCol", i),
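For readers following the change, here is a minimal, self-contained sketch of the permutation handling the new encoder relies on. It assumes simplified string-valued columns instead of *table.Column, and the helper names (resolveColumn, buildColumnIndex, encodeRow) are illustrative rather than taken from the patch: columnPermutation[i] holds the data-field position of table column i, with a trailing slot for _tidb_rowid, so a resolved index equal to len(cols) stands for the hidden row-ID column.

package main

import "fmt"

// resolveColumn mirrors the idea behind getColumnByIndex: an index equal to
// len(cols) denotes the hidden `_tidb_rowid` column. Columns are simplified
// to plain strings here; the real code returns *table.Column values.
func resolveColumn(cols []string, index int) string {
	if index == len(cols) {
		return "_tidb_rowid"
	}
	return cols[index]
}

// buildColumnIndex mirrors the caching now done in tidbEncoder.Encode: for
// each data-field position it records the table-column index it maps to, and
// counts how many fields each data row is expected to carry.
func buildColumnIndex(columnPermutation []int) (columnIdx []int, columnCnt int) {
	columnIdx = make([]int, len(columnPermutation))
	for i := range columnIdx {
		columnIdx[i] = -1 // positions with no delivered column stay -1
	}
	for i, idx := range columnPermutation {
		if idx >= 0 {
			columnIdx[idx] = i
			columnCnt++
		}
	}
	return columnIdx, columnCnt
}

// encodeRow is a toy stand-in for tidbEncoder.Encode: it enforces the new
// column-count check and resolves each field to its column name.
func encodeRow(cols []string, row []string, columnPermutation []int) (string, error) {
	columnIdx, columnCnt := buildColumnIndex(columnPermutation)
	if len(row) != columnCnt {
		return "", fmt.Errorf("column count mismatch, expected %d, got %d", columnCnt, len(row))
	}
	encoded := ""
	for i, field := range row {
		if i != 0 {
			encoded += ","
		}
		encoded += resolveColumn(cols, columnIdx[i]) + "=" + field
	}
	return encoded, nil
}

func main() {
	cols := []string{"id", "name"}
	// `id` arrives as data field 0, `name` is absent, and `_tidb_rowid`
	// arrives as data field 1 (the trailing permutation slot).
	perm := []int{0, -1, 1}
	fmt.Println(encodeRow(cols, []string{"1", "42"}, perm))
	// Output: id=1,_tidb_rowid=42 <nil>
}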
20 changes: 16 additions & 4 deletions lightning/backend/tidb_test.go
@@ -136,12 +136,21 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) {
c.Assert(err, IsNil)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1})
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1})
c.Assert(err, IsNil)
row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

err = engine.WriteRows(ctx, []string{"a"}, dataRows)
c.Assert(err, IsNil)

// test encode rows with _tidb_rowid
encoder, err = ignoreBackend.NewEncoder(s.tbl, &kv.SessionOptions{})
c.Assert(err, IsNil)
row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
types.NewIntDatum(1), // _tidb_rowid field
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1})
c.Assert(err, IsNil)
}

func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) {
@@ -165,7 +174,7 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) {
c.Assert(err, IsNil)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1})
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1})
c.Assert(err, IsNil)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)
@@ -192,14 +201,17 @@ func (s *mysqlSuite) TestStrictMode(c *C) {
logger := log.L()
_, err = encoder.Encode(logger, []types.Datum{
types.NewStringDatum("test"),
}, 1, []int{0, 1, -1})
}, 1, []int{0, -1, -1})
c.Assert(err, IsNil)

_, err = encoder.Encode(logger, []types.Datum{
types.NewStringDatum("\xff\xff\xff\xff"),
}, 1, []int{0, 1, -1})
}, 1, []int{0, -1, -1})
c.Assert(err, ErrorMatches, `.*incorrect utf8 value .* for column s0`)

// open a new encoder because the column count changed.
encoder, err = bk.NewEncoder(tbl, &kv.SessionOptions{SQLMode: mysql.ModeStrictAllTables})
c.Assert(err, IsNil)
_, err = encoder.Encode(logger, []types.Datum{
types.NewStringDatum(""),
types.NewStringDatum("非 ASCII 字符串"),
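The updated tests pass exactly one non-negative permutation entry per datum handed to Encode; the earlier permutations such as []int{0, 1, 2, ..., 12, -1} would now trip the count check, since only one datum is supplied. A hypothetical helper like the one below (makePermutation is not part of the patch) shows how such a permutation can be assembled: one slot per table column plus a trailing slot for _tidb_rowid, with absent columns left at -1.

package main

import "fmt"

// makePermutation is a hypothetical test helper: it builds a permutation of
// length tableCols+1 (one slot per table column plus a trailing slot for
// _tidb_rowid), defaulting every slot to -1 and mapping each present column
// index to its position in the data row.
func makePermutation(tableCols int, present map[int]int) []int {
	perm := make([]int, tableCols+1)
	for i := range perm {
		perm[i] = -1
	}
	for colIdx, dataPos := range present {
		perm[colIdx] = dataPos
	}
	return perm
}

func main() {
	// Same shape as the permutation in the new _tidb_rowid test case above:
	// table column 0 comes from data field 0, _tidb_rowid (slot 13) from
	// data field 1, and every other column is absent.
	fmt.Println(makePermutation(13, map[int]int{0: 0, 13: 1}))
	// Output: [0 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 1]
}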
21 changes: 6 additions & 15 deletions lightning/restore/restore.go
@@ -1675,7 +1675,12 @@ func getColumnNames(tableInfo *model.TableInfo, permutation []int) []string {
for _, idx := range colIndexes {
// skip columns with index -1
if idx >= 0 {
names = append(names, tableInfo.Columns[idx].Name.O)
// the original fields contain the _tidb_rowid field
if idx == len(tableInfo.Columns) {
names = append(names, model.ExtraHandleName.O)
} else {
names = append(names, tableInfo.Columns[idx].Name.O)
}
}
}
return names
@@ -1903,7 +1908,6 @@ func (cr *chunkRestore) encodeLoop(

pauser, maxKvPairsCnt := rc.pauser, rc.cfg.TikvImporter.MaxKVPairs
initializedColumns, reachEOF := false, false
columnCnt := len(t.tableInfo.Core.Columns)
for !reachEOF {
if err = pauser.Wait(ctx); err != nil {
return
@@ -1932,13 +1936,6 @@
}
}
initializedColumns = true
// check the private column '_tidb_row_id',
for _, c := range columnNames {
if strings.ToLower(c) == model.ExtraHandleName.L {
columnCnt++
break
}
}
}
case io.EOF:
reachEOF = true
@@ -1950,12 +1947,6 @@
readDur += time.Since(readDurStart)
encodeDurStart := time.Now()
lastRow := cr.parser.LastRow()
if columnCnt < len(lastRow.Row) {
logger.Error("row fields is more than table fields", zap.Int("tableFields", columnCnt),
zap.Int("rowFields", len(lastRow.Row)), zap.Int64("position", newOffset), zap.Array("row", lastRow))
err = errors.Errorf("row field count %d is bigger than table fields count %d", len(lastRow.Row), columnCnt)
return
}
// sql -> kv
kvs, encodeErr := kvEncoder.Encode(logger, lastRow.Row, lastRow.RowID, cr.chunk.ColumnPermutation)
encodeDur += time.Since(encodeDurStart)
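To illustrate the getColumnNames change in isolation, here is a minimal sketch using plain string column names instead of *model.TableInfo; columnNamesFromPermutation is an illustrative name, not from the patch. It reproduces one of the new assertions added in restore_test.go below.

package main

import "fmt"

// columnNamesFromPermutation mirrors the updated getColumnNames logic on plain
// strings: permutation[i] is the data-file position of table column i, with a
// trailing entry for _tidb_rowid; the result lists the delivered columns in
// data-file order.
func columnNamesFromPermutation(columns []string, permutation []int) []string {
	colIndexes := make([]int, len(permutation))
	for i := range colIndexes {
		colIndexes[i] = -1
	}
	for i, v := range permutation {
		if v >= 0 {
			colIndexes[v] = i
		}
	}
	names := make([]string, 0, len(columns)+1)
	for _, idx := range colIndexes {
		if idx < 0 {
			continue // column absent from the data file
		}
		if idx == len(columns) {
			// the data file carries the hidden _tidb_rowid column
			names = append(names, "_tidb_rowid")
			continue
		}
		names = append(names, columns[idx])
	}
	return names
}

func main() {
	cols := []string{"a", "b", "c"}
	// Matches one of the new assertions in restore_test.go:
	fmt.Println(columnNamesFromPermutation(cols, []int{1, 2, 3, 0}))
	// Output: [_tidb_rowid a b c]
}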
17 changes: 12 additions & 5 deletions lightning/restore/restore_test.go
@@ -610,6 +610,11 @@ func (s *tableRestoreSuite) TestGetColumnsNames(c *C) {
c.Assert(getColumnNames(s.tableInfo.Core, []int{0, 1, -1, -1}), DeepEquals, []string{"a", "b"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{1, -1, 0, -1}), DeepEquals, []string{"c", "a"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{-1, 0, -1, -1}), DeepEquals, []string{"b"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{1, 2, 3, 0}), DeepEquals, []string{"_tidb_rowid", "a", "b", "c"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{1, 0, 2, 3}), DeepEquals, []string{"b", "a", "c", "_tidb_rowid"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{-1, 0, 2, 1}), DeepEquals, []string{"b", "_tidb_rowid", "c"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{2, -1, 0, 1}), DeepEquals, []string{"c", "_tidb_rowid", "a"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{-1, 1, -1, 0}), DeepEquals, []string{"_tidb_rowid", "b"})
}

func (s *tableRestoreSuite) TestInitializeColumns(c *C) {
@@ -1031,14 +1036,16 @@ func (s *chunkRestoreSuite) TestEncodeLoopColumnsMismatch(c *C) {

kvsCh := make(chan []deliveredKVs, 2)
deliverCompleteCh := make(chan deliverResult)
kvEncoder, err := kv.NewTableKVEncoder(s.tr.encTable, &kv.SessionOptions{
SQLMode: s.cfg.TiDB.SQLMode,
Timestamp: 1234567895,
})
kvEncoder, err := kv.NewTiDBBackend(nil, config.ReplaceOnDup).NewEncoder(
s.tr.encTable,
&kv.SessionOptions{
SQLMode: s.cfg.TiDB.SQLMode,
Timestamp: 1234567895,
})
c.Assert(err, IsNil)

_, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc)
c.Assert(err, ErrorMatches, "row field count 4 is bigger than table fields count 3")
c.Assert(err, ErrorMatches, "in file db.table.2.sql:0 at offset 8: column count mismatch, expected 3, got 4")
c.Assert(kvsCh, HasLen, 0)
}

88 changes: 45 additions & 43 deletions tests/tidb_rowid/run.sh
@@ -17,50 +17,52 @@

set -eu

run_sql 'DROP DATABASE IF EXISTS rowid;'
run_lightning
echo 'Import finished'
for BACKEND in local importer tidb; do
run_sql 'DROP DATABASE IF EXISTS rowid;'
run_lightning -backend $BACKEND
echo 'Import finished'

run_sql 'SELECT count(*), max(id), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.`non_pk_auto_inc`'
check_contains 'count(*): 22'
check_contains 'max(id): 37'
check_contains 'min(_tidb_rowid): 1'
check_contains 'max(_tidb_rowid): 22'
run_sql 'INSERT INTO rowid.`non_pk_auto_inc` (`pk`) VALUES ("?")'
run_sql 'SELECT id > 37, _tidb_rowid > 22 FROM rowid.`non_pk_auto_inc` WHERE `pk` = "?"'
check_contains 'id > 37: 1'
check_contains '_tidb_rowid > 22: 1'
run_sql 'SELECT count(*), max(id), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.`non_pk_auto_inc`'
check_contains 'count(*): 22'
check_contains 'max(id): 37'
check_contains 'min(_tidb_rowid): 1'
check_contains 'max(_tidb_rowid): 22'
run_sql 'INSERT INTO rowid.`non_pk_auto_inc` (`pk`) VALUES ("?")'
run_sql 'SELECT id > 37, _tidb_rowid > 22 FROM rowid.`non_pk_auto_inc` WHERE `pk` = "?"'
check_contains 'id > 37: 1'
check_contains '_tidb_rowid > 22: 1'

for table_name in non_pk explicit_tidb_rowid; do
run_sql "SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.${table_name}"
check_contains 'count(*): 10'
check_contains 'min(_tidb_rowid): 1'
check_contains 'max(_tidb_rowid): 10'
run_sql "SELECT _tidb_rowid FROM rowid.${table_name} WHERE pk = 'five'"
check_contains '_tidb_rowid: 5'
run_sql "INSERT INTO rowid.${table_name} VALUES ('eleven')"
run_sql "SELECT count(*) FROM rowid.${table_name}"
check_contains 'count(*): 11'
run_sql "SELECT count(*) FROM rowid.${table_name} WHERE pk > '!'"
check_contains 'count(*): 11'
run_sql "SELECT _tidb_rowid > 10 FROM rowid.${table_name} WHERE pk = 'eleven'"
check_contains '_tidb_rowid > 10: 1'
done
for table_name in non_pk explicit_tidb_rowid; do
run_sql "SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.${table_name}"
check_contains 'count(*): 10'
check_contains 'min(_tidb_rowid): 1'
check_contains 'max(_tidb_rowid): 10'
run_sql "SELECT _tidb_rowid FROM rowid.${table_name} WHERE pk = 'five'"
check_contains '_tidb_rowid: 5'
run_sql "INSERT INTO rowid.${table_name} VALUES ('eleven')"
run_sql "SELECT count(*) FROM rowid.${table_name}"
check_contains 'count(*): 11'
run_sql "SELECT count(*) FROM rowid.${table_name} WHERE pk > '!'"
check_contains 'count(*): 11'
run_sql "SELECT _tidb_rowid > 10 FROM rowid.${table_name} WHERE pk = 'eleven'"
check_contains '_tidb_rowid > 10: 1'
done

run_sql 'SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.pre_rebase'
check_contains 'count(*): 1'
check_contains 'min(_tidb_rowid): 1'
check_contains 'max(_tidb_rowid): 1'
run_sql 'INSERT INTO rowid.pre_rebase VALUES ("?")'
run_sql 'SELECT _tidb_rowid > 70000 FROM rowid.pre_rebase WHERE pk = "?"'
check_contains '_tidb_rowid > 70000: 1'
run_sql 'SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.pre_rebase'
check_contains 'count(*): 1'
check_contains 'min(_tidb_rowid): 1'
check_contains 'max(_tidb_rowid): 1'
run_sql 'INSERT INTO rowid.pre_rebase VALUES ("?")'
run_sql 'SELECT _tidb_rowid > 70000 FROM rowid.pre_rebase WHERE pk = "?"'
check_contains '_tidb_rowid > 70000: 1'

run_sql 'SELECT count(*) FROM rowid.specific_auto_inc'
check_contains 'count(*): 5'
run_sql 'INSERT INTO rowid.specific_auto_inc (a) VALUES ("ffffff"), ("gggggg")'
run_sql 'SELECT _tidb_rowid > 80000, b > 80000 FROM rowid.specific_auto_inc WHERE a = "ffffff"'
check_contains '_tidb_rowid > 80000: 1'
check_contains 'b > 80000: 1'
run_sql 'SELECT _tidb_rowid > 80000, b > 80000 FROM rowid.specific_auto_inc WHERE a = "gggggg"'
check_contains '_tidb_rowid > 80000: 1'
check_contains 'b > 80000: 1'
run_sql 'SELECT count(*) FROM rowid.specific_auto_inc'
check_contains 'count(*): 5'
run_sql 'INSERT INTO rowid.specific_auto_inc (a) VALUES ("ffffff"), ("gggggg")'
run_sql 'SELECT _tidb_rowid > 80000, b > 80000 FROM rowid.specific_auto_inc WHERE a = "ffffff"'
check_contains '_tidb_rowid > 80000: 1'
check_contains 'b > 80000: 1'
run_sql 'SELECT _tidb_rowid > 80000, b > 80000 FROM rowid.specific_auto_inc WHERE a = "gggggg"'
check_contains '_tidb_rowid > 80000: 1'
check_contains 'b > 80000: 1'
done
