Skip to content

Commit

Permalink
lightning: use SprintfWithIdentifier to process strings with identifi…
Browse files Browse the repository at this point in the history
…er (#50434)

ref #30699
  • Loading branch information
lance6716 authored Jan 17, 2024
1 parent 245951f commit 5fee756
Show file tree
Hide file tree
Showing 15 changed files with 264 additions and 1,030 deletions.
5 changes: 0 additions & 5 deletions br/pkg/lightning/checkpoints/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go_library(
name = "checkpoints",
srcs = [
"checkpoints.go",
"glue_checkpoint.go",
"tidb.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/lightning/checkpoints",
Expand All @@ -18,11 +17,7 @@ go_library(
"//br/pkg/lightning/verification",
"//br/pkg/storage",
"//br/pkg/version/build",
"//pkg/parser/ast",
"//pkg/parser/model",
"//pkg/types",
"//pkg/util/chunk",
"//pkg/util/sqlexec",
"@com_github_joho_sqltocsv//:sqltocsv",
"@com_github_pingcap_errors//:errors",
"@org_uber_go_zap//:zap",
Expand Down
181 changes: 96 additions & 85 deletions br/pkg/lightning/checkpoints/checkpoints.go

Large diffs are not rendered by default.

92 changes: 46 additions & 46 deletions br/pkg/lightning/checkpoints/checkpoints_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ func newCPSQLSuite(t *testing.T) *cpSQLSuite {
ExpectExec("CREATE DATABASE IF NOT EXISTS `mock-schema`").
WillReturnResult(sqlmock.NewResult(1, 1))
s.mock.
ExpectExec("CREATE TABLE IF NOT EXISTS `mock-schema`\\.task_v\\d+ .+").
ExpectExec("CREATE TABLE IF NOT EXISTS `mock-schema`\\.`task_v\\d+` .+").
WillReturnResult(sqlmock.NewResult(2, 1))
s.mock.
ExpectExec("CREATE TABLE IF NOT EXISTS `mock-schema`\\.table_v\\d+ .+").
ExpectExec("CREATE TABLE IF NOT EXISTS `mock-schema`\\.`table_v\\d+` .+").
WillReturnResult(sqlmock.NewResult(3, 1))
s.mock.
ExpectExec("CREATE TABLE IF NOT EXISTS `mock-schema`\\.engine_v\\d+ .+").
ExpectExec("CREATE TABLE IF NOT EXISTS `mock-schema`\\.`engine_v\\d+` .+").
WillReturnResult(sqlmock.NewResult(4, 1))
s.mock.
ExpectExec("CREATE TABLE IF NOT EXISTS `mock-schema`\\.chunk_v\\d+ .+").
ExpectExec("CREATE TABLE IF NOT EXISTS `mock-schema`\\.`chunk_v\\d+` .+").
WillReturnResult(sqlmock.NewResult(5, 1))

cpdb, err := checkpoints.NewMySQLCheckpointsDB(context.Background(), s.db, "mock-schema")
Expand Down Expand Up @@ -82,12 +82,12 @@ func TestNormalOperations(t *testing.T) {

s.mock.ExpectBegin()
initializeStmt := s.mock.ExpectPrepare(
"REPLACE INTO `mock-schema`\\.task_v\\d+")
"REPLACE INTO `mock-schema`\\.`task_v\\d+`")
initializeStmt.ExpectExec().
WithArgs(123, "/data", "local", "127.0.0.1:8287", "127.0.0.1", 4000, "127.0.0.1:2379", "/tmp/sorted-kv", build.ReleaseVersion).
WillReturnResult(sqlmock.NewResult(6, 1))
initializeStmt = s.mock.
ExpectPrepare("INSERT INTO `mock-schema`\\.table_v\\d+")
ExpectPrepare("INSERT INTO `mock-schema`\\.`table_v\\d+`")
initializeStmt.ExpectExec().
WithArgs(123, "`db1`.`t1`", sqlmock.AnyArg(), int64(1), t1Info).
WillReturnResult(sqlmock.NewResult(7, 1))
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestNormalOperations(t *testing.T) {

s.mock.ExpectBegin()
insertEngineStmt := s.mock.
ExpectPrepare("REPLACE INTO `mock-schema`\\.engine_v\\d+ .+")
ExpectPrepare("REPLACE INTO `mock-schema`\\.`engine_v\\d+` .+")
insertEngineStmt.
ExpectExec().
WithArgs("`db1`.`t2`", 0, 30).
Expand All @@ -152,7 +152,7 @@ func TestNormalOperations(t *testing.T) {
WithArgs("`db1`.`t2`", -1, 30).
WillReturnResult(sqlmock.NewResult(9, 1))
insertChunkStmt := s.mock.
ExpectPrepare("REPLACE INTO `mock-schema`\\.chunk_v\\d+ .+")
ExpectPrepare("REPLACE INTO `mock-schema`\\.`chunk_v\\d+` .+")
insertChunkStmt.
ExpectExec().
WithArgs("`db1`.`t2`", 0, "/tmp/path/1.sql", 0, mydump.SourceTypeSQL, 0, "", 123, []byte("null"), 12, 102400, 1, 5000, 1234567890).
Expand Down Expand Up @@ -223,30 +223,30 @@ func TestNormalOperations(t *testing.T) {

s.mock.ExpectBegin()
s.mock.
ExpectPrepare("UPDATE `mock-schema`\\.chunk_v\\d+ SET pos = .+").
ExpectPrepare("UPDATE `mock-schema`\\.`chunk_v\\d+` SET pos = .+").
ExpectExec().
WithArgs(
55904, 681, 4491, 586, 486070148917, []byte("null"),
"`db1`.`t2`", 0, "/tmp/path/1.sql", 0,
).
WillReturnResult(sqlmock.NewResult(11, 1))
s.mock.
ExpectPrepare("UPDATE `mock-schema`\\.table_v\\d+ SET alloc_base = .+").
ExpectPrepare("UPDATE `mock-schema`\\.`table_v\\d+` SET alloc_base = .+").
ExpectExec().
WithArgs(132861, "`db1`.`t2`").
WillReturnResult(sqlmock.NewResult(12, 1))
s.mock.
ExpectPrepare("UPDATE `mock-schema`\\.engine_v\\d+ SET status = .+").
ExpectPrepare("UPDATE `mock-schema`\\.`engine_v\\d+` SET status = .+").
ExpectExec().
WithArgs(120, "`db1`.`t2`", 0).
WillReturnResult(sqlmock.NewResult(13, 1))
s.mock.
ExpectPrepare("UPDATE `mock-schema`\\.table_v\\d+ SET status = .+").
ExpectPrepare("UPDATE `mock-schema`\\.`table_v\\d+` SET status = .+").
ExpectExec().
WithArgs(60, "`db1`.`t2`").
WillReturnResult(sqlmock.NewResult(14, 1))
s.mock.
ExpectPrepare("UPDATE `mock-schema`\\.table_v\\d+ SET kv_bytes = .+").
ExpectPrepare("UPDATE `mock-schema`\\.`table_v\\d+` SET kv_bytes = .+").
ExpectExec().
WithArgs(4492, 686, 486070148910, "`db1`.`t2`").
WillReturnResult(sqlmock.NewResult(15, 1))
Expand All @@ -262,15 +262,15 @@ func TestNormalOperations(t *testing.T) {

s.mock.ExpectBegin()
s.mock.
ExpectQuery("SELECT .+ FROM `mock-schema`\\.engine_v\\d+").
ExpectQuery("SELECT .+ FROM `mock-schema`\\.`engine_v\\d+`").
WithArgs("`db1`.`t2`").
WillReturnRows(
sqlmock.NewRows([]string{"engine_id", "status"}).
AddRow(0, 120).
AddRow(-1, 30),
)
s.mock.
ExpectQuery("SELECT (?s:.+) FROM `mock-schema`\\.chunk_v\\d+").
ExpectQuery("SELECT (?s:.+) FROM `mock-schema`\\.`chunk_v\\d+`").
WithArgs("`db1`.`t2`").
WillReturnRows(
sqlmock.NewRows([]string{
Expand All @@ -285,7 +285,7 @@ func TestNormalOperations(t *testing.T) {
),
)
s.mock.
ExpectQuery("SELECT .+ FROM `mock-schema`\\.table_v\\d+").
ExpectQuery("SELECT .+ FROM `mock-schema`\\.`table_v\\d+`").
WithArgs("`db1`.`t2`").
WillReturnRows(
sqlmock.NewRows([]string{"status", "alloc_base", "table_id", "table_info", "kv_bytes", "kv_kvs", "kv_checksum"}).
Expand Down Expand Up @@ -345,11 +345,11 @@ func TestRemoveAllCheckpoints_SQL(t *testing.T) {

s.mock.ExpectBegin()
s.mock.
ExpectQuery("SELECT .+ FROM `mock-schema`\\.engine_v\\d+").
ExpectQuery("SELECT .+ FROM `mock-schema`\\.`engine_v\\d+`").
WithArgs("`db1`.`t2`").
WillReturnRows(sqlmock.NewRows([]string{"engine_id", "status"}))
s.mock.
ExpectQuery("SELECT (?s:.+) FROM `mock-schema`\\.chunk_v\\d+").
ExpectQuery("SELECT (?s:.+) FROM `mock-schema`\\.`chunk_v\\d+`").
WithArgs("`db1`.`t2`").
WillReturnRows(
sqlmock.NewRows([]string{
Expand All @@ -358,7 +358,7 @@ func TestRemoveAllCheckpoints_SQL(t *testing.T) {
"kvc_bytes", "kvc_kvs", "kvc_checksum", "unix_timestamp(create_time)",
}))
s.mock.
ExpectQuery("SELECT .+ FROM `mock-schema`\\.table_v\\d+").
ExpectQuery("SELECT .+ FROM `mock-schema`\\.`table_v\\d+`").
WithArgs("`db1`.`t2`").
WillReturnRows(sqlmock.NewRows([]string{"status", "alloc_base", "table_id"}))
s.mock.ExpectRollback()
Expand All @@ -373,15 +373,15 @@ func TestRemoveOneCheckpoint_SQL(t *testing.T) {

s.mock.ExpectBegin()
s.mock.
ExpectExec("DELETE FROM `mock-schema`\\.chunk_v\\d+ WHERE table_name = \\?").
ExpectExec("DELETE FROM `mock-schema`\\.`chunk_v\\d+` WHERE table_name = \\?").
WithArgs("`db1`.`t2`").
WillReturnResult(sqlmock.NewResult(0, 4))
s.mock.
ExpectExec("DELETE FROM `mock-schema`\\.engine_v\\d+ WHERE table_name = \\?").
ExpectExec("DELETE FROM `mock-schema`\\.`engine_v\\d+` WHERE table_name = \\?").
WithArgs("`db1`.`t2`").
WillReturnResult(sqlmock.NewResult(0, 2))
s.mock.
ExpectExec("DELETE FROM `mock-schema`\\.table_v\\d+ WHERE table_name = \\?").
ExpectExec("DELETE FROM `mock-schema`\\.`table_v\\d+` WHERE table_name = \\?").
WithArgs("`db1`.`t2`").
WillReturnResult(sqlmock.NewResult(0, 1))
s.mock.ExpectCommit()
Expand All @@ -395,12 +395,12 @@ func TestIgnoreAllErrorCheckpoints_SQL(t *testing.T) {

s.mock.ExpectBegin()
s.mock.
ExpectExec("UPDATE `mock-schema`\\.engine_v\\d+ SET status = \\? WHERE 'all' = \\? AND status <= \\?").
WithArgs(checkpoints.CheckpointStatusLoaded, sqlmock.AnyArg(), 25).
ExpectExec("UPDATE `mock-schema`\\.`engine_v\\d+` SET status = \\? WHERE status <= \\?").
WithArgs(checkpoints.CheckpointStatusLoaded, 25).
WillReturnResult(sqlmock.NewResult(5, 3))
s.mock.
ExpectExec("UPDATE `mock-schema`\\.table_v\\d+ SET status = \\? WHERE 'all' = \\? AND status <= \\?").
WithArgs(checkpoints.CheckpointStatusLoaded, sqlmock.AnyArg(), 25).
ExpectExec("UPDATE `mock-schema`\\.`table_v\\d+` SET status = \\? WHERE status <= \\?").
WithArgs(checkpoints.CheckpointStatusLoaded, 25).
WillReturnResult(sqlmock.NewResult(6, 2))
s.mock.ExpectCommit()

Expand All @@ -413,11 +413,11 @@ func TestIgnoreOneErrorCheckpoint(t *testing.T) {

s.mock.ExpectBegin()
s.mock.
ExpectExec("UPDATE `mock-schema`\\.engine_v\\d+ SET status = \\? WHERE table_name = \\? AND status <= \\?").
ExpectExec("UPDATE `mock-schema`\\.`engine_v\\d+` SET status = \\? WHERE table_name = \\? AND status <= \\?").
WithArgs(checkpoints.CheckpointStatusLoaded, "`db1`.`t2`", 25).
WillReturnResult(sqlmock.NewResult(5, 2))
s.mock.
ExpectExec("UPDATE `mock-schema`\\.table_v\\d+ SET status = \\? WHERE table_name = \\? AND status <= \\?").
ExpectExec("UPDATE `mock-schema`\\.`table_v\\d+` SET status = \\? WHERE table_name = \\? AND status <= \\?").
WithArgs(checkpoints.CheckpointStatusLoaded, "`db1`.`t2`", 25).
WillReturnResult(sqlmock.NewResult(6, 1))
s.mock.ExpectCommit()
Expand All @@ -431,23 +431,23 @@ func TestDestroyAllErrorCheckpoints_SQL(t *testing.T) {

s.mock.ExpectBegin()
s.mock.
ExpectQuery("SELECT (?s:.+)'all' = \\?").
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg()).
ExpectQuery("SELECT (?s:.+)").
WithArgs(sqlmock.AnyArg()).
WillReturnRows(
sqlmock.NewRows([]string{"table_name", "__min__", "__max__"}).
AddRow("`db1`.`t2`", -1, 0),
)
s.mock.
ExpectExec("DELETE FROM `mock-schema`\\.chunk_v\\d+ WHERE table_name IN .+ 'all' = \\?").
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg()).
ExpectExec("DELETE FROM `mock-schema`\\.`chunk_v\\d+` WHERE table_name IN").
WithArgs(sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 5))
s.mock.
ExpectExec("DELETE FROM `mock-schema`\\.engine_v\\d+ WHERE table_name IN .+ 'all' = \\?").
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg()).
ExpectExec("DELETE FROM `mock-schema`\\.`engine_v\\d+` WHERE table_name IN").
WithArgs(sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 3))
s.mock.
ExpectExec("DELETE FROM `mock-schema`\\.table_v\\d+ WHERE 'all' = \\?").
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg()).
ExpectExec("DELETE FROM `mock-schema`\\.`table_v\\d+`").
WithArgs(sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 2))
s.mock.ExpectCommit()

Expand All @@ -472,15 +472,15 @@ func TestDestroyOneErrorCheckpoints(t *testing.T) {
AddRow("`db1`.`t2`", -1, 0),
)
s.mock.
ExpectExec("DELETE FROM `mock-schema`\\.chunk_v\\d+ WHERE .+table_name = \\?").
ExpectExec("DELETE FROM `mock-schema`\\.`chunk_v\\d+` WHERE .+table_name = \\?").
WithArgs("`db1`.`t2`", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 4))
s.mock.
ExpectExec("DELETE FROM `mock-schema`\\.engine_v\\d+ WHERE .+table_name = \\?").
ExpectExec("DELETE FROM `mock-schema`\\.`engine_v\\d+` WHERE .+table_name = \\?").
WithArgs("`db1`.`t2`", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 2))
s.mock.
ExpectExec("DELETE FROM `mock-schema`\\.table_v\\d+ WHERE table_name = \\?").
ExpectExec("DELETE FROM `mock-schema`\\.`table_v\\d+` WHERE table_name = \\?").
WithArgs("`db1`.`t2`", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
s.mock.ExpectCommit()
Expand All @@ -500,7 +500,7 @@ func TestDump(t *testing.T) {
tm := time.Unix(1555555555, 0).UTC()

s.mock.
ExpectQuery("SELECT (?s:.+) FROM `mock-schema`\\.chunk_v\\d+").
ExpectQuery("SELECT (?s:.+) FROM `mock-schema`\\.`chunk_v\\d+`").
WillReturnRows(
sqlmock.NewRows([]string{
"table_name", "path", "offset", "type", "compression", "sort_key", "file_size", "columns",
Expand All @@ -525,7 +525,7 @@ func TestDump(t *testing.T) {
)

s.mock.
ExpectQuery("SELECT .+ FROM `mock-schema`\\.engine_v\\d+").
ExpectQuery("SELECT .+ FROM `mock-schema`\\.`engine_v\\d+`").
WillReturnRows(
sqlmock.NewRows([]string{"table_name", "engine_id", "status", "create_time", "update_time"}).
AddRow("`db1`.`t2`", -1, 30, tm, tm).
Expand All @@ -541,7 +541,7 @@ func TestDump(t *testing.T) {
csvBuilder.String())

s.mock.
ExpectQuery("SELECT .+ FROM `mock-schema`\\.table_v\\d+").
ExpectQuery("SELECT .+ FROM `mock-schema`\\.`table_v\\d+`").
WillReturnRows(
sqlmock.NewRows([]string{"task_id", "table_name", "hash", "status", "alloc_base", "create_time", "update_time"}).
AddRow(1555555555, "`db1`.`t2`", 0, 90, 132861, tm, tm),
Expand All @@ -564,16 +564,16 @@ func TestMoveCheckpoints(t *testing.T) {
ExpectExec("CREATE SCHEMA IF NOT EXISTS `mock-schema\\.12345678\\.bak`").
WillReturnResult(sqlmock.NewResult(1, 1))
s.mock.
ExpectExec("RENAME TABLE `mock-schema`\\.chunk_v\\d+ TO `mock-schema\\.12345678\\.bak`\\.chunk_v\\d+").
ExpectExec("RENAME TABLE `mock-schema`\\.`chunk_v\\d+` TO `mock-schema\\.12345678\\.bak`\\.`chunk_v\\d+`").
WillReturnResult(sqlmock.NewResult(0, 1))
s.mock.
ExpectExec("RENAME TABLE `mock-schema`\\.engine_v\\d+ TO `mock-schema\\.12345678\\.bak`\\.engine_v\\d+").
ExpectExec("RENAME TABLE `mock-schema`\\.`engine_v\\d+` TO `mock-schema\\.12345678\\.bak`\\.`engine_v\\d+`").
WillReturnResult(sqlmock.NewResult(0, 1))
s.mock.
ExpectExec("RENAME TABLE `mock-schema`\\.table_v\\d+ TO `mock-schema\\.12345678\\.bak`\\.table_v\\d+").
ExpectExec("RENAME TABLE `mock-schema`\\.`table_v\\d+` TO `mock-schema\\.12345678\\.bak`\\.`table_v\\d+`").
WillReturnResult(sqlmock.NewResult(0, 1))
s.mock.
ExpectExec("RENAME TABLE `mock-schema`\\.task_v\\d+ TO `mock-schema\\.12345678\\.bak`\\.task_v\\d+").
ExpectExec("RENAME TABLE `mock-schema`\\.`task_v\\d+` TO `mock-schema\\.12345678\\.bak`\\.`task_v\\d+`").
WillReturnResult(sqlmock.NewResult(0, 1))

err := s.cpdb.MoveCheckpoints(ctx, 12345678)
Expand Down
Loading

0 comments on commit 5fee756

Please sign in to comment.