Skip to content

Commit

Permalink
dumpling: fix cannot dump data bug when dumpling fails to check has t…
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu authored and blacktear23 committed Feb 15, 2023
1 parent 0bc5691 commit 4c85870
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 2 deletions.
1 change: 1 addition & 0 deletions dumpling/export/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ go_test(
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_prometheus_client_golang//prometheus/collectors",
"@com_github_stretchr_testify//require",
"@org_golang_x_sync//errgroup",
Expand Down
2 changes: 1 addition & 1 deletion dumpling/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -1575,7 +1575,7 @@ func setSessionParam(d *Dumper) error {
d.L().Info("cannot check whether TiDB has TiKV, will apply tidb_snapshot by default. This won't affect dump process", log.ShortError(err))
}
if conf.ServerInfo.HasTiKV {
sessionParam["tidb_snapshot"] = snapshot
sessionParam[snapshotVar] = snapshot
}
}
}
Expand Down
63 changes: 63 additions & 0 deletions dumpling/export/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (

"github.com/DATA-DOG/go-sqlmock"
"github.com/coreos/go-semver/semver"
"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/version"
tcontext "github.com/pingcap/tidb/dumpling/context"
"github.com/pingcap/tidb/parser"
Expand Down Expand Up @@ -289,3 +291,64 @@ func TestSetDefaultSessionParams(t *testing.T) {
require.Equal(t, testCase.expectedParams, testCase.sessionParams)
}
}

func TestSetSessionParams(t *testing.T) {
// case 1: fail to set tidb_snapshot, should return error with hint
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()

mock.ExpectQuery("SELECT @@tidb_config").
WillReturnError(errors.New("mock error"))
mock.ExpectQuery("SELECT COUNT\\(1\\) as c FROM MYSQL.TiDB WHERE VARIABLE_NAME='tikv_gc_safe_point'").
WillReturnError(errors.New("mock error"))
tikvErr := &mysql.MySQLError{
Number: 1105,
Message: "can not get 'tikv_gc_safe_point'",
}
mock.ExpectExec("SET SESSION tidb_snapshot").
WillReturnError(tikvErr)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/dumpling/export/SkipResetDB", "return(true)"))
defer failpoint.Disable("github.com/pingcap/tidb/dumpling/export/SkipResetDB=return(true)")

tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel()
defer cancel()

conf := DefaultConfig()
conf.ServerInfo = version.ServerInfo{
ServerType: version.ServerTypeTiDB,
HasTiKV: false,
}
conf.Snapshot = "439153276059648000"
conf.Consistency = ConsistencyTypeSnapshot
d := &Dumper{
tctx: tctx,
conf: conf,
cancelCtx: cancel,
dbHandle: db,
}
err = setSessionParam(d)
require.ErrorContains(t, err, "consistency=none")

// case 2: fail to set other
conf.ServerInfo = version.ServerInfo{
ServerType: version.ServerTypeMySQL,
HasTiKV: false,
}
conf.Snapshot = ""
conf.Consistency = ConsistencyTypeFlush
conf.SessionParams = map[string]interface{}{
"mock": "UTC",
}
d.dbHandle = db
mock.ExpectExec("SET SESSION mock").
WillReturnError(errors.New("Unknown system variable mock"))
mock.ExpectClose()
mock.ExpectClose()

err = setSessionParam(d)
require.NoError(t, err)
}
8 changes: 7 additions & 1 deletion dumpling/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

const (
orderByTiDBRowID = "ORDER BY `_tidb_rowid`"
snapshotVar = "tidb_snapshot"
)

type listTableType int
Expand Down Expand Up @@ -851,7 +852,9 @@ func resetDBWithSessionParams(tctx *tcontext.Context, db *sql.DB, cfg *mysql.Con
s := fmt.Sprintf("SET SESSION %s = ?", k)
_, err := db.ExecContext(tctx, s, pv)
if err != nil {
if isUnknownSystemVariableErr(err) {
if k == snapshotVar {
err = errors.Annotate(err, "fail to set snapshot for tidb, please set --consistency=none/--consistency=lock or fix snapshot problem")
} else if isUnknownSystemVariableErr(err) {
tctx.L().Info("session variable is not supported by db", zap.String("variable", k), zap.Reflect("value", v))
continue
}
Expand All @@ -876,6 +879,9 @@ func resetDBWithSessionParams(tctx *tcontext.Context, db *sql.DB, cfg *mysql.Con
}
cfg.Params[k] = s
}
failpoint.Inject("SkipResetDB", func(_ failpoint.Value) {
failpoint.Return(db, nil)
})

db.Close()
c, err := mysql.NewConnector(cfg)
Expand Down

0 comments on commit 4c85870

Please sign in to comment.