This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

lightning: support restore data into tables that contain data #784

Merged
merged 46 commits on Jun 4, 2021
Commits (46)
cc0337e
support restore data into tables that contain data
glorv Mar 2, 2021
5fdbb50
add integration test
glorv Mar 3, 2021
df174b6
allow running multi lightning in parallel
glorv Mar 9, 2021
224906c
reuse task id when recover from checkpoint
glorv Mar 9, 2021
d459855
Merge branch 'master' of https://github.com/pingcap/br into increment…
glorv Mar 10, 2021
bcba066
fix lint
glorv Mar 10, 2021
03399d7
fix meta table field name
glorv Mar 10, 2021
6d89f9d
fix integration tests
glorv Mar 10, 2021
bdcbf3b
fix rowid check for auto-random
glorv Mar 10, 2021
b0ba237
Merge branch 'master' into incremental-restore
glorv Mar 12, 2021
d7d0db1
fix version
glorv Mar 12, 2021
a55d286
fix auto random
glorv Mar 15, 2021
e8dceff
Merge branch 'master' of https://github.com/pingcap/br into increment…
glorv Mar 15, 2021
0c4d45f
fix allocate row_id
glorv Mar 16, 2021
4c977d1
fix
glorv Mar 16, 2021
f818bff
fix close rows
glorv Mar 16, 2021
7d64011
add unit test and fix bug
glorv Mar 17, 2021
b7ed935
Merge branch 'master' into incremental-restore
glorv Mar 17, 2021
e2b0a91
fix integration test
glorv Mar 17, 2021
df4a3fa
Merge branch 'incremental-restore' of https://github.com/glorv/br int…
glorv Mar 17, 2021
232dc1e
Merge branch 'master' of https://github.com/pingcap/br into increment…
glorv Mar 17, 2021
e0b5d04
fix import name change
glorv Mar 18, 2021
ce55e35
Merge branch 'master' into incremental-restore
glorv Mar 18, 2021
458b299
fix auto_random primary key
glorv Mar 18, 2021
de8f71a
Merge branch 'master' into incremental-restore
glorv Mar 25, 2021
9edd635
Merge branch 'master' of ssh://github.com/pingcap/br into incremental…
glorv Apr 15, 2021
ae6d57f
Merge branch 'incremental-restore' of ssh://github.com/glorv/br into …
glorv Apr 15, 2021
dae79a2
add taskmeta to sync schedulers and switch-mode between multi lightning
glorv Apr 20, 2021
62f10db
use custom db to store lightning metas
glorv Apr 20, 2021
ddc8151
Merge branch 'master' of ssh://github.com/pingcap/br into incremental…
glorv Apr 30, 2021
1991589
Merge branch 'master' of ssh://github.com/pingcap/br into incremental…
glorv May 24, 2021
3beafa5
fix some bug in task and table meta
glorv May 24, 2021
06c8a38
fix tests
glorv May 24, 2021
e25fa78
fix build
glorv May 24, 2021
6a9ee4f
add log
glorv May 25, 2021
3df592b
fix meta mgr for tidb backend
glorv May 25, 2021
953aa9c
remove table empty check
glorv May 25, 2021
8da7fd4
fix schedulers
glorv May 25, 2021
dcf1b7d
add importer backend
glorv May 26, 2021
0c42432
Merge branch 'master' of ssh://github.com/pingcap/br into incremental…
glorv Jun 3, 2021
965c7eb
return error if target table is in checksum phase
glorv Jun 3, 2021
059175a
resolve comments
glorv Jun 3, 2021
ef782e1
remove useless code
glorv Jun 3, 2021
a895b15
fmt code
glorv Jun 3, 2021
6a3be0b
Merge branch 'master' of ssh://github.com/pingcap/br into incremental…
glorv Jun 4, 2021
a352c3a
add copyright header
glorv Jun 4, 2021
108 changes: 66 additions & 42 deletions pkg/lightning/backend/tidb/tidb.go
@@ -504,48 +504,23 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
if rows.Err() != nil {
return rows.Err()
}
// for version < v4.0.0 we can't use `show table next_row_id` to fetch auto id info, so the above should be enough
// shard_row_id/auto random is only available after tidb v4.0.0
// `show table next_row_id` is also not available before tidb v4.0.0
if tidbVersion.Major < 4 {
return nil
}

// init auto id column for each table
for _, tbl := range tables {
tblName := common.UniqueTable(schemaName, tbl.Name.O)
rows, e = tx.Query(fmt.Sprintf("SHOW TABLE %s NEXT_ROW_ID", tblName))
if e != nil {
return e
autoIDInfos, err := FetchTableAutoIDInfos(ctx, tx, tblName)
if err != nil {
return errors.Trace(err)
}
for rows.Next() {
var (
dbName, tblName, columnName, idType string
nextID int64
)
columns, err := rows.Columns()
if err != nil {
return err
}

// +--------------+------------+-------------+--------------------+----------------+
// | DB_NAME | TABLE_NAME | COLUMN_NAME | NEXT_GLOBAL_ROW_ID | ID_TYPE |
// +--------------+------------+-------------+--------------------+----------------+
// | testsysbench | t | _tidb_rowid | 1 | AUTO_INCREMENT |
// +--------------+------------+-------------+--------------------+----------------+

// if the columns length is 4, the result doesn't contain the last column `ID_TYPE`, and the id type will always be 'AUTO_INCREMENT';
// for v4.0.0~v4.0.2, `show table t next_row_id` only returns 4 columns.
if len(columns) == 4 {
err = rows.Scan(&dbName, &tblName, &columnName, &nextID)
idType = "AUTO_INCREMENT"
} else {
err = rows.Scan(&dbName, &tblName, &columnName, &nextID, &idType)
}
if err != nil {
return err
}

for _, info := range autoIDInfos {
for _, col := range tbl.Columns {
if col.Name.O == columnName {
switch idType {
if col.Name.O == info.Column {
switch info.Type {
case "AUTO_INCREMENT":
col.Flag |= mysql.AutoIncrementFlag
case "AUTO_RANDOM":
@@ -557,14 +532,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
}
}
}
// Defer in for-loop would be costly, anyway, we don't need those rows after this turn of iteration.
//nolint:sqlclosecheck
if err := rows.Close(); err != nil {
return errors.Trace(err)
}
if rows.Err() != nil {
return errors.Trace(rows.Err())
}

}
return nil
})
@@ -607,3 +575,59 @@ func (w *Writer) Close(ctx context.Context) error {
func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, arg1 uint64, rows kv.Rows) error {
return w.be.WriteRows(ctx, w.engineUUID, tableName, columnNames, arg1, rows)
}

type TableAutoIDInfo struct {
Column string
NextID int64
Type string
}

func FetchTableAutoIDInfos(ctx context.Context, exec common.QueryExecutor, tableName string) ([]*TableAutoIDInfo, error) {
rows, e := exec.QueryContext(ctx, fmt.Sprintf("SHOW TABLE %s NEXT_ROW_ID", tableName))
if e != nil {
return nil, errors.Trace(e)
}
var autoIDInfos []*TableAutoIDInfo
for rows.Next() {
var (
dbName, tblName, columnName, idType string
nextID int64
)
columns, err := rows.Columns()
if err != nil {
return nil, errors.Trace(err)
}

//+--------------+------------+-------------+--------------------+----------------+
//| DB_NAME | TABLE_NAME | COLUMN_NAME | NEXT_GLOBAL_ROW_ID | ID_TYPE |
//+--------------+------------+-------------+--------------------+----------------+
//| testsysbench | t | _tidb_rowid | 1 | AUTO_INCREMENT |
//+--------------+------------+-------------+--------------------+----------------+

// if the columns length is 4, the result doesn't contain the last column `ID_TYPE`, and the id type will always be 'AUTO_INCREMENT';
// for v4.0.0~v4.0.2, `show table t next_row_id` only returns 4 columns.
if len(columns) == 4 {
err = rows.Scan(&dbName, &tblName, &columnName, &nextID)
idType = "AUTO_INCREMENT"
} else {
err = rows.Scan(&dbName, &tblName, &columnName, &nextID, &idType)
}
if err != nil {
return nil, errors.Trace(err)
}
autoIDInfos = append(autoIDInfos, &TableAutoIDInfo{
Column: columnName,
NextID: nextID,
Type: idType,
})
}
// Close the rows explicitly rather than deferring; we don't need them once iteration is done.
//nolint:sqlclosecheck
if err := rows.Close(); err != nil {
return nil, errors.Trace(err)
}
if rows.Err() != nil {
return nil, errors.Trace(rows.Err())
}
return autoIDInfos, nil
}
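For orientation, here is a minimal sketch (not part of this PR) of driving the extracted helper directly. It assumes `FetchTableAutoIDInfos` is in scope and that `common.QueryExecutor` only requires `QueryContext`, which both `*sql.DB` and `*sql.Tx` provide; the DSN is hypothetical.

```go
// Sketch only: call FetchTableAutoIDInfos against a live TiDB instance.
// It issues `SHOW TABLE <name> NEXT_ROW_ID` and copes with both the
// 4-column (v4.0.0~v4.0.2) and 5-column result shapes.
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test") // hypothetical DSN
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	infos, err := FetchTableAutoIDInfos(context.Background(), db, "`test`.`t`")
	if err != nil {
		log.Fatal(err)
	}
	for _, info := range infos {
		// e.g. column=_tidb_rowid next_id=1 type=AUTO_INCREMENT
		fmt.Printf("column=%s next_id=%d type=%s\n", info.Column, info.NextID, info.Type)
	}
}
```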
63 changes: 47 additions & 16 deletions pkg/lightning/checkpoints/checkpoints.go
@@ -23,7 +23,6 @@ import (
"math"
"os"
"sort"
"strings"
"sync"

"github.com/joho/sqltocsv"
@@ -63,7 +62,7 @@ const (
// the table names to store each kind of checkpoint in the checkpoint database
// remember to increase the version number in case of incompatible change.
CheckpointTableNameTask = "task_v2"
CheckpointTableNameTable = "table_v6"
CheckpointTableNameTable = "table_v7"
CheckpointTableNameEngine = "engine_v5"
CheckpointTableNameChunk = "chunk_v5"

@@ -99,6 +98,9 @@ const (
table_id bigint NOT NULL DEFAULT 0,
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
kv_bytes bigint unsigned NOT NULL DEFAULT 0,
kv_kvs bigint unsigned NOT NULL DEFAULT 0,
kv_checksum bigint unsigned NOT NULL DEFAULT 0,
INDEX(task_id)
);`
CreateEngineTableTemplate = `
@@ -154,7 +156,7 @@ const (
FROM %s.%s WHERE table_name = ?
ORDER BY engine_id, path, offset;`
ReadTableRemainTemplate = `
SELECT status, alloc_base, table_id FROM %s.%s WHERE table_name = ?;`
SELECT status, alloc_base, table_id, kv_bytes, kv_kvs, kv_checksum FROM %s.%s WHERE table_name = ?;`
ReplaceEngineTemplate = `
REPLACE INTO %s.%s (table_name, engine_id, status) VALUES (?, ?, ?);`
ReplaceChunkTemplate = `
@@ -176,7 +178,8 @@ const (
UPDATE %s.%s SET alloc_base = GREATEST(?, alloc_base) WHERE table_name = ?;`
UpdateTableStatusTemplate = `
UPDATE %s.%s SET status = ? WHERE table_name = ?;`
UpdateEngineTemplate = `
UpdateTableChecksumTemplate = `UPDATE %s.%s SET kv_bytes = ?, kv_kvs = ?, kv_checksum = ? WHERE table_name = ?;`
UpdateEngineTemplate = `
UPDATE %s.%s SET status = ? WHERE (table_name, engine_id) = (?, ?);`
DeleteCheckpointRecordTemplate = "DELETE FROM %s.%s WHERE table_name = ?;"
)
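These templates are expanded with `fmt.Sprintf` before being prepared. A tiny self-contained sketch of what the new checksum statement looks like once expanded (the `checkpoints` schema name below is a placeholder of mine, not taken from this diff):

```go
// Sketch only: expanding UpdateTableChecksumTemplate the way Update() does.
package main

import "fmt"

const (
	CheckpointTableNameTable    = "table_v7"
	UpdateTableChecksumTemplate = `UPDATE %s.%s SET kv_bytes = ?, kv_kvs = ?, kv_checksum = ? WHERE table_name = ?;`
)

func main() {
	query := fmt.Sprintf(UpdateTableChecksumTemplate, "`checkpoints`", CheckpointTableNameTable)
	fmt.Println(query)
	// UPDATE `checkpoints`.table_v7 SET kv_bytes = ?, kv_kvs = ?, kv_checksum = ? WHERE table_name = ?;
}
```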
@@ -278,6 +281,8 @@ type TableCheckpoint struct {
AllocBase int64
Engines map[int32]*EngineCheckpoint
TableID int64
// remote checksum before restore
Checksum verify.KVChecksum
}

func (cp *TableCheckpoint) DeepCopy() *TableCheckpoint {
@@ -290,6 +295,7 @@ func (cp *TableCheckpoint) DeepCopy() *TableCheckpoint {
AllocBase: cp.AllocBase,
Engines: engines,
TableID: cp.TableID,
Checksum: cp.Checksum,
}
}

@@ -315,11 +321,13 @@ type engineCheckpointDiff struct {
}

type TableCheckpointDiff struct {
hasStatus bool
hasRebase bool
status CheckpointStatus
allocBase int64
engines map[int32]engineCheckpointDiff
hasStatus bool
hasRebase bool
hasChecksum bool
status CheckpointStatus
allocBase int64
engines map[int32]engineCheckpointDiff
checksum verify.KVChecksum
}

func NewTableCheckpointDiff() *TableCheckpointDiff {
@@ -438,6 +446,15 @@ func (merger *ChunkCheckpointMerger) MergeInto(cpd *TableCheckpointDiff) {
})
}

type TableChecksumMerger struct {
Checksum verify.KVChecksum
}

func (m *TableChecksumMerger) MergeInto(cpd *TableCheckpointDiff) {
cpd.hasChecksum = true
cpd.checksum = m.Checksum
}

type RebaseCheckpointMerger struct {
AllocBase int64
}
@@ -591,10 +608,7 @@ type MySQLCheckpointsDB struct {
}

func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) (*MySQLCheckpointsDB, error) {
var escapedSchemaName strings.Builder
common.WriteMySQLIdentifier(&escapedSchemaName, schemaName)
schema := escapedSchemaName.String()

schema := common.EscapeIdentifier(schemaName)
sql := common.SQLWithRetry{
DB: db,
Logger: log.With(zap.String("schema", schemaName)),
@@ -780,12 +794,13 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab
tableRow := tx.QueryRowContext(c, tableQuery, tableName)

var status uint8
if err := tableRow.Scan(&status, &cp.AllocBase, &cp.TableID); err != nil {
var kvs, bytes, checksum uint64
if err := tableRow.Scan(&status, &cp.AllocBase, &cp.TableID, &bytes, &kvs, &checksum); err != nil {
if err == sql.ErrNoRows {
return errors.NotFoundf("checkpoint for table %s", tableName)
}
return errors.Trace(err)
}
cp.Checksum = verify.MakeKVChecksum(bytes, kvs, checksum)
cp.Status = CheckpointStatus(status)
return nil
})
@@ -849,6 +864,7 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi
chunkQuery := fmt.Sprintf(UpdateChunkTemplate, cpdb.schema, CheckpointTableNameChunk)
rebaseQuery := fmt.Sprintf(UpdateTableRebaseTemplate, cpdb.schema, CheckpointTableNameTable)
tableStatusQuery := fmt.Sprintf(UpdateTableStatusTemplate, cpdb.schema, CheckpointTableNameTable)
tableChecksumQuery := fmt.Sprintf(UpdateTableChecksumTemplate, cpdb.schema, CheckpointTableNameTable)
engineStatusQuery := fmt.Sprintf(UpdateEngineTemplate, cpdb.schema, CheckpointTableNameEngine)

s := common.SQLWithRetry{DB: cpdb.db, Logger: log.L()}
@@ -868,12 +884,16 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi
return errors.Trace(e)
}
defer tableStatusStmt.Close()
tableChecksumStmt, e := tx.PrepareContext(c, tableChecksumQuery)
if e != nil {
return errors.Trace(e)
}
defer tableChecksumStmt.Close()
engineStatusStmt, e := tx.PrepareContext(c, engineStatusQuery)
if e != nil {
return errors.Trace(e)
}
defer engineStatusStmt.Close()

for tableName, cpd := range checkpointDiffs {
if cpd.hasStatus {
if _, e := tableStatusStmt.ExecContext(c, cpd.status, tableName); e != nil {
@@ -885,6 +905,11 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi
return errors.Trace(e)
}
}
if cpd.hasChecksum {
if _, e := tableChecksumStmt.ExecContext(c, cpd.checksum.SumSize(), cpd.checksum.SumKVS(), cpd.checksum.Sum(), tableName); e != nil {
return errors.Trace(e)
}
}
for engineID, engineDiff := range cpd.engines {
if engineDiff.hasStatus {
if _, e := engineStatusStmt.ExecContext(c, engineDiff.status, tableName, engineID); e != nil {
@@ -1054,6 +1079,7 @@ func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableC
AllocBase: tableModel.AllocBase,
Engines: make(map[int32]*EngineCheckpoint, len(tableModel.Engines)),
TableID: tableModel.TableID,
Checksum: verify.MakeKVChecksum(tableModel.KvBytes, tableModel.KvKvs, tableModel.KvChecksum),
}

for engineID, engineModel := range tableModel.Engines {
@@ -1152,6 +1178,11 @@ func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoin
if cpd.hasRebase {
tableModel.AllocBase = cpd.allocBase
}
if cpd.hasChecksum {
tableModel.KvBytes = cpd.checksum.SumSize()
tableModel.KvKvs = cpd.checksum.SumKVS()
tableModel.KvChecksum = cpd.checksum.Sum()
}
for engineID, engineDiff := range cpd.engines {
engineModel := tableModel.Engines[engineID]
if engineDiff.hasStatus {
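To make the new checksum plumbing easier to follow, here is a standalone sketch of the merge flow with simplified stand-ins for the real types (`verify.KVChecksum` is reduced to three fields; only the checksum part of `TableCheckpointDiff` is modeled):

```go
// Standalone sketch of the checksum merge flow added in this PR.
package main

import "fmt"

// KVChecksum is a cut-down stand-in for verify.KVChecksum.
type KVChecksum struct{ bytes, kvs, sum uint64 }

func MakeKVChecksum(bytes, kvs, sum uint64) KVChecksum { return KVChecksum{bytes, kvs, sum} }
func (c KVChecksum) SumSize() uint64                   { return c.bytes }
func (c KVChecksum) SumKVS() uint64                    { return c.kvs }
func (c KVChecksum) Sum() uint64                       { return c.sum }

// TableCheckpointDiff models only the checksum part of the real diff type.
type TableCheckpointDiff struct {
	hasChecksum bool
	checksum    KVChecksum
}

// TableChecksumMerger mirrors the merger added in checkpoints.go: it stamps
// the pre-restore remote checksum onto the pending checkpoint diff.
type TableChecksumMerger struct{ Checksum KVChecksum }

func (m *TableChecksumMerger) MergeInto(cpd *TableCheckpointDiff) {
	cpd.hasChecksum = true
	cpd.checksum = m.Checksum
}

func main() {
	cpd := &TableCheckpointDiff{}
	m := TableChecksumMerger{Checksum: MakeKVChecksum(4492, 686, 486070148910)}
	m.MergeInto(cpd)
	if cpd.hasChecksum {
		// The same three values Update() persists into kv_bytes/kv_kvs/kv_checksum.
		fmt.Println(cpd.checksum.SumSize(), cpd.checksum.SumKVS(), cpd.checksum.Sum())
	}
}
```

Because `hasChecksum` defaults to false, tables whose diffs never receive a checksum merge keep their stored kv_* columns untouched.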
5 changes: 5 additions & 0 deletions pkg/lightning/checkpoints/checkpoints_file_test.go
@@ -117,6 +117,10 @@ func (s *cpFileSuite) SetUpTest(c *C) {
AllocBase: 132861,
}
rcm.MergeInto(cpd)
cksum := checkpoints.TableChecksumMerger{
Checksum: verification.MakeKVChecksum(4492, 686, 486070148910),
}
cksum.MergeInto(cpd)
ccm := checkpoints.ChunkCheckpointMerger{
EngineID: 0,
Key: checkpoints.ChunkCheckpointKey{Path: "/tmp/path/1.sql", Offset: 0},
@@ -158,6 +162,7 @@ func (s *cpFileSuite) TestGet(c *C) {
c.Assert(cp, DeepEquals, &checkpoints.TableCheckpoint{
Status: checkpoints.CheckpointStatusAllWritten,
AllocBase: 132861,
Checksum: verification.MakeKVChecksum(4492, 686, 486070148910),
Engines: map[int32]*checkpoints.EngineCheckpoint{
-1: {
Status: checkpoints.CheckpointStatusLoaded,
15 changes: 13 additions & 2 deletions pkg/lightning/checkpoints/checkpoints_sql_test.go
@@ -175,6 +175,10 @@ func (s *cpSQLSuite) TestNormalOperations(c *C) {
AllocBase: 132861,
}
rcm.MergeInto(cpd)
cksum := checkpoints.TableChecksumMerger{
Checksum: verification.MakeKVChecksum(4492, 686, 486070148910),
}
cksum.MergeInto(cpd)
ccm := checkpoints.ChunkCheckpointMerger{
EngineID: 0,
Key: checkpoints.ChunkCheckpointKey{Path: "/tmp/path/1.sql", Offset: 0},
@@ -208,6 +212,12 @@ func (s *cpSQLSuite) TestNormalOperations(c *C) {
ExpectExec().
WithArgs(60, "`db1`.`t2`").
WillReturnResult(sqlmock.NewResult(14, 1))
s.mock.
ExpectPrepare("UPDATE `mock-schema`\\.table_v\\d+ SET kv_bytes = .+").
ExpectExec().
WithArgs(4492, 686, 486070148910, "`db1`.`t2`").
WillReturnResult(sqlmock.NewResult(15, 1))

s.mock.ExpectCommit()

s.mock.MatchExpectationsInOrder(false)
@@ -245,8 +255,8 @@ func (s *cpSQLSuite) TestNormalOperations(c *C) {
ExpectQuery("SELECT .+ FROM `mock-schema`\\.table_v\\d+").
WithArgs("`db1`.`t2`").
WillReturnRows(
sqlmock.NewRows([]string{"status", "alloc_base", "table_id"}).
AddRow(60, 132861, int64(2)),
sqlmock.NewRows([]string{"status", "alloc_base", "table_id", "kv_bytes", "kv_kvs", "kv_checksum"}).
AddRow(60, 132861, int64(2), uint64(4492), uint64(686), uint64(486070148910)),
)
s.mock.ExpectCommit()

@@ -282,6 +292,7 @@ func (s *cpSQLSuite) TestNormalOperations(c *C) {
}},
},
},
Checksum: verification.MakeKVChecksum(4492, 686, 486070148910),
})
c.Assert(s.mock.ExpectationsWereMet(), IsNil)
}