Skip to content

Commit

Permalink
lightning: support restore data into tables that contains data (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv authored and 3pointer committed Jun 4, 2021
1 parent 2fbf361 commit ac452ad
Show file tree
Hide file tree
Showing 39 changed files with 1,954 additions and 283 deletions.
108 changes: 66 additions & 42 deletions pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,48 +504,23 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
if rows.Err() != nil {
return rows.Err()
}
// for version < v4.0.0 we can use `show table next_row_id` to fetch auto id info, so about should be enough
// shard_row_id/auto random is only available after tidb v4.0.0
// `show table next_row_id` is also not available before tidb v4.0.0
if tidbVersion.Major < 4 {
return nil
}

// init auto id column for each table
for _, tbl := range tables {
tblName := common.UniqueTable(schemaName, tbl.Name.O)
rows, e = tx.Query(fmt.Sprintf("SHOW TABLE %s NEXT_ROW_ID", tblName))
if e != nil {
return e
autoIDInfos, err := FetchTableAutoIDInfos(ctx, tx, tblName)
if err != nil {
return errors.Trace(err)
}
for rows.Next() {
var (
dbName, tblName, columnName, idType string
nextID int64
)
columns, err := rows.Columns()
if err != nil {
return err
}

// +--------------+------------+-------------+--------------------+----------------+
// | DB_NAME | TABLE_NAME | COLUMN_NAME | NEXT_GLOBAL_ROW_ID | ID_TYPE |
// +--------------+------------+-------------+--------------------+----------------+
// | testsysbench | t | _tidb_rowid | 1 | AUTO_INCREMENT |
// +--------------+------------+-------------+--------------------+----------------+

// if columns length is 4, it doesn't contains the last column `ID_TYPE`, and it will always be 'AUTO_INCREMENT'
// for v4.0.0~v4.0.2 show table t next_row_id only returns 4 columns.
if len(columns) == 4 {
err = rows.Scan(&dbName, &tblName, &columnName, &nextID)
idType = "AUTO_INCREMENT"
} else {
err = rows.Scan(&dbName, &tblName, &columnName, &nextID, &idType)
}
if err != nil {
return err
}

for _, info := range autoIDInfos {
for _, col := range tbl.Columns {
if col.Name.O == columnName {
switch idType {
if col.Name.O == info.Column {
switch info.Type {
case "AUTO_INCREMENT":
col.Flag |= mysql.AutoIncrementFlag
case "AUTO_RANDOM":
Expand All @@ -557,14 +532,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
}
}
}
// Defer in for-loop would be costly, anyway, we don't need those rows after this turn of iteration.
//nolint:sqlclosecheck
if err := rows.Close(); err != nil {
return errors.Trace(err)
}
if rows.Err() != nil {
return errors.Trace(rows.Err())
}

}
return nil
})
Expand Down Expand Up @@ -607,3 +575,59 @@ func (w *Writer) Close(ctx context.Context) error {
func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, arg1 uint64, rows kv.Rows) error {
return w.be.WriteRows(ctx, w.engineUUID, tableName, columnNames, arg1, rows)
}

type TableAutoIDInfo struct {
Column string
NextID int64
Type string
}

func FetchTableAutoIDInfos(ctx context.Context, exec common.QueryExecutor, tableName string) ([]*TableAutoIDInfo, error) {
rows, e := exec.QueryContext(ctx, fmt.Sprintf("SHOW TABLE %s NEXT_ROW_ID", tableName))
if e != nil {
return nil, errors.Trace(e)
}
var autoIDInfos []*TableAutoIDInfo
for rows.Next() {
var (
dbName, tblName, columnName, idType string
nextID int64
)
columns, err := rows.Columns()
if err != nil {
return nil, errors.Trace(err)
}

//+--------------+------------+-------------+--------------------+----------------+
//| DB_NAME | TABLE_NAME | COLUMN_NAME | NEXT_GLOBAL_ROW_ID | ID_TYPE |
//+--------------+------------+-------------+--------------------+----------------+
//| testsysbench | t | _tidb_rowid | 1 | AUTO_INCREMENT |
//+--------------+------------+-------------+--------------------+----------------+

// if columns length is 4, it doesn't contains the last column `ID_TYPE`, and it will always be 'AUTO_INCREMENT'
// for v4.0.0~v4.0.2 show table t next_row_id only returns 4 columns.
if len(columns) == 4 {
err = rows.Scan(&dbName, &tblName, &columnName, &nextID)
idType = "AUTO_INCREMENT"
} else {
err = rows.Scan(&dbName, &tblName, &columnName, &nextID, &idType)
}
if err != nil {
return nil, errors.Trace(err)
}
autoIDInfos = append(autoIDInfos, &TableAutoIDInfo{
Column: columnName,
NextID: nextID,
Type: idType,
})
}
// Defer in for-loop would be costly, anyway, we don't need those rows after this turn of iteration.
//nolint:sqlclosecheck
if err := rows.Close(); err != nil {
return nil, errors.Trace(err)
}
if rows.Err() != nil {
return nil, errors.Trace(rows.Err())
}
return autoIDInfos, nil
}
63 changes: 47 additions & 16 deletions pkg/lightning/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"math"
"os"
"sort"
"strings"
"sync"

"github.com/joho/sqltocsv"
Expand Down Expand Up @@ -63,7 +62,7 @@ const (
// the table names to store each kind of checkpoint in the checkpoint database
// remember to increase the version number in case of incompatible change.
CheckpointTableNameTask = "task_v2"
CheckpointTableNameTable = "table_v6"
CheckpointTableNameTable = "table_v7"
CheckpointTableNameEngine = "engine_v5"
CheckpointTableNameChunk = "chunk_v5"

Expand Down Expand Up @@ -99,6 +98,9 @@ const (
table_id bigint NOT NULL DEFAULT 0,
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
kv_bytes bigint unsigned NOT NULL DEFAULT 0,
kv_kvs bigint unsigned NOT NULL DEFAULT 0,
kv_checksum bigint unsigned NOT NULL DEFAULT 0,
INDEX(task_id)
);`
CreateEngineTableTemplate = `
Expand Down Expand Up @@ -154,7 +156,7 @@ const (
FROM %s.%s WHERE table_name = ?
ORDER BY engine_id, path, offset;`
ReadTableRemainTemplate = `
SELECT status, alloc_base, table_id FROM %s.%s WHERE table_name = ?;`
SELECT status, alloc_base, table_id, kv_bytes, kv_kvs, kv_checksum FROM %s.%s WHERE table_name = ?;`
ReplaceEngineTemplate = `
REPLACE INTO %s.%s (table_name, engine_id, status) VALUES (?, ?, ?);`
ReplaceChunkTemplate = `
Expand All @@ -176,7 +178,8 @@ const (
UPDATE %s.%s SET alloc_base = GREATEST(?, alloc_base) WHERE table_name = ?;`
UpdateTableStatusTemplate = `
UPDATE %s.%s SET status = ? WHERE table_name = ?;`
UpdateEngineTemplate = `
UpdateTableChecksumTemplate = `UPDATE %s.%s SET kv_bytes = ?, kv_kvs = ?, kv_checksum = ? WHERE table_name = ?;`
UpdateEngineTemplate = `
UPDATE %s.%s SET status = ? WHERE (table_name, engine_id) = (?, ?);`
DeleteCheckpointRecordTemplate = "DELETE FROM %s.%s WHERE table_name = ?;"
)
Expand Down Expand Up @@ -278,6 +281,8 @@ type TableCheckpoint struct {
AllocBase int64
Engines map[int32]*EngineCheckpoint
TableID int64
// remote checksum before restore
Checksum verify.KVChecksum
}

func (cp *TableCheckpoint) DeepCopy() *TableCheckpoint {
Expand All @@ -290,6 +295,7 @@ func (cp *TableCheckpoint) DeepCopy() *TableCheckpoint {
AllocBase: cp.AllocBase,
Engines: engines,
TableID: cp.TableID,
Checksum: cp.Checksum,
}
}

Expand All @@ -315,11 +321,13 @@ type engineCheckpointDiff struct {
}

type TableCheckpointDiff struct {
hasStatus bool
hasRebase bool
status CheckpointStatus
allocBase int64
engines map[int32]engineCheckpointDiff
hasStatus bool
hasRebase bool
hasChecksum bool
status CheckpointStatus
allocBase int64
engines map[int32]engineCheckpointDiff
checksum verify.KVChecksum
}

func NewTableCheckpointDiff() *TableCheckpointDiff {
Expand Down Expand Up @@ -438,6 +446,15 @@ func (merger *ChunkCheckpointMerger) MergeInto(cpd *TableCheckpointDiff) {
})
}

type TableChecksumMerger struct {
Checksum verify.KVChecksum
}

func (m *TableChecksumMerger) MergeInto(cpd *TableCheckpointDiff) {
cpd.hasChecksum = true
cpd.checksum = m.Checksum
}

type RebaseCheckpointMerger struct {
AllocBase int64
}
Expand Down Expand Up @@ -591,10 +608,7 @@ type MySQLCheckpointsDB struct {
}

func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) (*MySQLCheckpointsDB, error) {
var escapedSchemaName strings.Builder
common.WriteMySQLIdentifier(&escapedSchemaName, schemaName)
schema := escapedSchemaName.String()

schema := common.EscapeIdentifier(schemaName)
sql := common.SQLWithRetry{
DB: db,
Logger: log.With(zap.String("schema", schemaName)),
Expand Down Expand Up @@ -780,12 +794,13 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab
tableRow := tx.QueryRowContext(c, tableQuery, tableName)

var status uint8
if err := tableRow.Scan(&status, &cp.AllocBase, &cp.TableID); err != nil {
var kvs, bytes, checksum uint64
if err := tableRow.Scan(&status, &cp.AllocBase, &cp.TableID, &bytes, &kvs, &checksum); err != nil {
if err == sql.ErrNoRows {
return errors.NotFoundf("checkpoint for table %s", tableName)
}
return errors.Trace(err)
}
cp.Checksum = verify.MakeKVChecksum(bytes, kvs, checksum)
cp.Status = CheckpointStatus(status)
return nil
})
Expand Down Expand Up @@ -849,6 +864,7 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi
chunkQuery := fmt.Sprintf(UpdateChunkTemplate, cpdb.schema, CheckpointTableNameChunk)
rebaseQuery := fmt.Sprintf(UpdateTableRebaseTemplate, cpdb.schema, CheckpointTableNameTable)
tableStatusQuery := fmt.Sprintf(UpdateTableStatusTemplate, cpdb.schema, CheckpointTableNameTable)
tableChecksumQuery := fmt.Sprintf(UpdateTableChecksumTemplate, cpdb.schema, CheckpointTableNameTable)
engineStatusQuery := fmt.Sprintf(UpdateEngineTemplate, cpdb.schema, CheckpointTableNameEngine)

s := common.SQLWithRetry{DB: cpdb.db, Logger: log.L()}
Expand All @@ -868,12 +884,16 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi
return errors.Trace(e)
}
defer tableStatusStmt.Close()
tableChecksumStmt, e := tx.PrepareContext(c, tableChecksumQuery)
if e != nil {
return errors.Trace(e)
}
defer tableChecksumStmt.Close()
engineStatusStmt, e := tx.PrepareContext(c, engineStatusQuery)
if e != nil {
return errors.Trace(e)
}
defer engineStatusStmt.Close()

for tableName, cpd := range checkpointDiffs {
if cpd.hasStatus {
if _, e := tableStatusStmt.ExecContext(c, cpd.status, tableName); e != nil {
Expand All @@ -885,6 +905,11 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi
return errors.Trace(e)
}
}
if cpd.hasChecksum {
if _, e := tableChecksumStmt.ExecContext(c, cpd.checksum.SumSize(), cpd.checksum.SumKVS(), cpd.checksum.Sum(), tableName); e != nil {
return errors.Trace(e)
}
}
for engineID, engineDiff := range cpd.engines {
if engineDiff.hasStatus {
if _, e := engineStatusStmt.ExecContext(c, engineDiff.status, tableName, engineID); e != nil {
Expand Down Expand Up @@ -1054,6 +1079,7 @@ func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableC
AllocBase: tableModel.AllocBase,
Engines: make(map[int32]*EngineCheckpoint, len(tableModel.Engines)),
TableID: tableModel.TableID,
Checksum: verify.MakeKVChecksum(tableModel.KvBytes, tableModel.KvKvs, tableModel.KvChecksum),
}

for engineID, engineModel := range tableModel.Engines {
Expand Down Expand Up @@ -1152,6 +1178,11 @@ func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoin
if cpd.hasRebase {
tableModel.AllocBase = cpd.allocBase
}
if cpd.hasChecksum {
tableModel.KvBytes = cpd.checksum.SumSize()
tableModel.KvKvs = cpd.checksum.SumKVS()
tableModel.KvChecksum = cpd.checksum.Sum()
}
for engineID, engineDiff := range cpd.engines {
engineModel := tableModel.Engines[engineID]
if engineDiff.hasStatus {
Expand Down
5 changes: 5 additions & 0 deletions pkg/lightning/checkpoints/checkpoints_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func (s *cpFileSuite) SetUpTest(c *C) {
AllocBase: 132861,
}
rcm.MergeInto(cpd)
cksum := checkpoints.TableChecksumMerger{
Checksum: verification.MakeKVChecksum(4492, 686, 486070148910),
}
cksum.MergeInto(cpd)
ccm := checkpoints.ChunkCheckpointMerger{
EngineID: 0,
Key: checkpoints.ChunkCheckpointKey{Path: "/tmp/path/1.sql", Offset: 0},
Expand Down Expand Up @@ -158,6 +162,7 @@ func (s *cpFileSuite) TestGet(c *C) {
c.Assert(cp, DeepEquals, &checkpoints.TableCheckpoint{
Status: checkpoints.CheckpointStatusAllWritten,
AllocBase: 132861,
Checksum: verification.MakeKVChecksum(4492, 686, 486070148910),
Engines: map[int32]*checkpoints.EngineCheckpoint{
-1: {
Status: checkpoints.CheckpointStatusLoaded,
Expand Down
15 changes: 13 additions & 2 deletions pkg/lightning/checkpoints/checkpoints_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ func (s *cpSQLSuite) TestNormalOperations(c *C) {
AllocBase: 132861,
}
rcm.MergeInto(cpd)
cksum := checkpoints.TableChecksumMerger{
Checksum: verification.MakeKVChecksum(4492, 686, 486070148910),
}
cksum.MergeInto(cpd)
ccm := checkpoints.ChunkCheckpointMerger{
EngineID: 0,
Key: checkpoints.ChunkCheckpointKey{Path: "/tmp/path/1.sql", Offset: 0},
Expand Down Expand Up @@ -208,6 +212,12 @@ func (s *cpSQLSuite) TestNormalOperations(c *C) {
ExpectExec().
WithArgs(60, "`db1`.`t2`").
WillReturnResult(sqlmock.NewResult(14, 1))
s.mock.
ExpectPrepare("UPDATE `mock-schema`\\.table_v\\d+ SET kv_bytes = .+").
ExpectExec().
WithArgs(4492, 686, 486070148910, "`db1`.`t2`").
WillReturnResult(sqlmock.NewResult(15, 1))

s.mock.ExpectCommit()

s.mock.MatchExpectationsInOrder(false)
Expand Down Expand Up @@ -245,8 +255,8 @@ func (s *cpSQLSuite) TestNormalOperations(c *C) {
ExpectQuery("SELECT .+ FROM `mock-schema`\\.table_v\\d+").
WithArgs("`db1`.`t2`").
WillReturnRows(
sqlmock.NewRows([]string{"status", "alloc_base", "table_id"}).
AddRow(60, 132861, int64(2)),
sqlmock.NewRows([]string{"status", "alloc_base", "table_id", "kv_bytes", "kv_kvs", "kv_checksum"}).
AddRow(60, 132861, int64(2), uint64(4492), uint64(686), uint64(486070148910)),
)
s.mock.ExpectCommit()

Expand Down Expand Up @@ -282,6 +292,7 @@ func (s *cpSQLSuite) TestNormalOperations(c *C) {
}},
},
},
Checksum: verification.MakeKVChecksum(4492, 686, 486070148910),
})
c.Assert(s.mock.ExpectationsWereMet(), IsNil)
}
Expand Down
Loading

0 comments on commit ac452ad

Please sign in to comment.