From b4f89c8402d085f82cc1b9b8dc5bda53e8bf0bcd Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 29 Jul 2019 19:44:14 +0800 Subject: [PATCH 01/13] restore: update and restore GCLifeTime once when parallel --- lightning/restore/restore.go | 32 +++++++++++++------- lightning/restore/restore_test.go | 50 +++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 10 deletions(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 4ebefcba1..3002d40ee 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -79,6 +79,11 @@ var ( // DeliverPauser is a shared pauser to pause progress to (*chunkRestore).encodeLoop var DeliverPauser = common.NewPauser() +var ( + runningChecksumJobs int32 + oriGCLifeTime string +) + func init() { cfg := tidbcfg.GetGlobalConfig() cfg.Log.SlowThreshold = 3000 @@ -1325,19 +1330,26 @@ func setSessionConcurrencyVars(ctx context.Context, db *sql.DB, dsn config.DBSto // DoChecksum do checksum for tables. // table should be in ., format. e.g. foo.bar func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, error) { - ori, err := increaseGCLifeTime(ctx, db) - if err != nil { - return nil, errors.Trace(err) + var err error + + if atomic.AddInt32(&runningChecksumJobs, 1) == 1 { + oriGCLifeTime, err = increaseGCLifeTime(ctx, db) + if err != nil { + return nil, errors.Trace(err) + } } + // set it back finally defer func() { - err := UpdateGCLifeTime(ctx, db, ori) - if err != nil { - query := fmt.Sprintf("UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'", ori) - log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed", - zap.String("query", query), - log.ShortError(err), - ) + if atomic.AddInt32(&runningChecksumJobs, -1) == 0 { + err := UpdateGCLifeTime(ctx, db, oriGCLifeTime) + if err != nil { + query := fmt.Sprintf("UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'", oriGCLifeTime) + log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed", + zap.String("query", query), + log.ShortError(err), + ) + } } }() diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index 7812acafd..f3105224e 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -15,6 +15,9 @@ package restore import ( "context" + "sync" + "time" + // "encoding/json" "fmt" "io/ioutil" @@ -148,6 +151,53 @@ func (s *restoreSuite) TestDoChecksum(c *C) { }) } +func (s *restoreSuite) TestDoChecksumParallel(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + defer db.Close() + + mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) + mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WithArgs("100h0m0s"). + WillReturnResult(sqlmock.NewResult(1, 1)) + for i := 0; i < 5; i++ { + mock.ExpectQuery("\\QADMIN CHECKSUM TABLE `test`.`t`\\E"). + WillDelayFor(100 * time.Millisecond). + WillReturnRows( + sqlmock.NewRows([]string{"Db_name", "Table_name", "Checksum_crc64_xor", "Total_kvs", "Total_bytes"}). + AddRow("test", "t", 8520875019404689597, 7296873, 357601387), + ) + } + mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WithArgs("10m"). + WillReturnResult(sqlmock.NewResult(2, 1)) + mock.ExpectClose() + + var wg sync.WaitGroup + ctx := context.Background() + wg.Add(5) + mock.MatchExpectationsInOrder(false) + for i := 0; i < 5; i++ { + go func() { + checksum, err := DoChecksum(ctx, db, "`test`.`t`") + c.Assert(err, IsNil) + c.Assert(*checksum, DeepEquals, RemoteChecksum{ + Schema: "test", + Table: "t", + Checksum: 8520875019404689597, + TotalKVs: 7296873, + TotalBytes: 357601387, + }) + wg.Done() + }() + } + wg.Wait() + mock.MatchExpectationsInOrder(true) + err = mock.ExpectationsWereMet() + c.Assert(err, IsNil) +} + func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) From 5c39c9851c7a8ce233a1f004a98c90f9e7feca89 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 1 Aug 2019 10:56:05 +0800 Subject: [PATCH 02/13] Update lightning/restore/restore.go Co-Authored-By: amyangfei --- lightning/restore/restore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 6b926a8a4..6a020b468 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -1363,7 +1363,7 @@ func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, var err error helper, ok := ctx.Value(&gcLifeTimeKey).(gcLifeTimeHelper) if !ok { - return nil, errors.Errorf("No gcLifeTimeHelper found in context, check context initialization") + return nil, errors.New("No gcLifeTimeHelper found in context, check context initialization") } helper.increaseGCLock.Lock() From 2c03a760fd62a58bc2a708c44733114afb4b8cb3 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 1 Aug 2019 10:56:37 +0800 Subject: [PATCH 03/13] Update lightning/restore/restore.go Co-Authored-By: amyangfei --- lightning/restore/restore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 6a020b468..f11a7ca26 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -1372,7 +1372,7 @@ func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, err = increaseGCLifeTime(ctx, db) if err != nil { helper.increaseGCLock.Unlock() - return nil, errors.Trace(err) + return nil, err } } helper.increaseGCLock.Unlock() From bf3c4929744bebca29d8125d74e845974dfbdb77 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 1 Aug 2019 10:56:54 +0800 Subject: [PATCH 04/13] Update lightning/restore/restore.go Co-Authored-By: amyangfei --- lightning/restore/restore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index f11a7ca26..ba8aade48 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -1441,7 +1441,7 @@ func increaseGCLifeTime(ctx context.Context, db *sql.DB) (err error) { if increaseGCLifeTime { err = UpdateGCLifeTime(ctx, db, defaultGCLifeTime.String()) if err != nil { - return errors.Trace(err) + return err } } From 3ffb925fce85d920d6b7bc25c17d95f0846dd9f8 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 1 Aug 2019 11:52:42 +0800 Subject: [PATCH 05/13] restore: fix bug in reviewing --- lightning/restore/restore.go | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index ba8aade48..ea59183f7 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -473,16 +473,12 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan } type gcLifeTimeHelper struct { - increaseGCLock *sync.Mutex - runningJobs *int32 - oriGCLifeTime string + runningJobsLock *sync.Mutex + runningJobs *int32 + oriGCLifeTime string } -var ( - gcLifeTimeKey struct{} - // in case there're multiple `restoreTables` jobs on one TiDB, one TiDB requires one lock - gcLifeTimeLock sync.Mutex -) +var gcLifeTimeKey struct{} func (rc *RestoreController) restoreTables(ctx context.Context) error { logTask := log.L().Begin(zap.InfoLevel, "restore all tables data") @@ -504,7 +500,11 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { if err != nil { return err } - var runningJobs int32 + + var ( + gcLifeTimeLock sync.Mutex + runningJobs int32 + ) helper := gcLifeTimeHelper{ &gcLifeTimeLock, &runningJobs, @@ -1366,20 +1366,24 @@ func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, return nil, errors.New("No gcLifeTimeHelper found in context, check context initialization") } - helper.increaseGCLock.Lock() + helper.runningJobsLock.Lock() *(helper.runningJobs) += 1 if *(helper.runningJobs) == 1 { err = increaseGCLifeTime(ctx, db) if err != nil { - helper.increaseGCLock.Unlock() + helper.runningJobsLock.Unlock() return nil, err } } - helper.increaseGCLock.Unlock() + helper.runningJobsLock.Unlock() // set it back finally defer func() { - if atomic.AddInt32(helper.runningJobs, -1) == 0 { + helper.runningJobsLock.Lock() + defer helper.runningJobsLock.Unlock() + + *(helper.runningJobs) -= 1 + if *(helper.runningJobs) == 0 { err := UpdateGCLifeTime(ctx, db, helper.oriGCLifeTime) if err != nil { query := fmt.Sprintf( From 0c3e19a741f718d582de39a5f22b957e6ebe50d1 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 1 Aug 2019 13:31:30 +0800 Subject: [PATCH 06/13] restore: call ObtainGCLifeTime closer to use --- lightning/restore/restore.go | 23 +++++++++++++---------- lightning/restore/restore_test.go | 20 +++++++++----------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index ea59183f7..1899a2dd6 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -475,7 +475,7 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan type gcLifeTimeHelper struct { runningJobsLock *sync.Mutex runningJobs *int32 - oriGCLifeTime string + oriGCLifeTime *string } var gcLifeTimeKey struct{} @@ -496,19 +496,16 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { } taskCh := make(chan task, rc.cfg.App.IndexConcurrency) defer close(taskCh) - oriGCLifeTime, err := ObtainGCLifeTime(ctx, rc.tidbMgr.db) - if err != nil { - return err - } var ( gcLifeTimeLock sync.Mutex runningJobs int32 + oriGCLifeTime string ) helper := gcLifeTimeHelper{ &gcLifeTimeLock, &runningJobs, - oriGCLifeTime, + &oriGCLifeTime, } ctx2 := context.WithValue(ctx, &gcLifeTimeKey, helper) for i := 0; i < rc.cfg.App.IndexConcurrency; i++ { @@ -562,7 +559,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { wg.Wait() stopPeriodicActions <- struct{}{} - err = restoreErr.Get() + err := restoreErr.Get() logTask.End(zap.ErrorLevel, err) return err } @@ -1369,6 +1366,12 @@ func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, helper.runningJobsLock.Lock() *(helper.runningJobs) += 1 if *(helper.runningJobs) == 1 { + oriGCLifeTime, err := ObtainGCLifeTime(ctx, db) + if err != nil { + helper.runningJobsLock.Unlock() + return nil, err + } + *helper.oriGCLifeTime = oriGCLifeTime err = increaseGCLifeTime(ctx, db) if err != nil { helper.runningJobsLock.Unlock() @@ -1384,11 +1387,11 @@ func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, *(helper.runningJobs) -= 1 if *(helper.runningJobs) == 0 { - err := UpdateGCLifeTime(ctx, db, helper.oriGCLifeTime) + err := UpdateGCLifeTime(ctx, db, *helper.oriGCLifeTime) if err != nil { query := fmt.Sprintf( "UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'", - helper.oriGCLifeTime, + *helper.oriGCLifeTime, ) log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed", zap.String("query", query), @@ -1427,7 +1430,7 @@ func increaseGCLifeTime(ctx context.Context, db *sql.DB) (err error) { // try to get gcLifeTimeHelper from context first. // DoChecksum has assure this getting action success. helper, _ := ctx.Value(&gcLifeTimeKey).(gcLifeTimeHelper) - oriGCLifeTime := helper.oriGCLifeTime + oriGCLifeTime := *helper.oriGCLifeTime var increaseGCLifeTime bool if oriGCLifeTime != "" { diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index 4ae6fc02a..964e89b2d 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -15,7 +15,6 @@ package restore import ( "context" - "database/sql" "sync" "time" @@ -120,17 +119,16 @@ func (s *restoreSuite) TestErrorSummaries(c *C) { }) } -func MockDoChecksumCtx(c *C, db *sql.DB) (context.Context) { +func MockDoChecksumCtx() context.Context { ctx := context.Background() var gcLifeTimeLock sync.Mutex var runningJobs int32 - oriGCLifeTime, err := ObtainGCLifeTime(ctx, db) - c.Assert(err, IsNil) + var oriGCLifeTime string helper := gcLifeTimeHelper{ - &gcLifeTimeLock, // from restore.go + &gcLifeTimeLock, &runningJobs, - oriGCLifeTime, + &oriGCLifeTime, } return context.WithValue(ctx, &gcLifeTimeKey, helper) } @@ -155,7 +153,7 @@ func (s *restoreSuite) TestDoChecksum(c *C) { WillReturnResult(sqlmock.NewResult(2, 1)) mock.ExpectClose() - ctx := MockDoChecksumCtx(c, db) + ctx := MockDoChecksumCtx() checksum, err := DoChecksum(ctx, db, "`test`.`t`") c.Assert(err, IsNil) c.Assert(*checksum, DeepEquals, RemoteChecksum{ @@ -190,7 +188,7 @@ func (s *restoreSuite) TestDoChecksumParallel(c *C) { WillReturnResult(sqlmock.NewResult(2, 1)) mock.ExpectClose() - ctx := MockDoChecksumCtx(c, db) + ctx := MockDoChecksumCtx() var wg sync.WaitGroup wg.Add(5) @@ -227,7 +225,7 @@ func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) { WithArgs("300h"). WillReturnResult(sqlmock.NewResult(1, 1)) - ctx := MockDoChecksumCtx(c, db) + ctx := MockDoChecksumCtx() _, err = DoChecksum(ctx, db, "`test`.`t`") c.Assert(err, ErrorMatches, "compute remote checksum failed: mock syntax error.*") c.Assert(mock.ExpectationsWereMet(), IsNil) @@ -445,7 +443,7 @@ func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) { WithArgs("10m"). WillReturnResult(sqlmock.NewResult(2, 1)) - ctx := MockDoChecksumCtx(c, db) + ctx := MockDoChecksumCtx() err = s.tr.compareChecksum(ctx, db, verification.MakeKVChecksum(1234567, 12345, 1234567890)) c.Assert(err, IsNil) @@ -472,7 +470,7 @@ func (s *tableRestoreSuite) TestCompareChecksumFailure(c *C) { WithArgs("10m"). WillReturnResult(sqlmock.NewResult(2, 1)) - ctx := MockDoChecksumCtx(c, db) + ctx := MockDoChecksumCtx() err = s.tr.compareChecksum(ctx, db, verification.MakeKVChecksum(9876543, 54321, 1357924680)) c.Assert(err, ErrorMatches, "checksum mismatched.*") From 3401f0c8bdaf650f498eff89a352c23ef6c4be12 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 1 Aug 2019 15:42:40 +0800 Subject: [PATCH 07/13] restore: improve readabiilty --- lightning/restore/restore.go | 114 +++++++++++++++++------------- lightning/restore/restore_test.go | 2 +- 2 files changed, 64 insertions(+), 52 deletions(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 1899a2dd6..46499e3d3 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -472,12 +472,64 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan } } -type gcLifeTimeHelper struct { +type gcLifeTimeManager struct { runningJobsLock *sync.Mutex runningJobs *int32 oriGCLifeTime *string } +func newManager() gcLifeTimeManager { + var ( + gcLifeTimeLock sync.Mutex + runningJobs int32 + oriGCLifeTime string + ) + return gcLifeTimeManager{ + &gcLifeTimeLock, + &runningJobs, + &oriGCLifeTime, + } +} + +func (m gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error { + m.runningJobsLock.Lock() + defer m.runningJobsLock.Unlock() + + *m.runningJobs += 1 + if *m.runningJobs == 1 { + oriGCLifeTime, err := ObtainGCLifeTime(ctx, db) + if err != nil { + return err + } + *m.oriGCLifeTime = oriGCLifeTime + err = increaseGCLifeTime(ctx, db) + if err != nil { + return err + } + } + return nil +} + +func (m gcLifeTimeManager) removeOneJob(ctx context.Context, db *sql.DB) { + m.runningJobsLock.Lock() + defer m.runningJobsLock.Unlock() + + *m.runningJobs -= 1 + if *m.runningJobs == 0 { + err := UpdateGCLifeTime(ctx, db, *m.oriGCLifeTime) + if err != nil { + query := fmt.Sprintf( + "UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'", + *m.oriGCLifeTime, + ) + log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed", + zap.String("query", query), + log.ShortError(err), + ) + } + } +} + var gcLifeTimeKey struct{} func (rc *RestoreController) restoreTables(ctx context.Context) error { @@ -497,17 +549,8 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { taskCh := make(chan task, rc.cfg.App.IndexConcurrency) defer close(taskCh) - var ( - gcLifeTimeLock sync.Mutex - runningJobs int32 - oriGCLifeTime string - ) - helper := gcLifeTimeHelper{ - &gcLifeTimeLock, - &runningJobs, - &oriGCLifeTime, - } - ctx2 := context.WithValue(ctx, &gcLifeTimeKey, helper) + manager := newManager() + ctx2 := context.WithValue(ctx, &gcLifeTimeKey, manager) for i := 0; i < rc.cfg.App.IndexConcurrency; i++ { go func() { for task := range taskCh { @@ -1358,48 +1401,17 @@ func setSessionConcurrencyVars(ctx context.Context, db *sql.DB, dsn config.DBSto // table should be in .
, format. e.g. foo.bar func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, error) { var err error - helper, ok := ctx.Value(&gcLifeTimeKey).(gcLifeTimeHelper) + manager, ok := ctx.Value(&gcLifeTimeKey).(gcLifeTimeManager) if !ok { - return nil, errors.New("No gcLifeTimeHelper found in context, check context initialization") + return nil, errors.New("No gcLifeTimeManager found in context, check context initialization") } - helper.runningJobsLock.Lock() - *(helper.runningJobs) += 1 - if *(helper.runningJobs) == 1 { - oriGCLifeTime, err := ObtainGCLifeTime(ctx, db) - if err != nil { - helper.runningJobsLock.Unlock() - return nil, err - } - *helper.oriGCLifeTime = oriGCLifeTime - err = increaseGCLifeTime(ctx, db) - if err != nil { - helper.runningJobsLock.Unlock() - return nil, err - } + if err = manager.addOneJob(ctx, db); err != nil { + return nil, err } - helper.runningJobsLock.Unlock() // set it back finally - defer func() { - helper.runningJobsLock.Lock() - defer helper.runningJobsLock.Unlock() - - *(helper.runningJobs) -= 1 - if *(helper.runningJobs) == 0 { - err := UpdateGCLifeTime(ctx, db, *helper.oriGCLifeTime) - if err != nil { - query := fmt.Sprintf( - "UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'", - *helper.oriGCLifeTime, - ) - log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed", - zap.String("query", query), - log.ShortError(err), - ) - } - } - }() + defer manager.removeOneJob(ctx, db) task := log.With(zap.String("table", table)).Begin(zap.InfoLevel, "remote checksum") @@ -1427,10 +1439,10 @@ func increaseGCLifeTime(ctx context.Context, db *sql.DB) (err error) { // checksum command usually takes a long time to execute, // so here need to increase the gcLifeTime for single transaction. - // try to get gcLifeTimeHelper from context first. + // try to get gcLifeTimeManager from context first. // DoChecksum has assure this getting action success. - helper, _ := ctx.Value(&gcLifeTimeKey).(gcLifeTimeHelper) - oriGCLifeTime := *helper.oriGCLifeTime + manager, _ := ctx.Value(&gcLifeTimeKey).(gcLifeTimeManager) + oriGCLifeTime := *manager.oriGCLifeTime var increaseGCLifeTime bool if oriGCLifeTime != "" { diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index 964e89b2d..2f095a8d0 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -125,7 +125,7 @@ func MockDoChecksumCtx() context.Context { var runningJobs int32 var oriGCLifeTime string - helper := gcLifeTimeHelper{ + helper := gcLifeTimeManager{ &gcLifeTimeLock, &runningJobs, &oriGCLifeTime, From 6396cb05422c00a608eaeb2f90150b9f9dce95eb Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 1 Aug 2019 23:47:30 +0800 Subject: [PATCH 08/13] restore: reduce struct copy and pointer deref --- lightning/restore/restore.go | 47 +++++++++++++------------------ lightning/restore/restore_test.go | 12 ++------ 2 files changed, 21 insertions(+), 38 deletions(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 46499e3d3..462acb2cd 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -473,35 +473,27 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan } type gcLifeTimeManager struct { - runningJobsLock *sync.Mutex - runningJobs *int32 - oriGCLifeTime *string + runningJobsLock sync.Mutex + runningJobs int + oriGCLifeTime string } -func newManager() gcLifeTimeManager { - var ( - gcLifeTimeLock sync.Mutex - runningJobs int32 - oriGCLifeTime string - ) - return gcLifeTimeManager{ - &gcLifeTimeLock, - &runningJobs, - &oriGCLifeTime, - } +func newManager() *gcLifeTimeManager { + // Default values of three member are enough to initialize this struct + return &gcLifeTimeManager{} } -func (m gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error { +func (m *gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error { m.runningJobsLock.Lock() defer m.runningJobsLock.Unlock() - *m.runningJobs += 1 - if *m.runningJobs == 1 { + m.runningJobs += 1 + if m.runningJobs == 1 { oriGCLifeTime, err := ObtainGCLifeTime(ctx, db) if err != nil { return err } - *m.oriGCLifeTime = oriGCLifeTime + m.oriGCLifeTime = oriGCLifeTime err = increaseGCLifeTime(ctx, db) if err != nil { return err @@ -510,17 +502,17 @@ func (m gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error { return nil } -func (m gcLifeTimeManager) removeOneJob(ctx context.Context, db *sql.DB) { +func (m *gcLifeTimeManager) removeOneJob(ctx context.Context, db *sql.DB) { m.runningJobsLock.Lock() defer m.runningJobsLock.Unlock() - *m.runningJobs -= 1 - if *m.runningJobs == 0 { - err := UpdateGCLifeTime(ctx, db, *m.oriGCLifeTime) + m.runningJobs -= 1 + if m.runningJobs == 0 { + err := UpdateGCLifeTime(ctx, db, m.oriGCLifeTime) if err != nil { query := fmt.Sprintf( "UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'", - *m.oriGCLifeTime, + m.oriGCLifeTime, ) log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed", zap.String("query", query), @@ -1401,7 +1393,7 @@ func setSessionConcurrencyVars(ctx context.Context, db *sql.DB, dsn config.DBSto // table should be in .
, format. e.g. foo.bar func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, error) { var err error - manager, ok := ctx.Value(&gcLifeTimeKey).(gcLifeTimeManager) + manager, ok := ctx.Value(&gcLifeTimeKey).(*gcLifeTimeManager) if !ok { return nil, errors.New("No gcLifeTimeManager found in context, check context initialization") } @@ -1441,12 +1433,11 @@ func increaseGCLifeTime(ctx context.Context, db *sql.DB) (err error) { // try to get gcLifeTimeManager from context first. // DoChecksum has assure this getting action success. - manager, _ := ctx.Value(&gcLifeTimeKey).(gcLifeTimeManager) - oriGCLifeTime := *manager.oriGCLifeTime + manager, _ := ctx.Value(&gcLifeTimeKey).(*gcLifeTimeManager) var increaseGCLifeTime bool - if oriGCLifeTime != "" { - ori, err := time.ParseDuration(oriGCLifeTime) + if manager.oriGCLifeTime != "" { + ori, err := time.ParseDuration(manager.oriGCLifeTime) if err != nil { return errors.Trace(err) } diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index 2f095a8d0..bfd5f7613 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -121,16 +121,8 @@ func (s *restoreSuite) TestErrorSummaries(c *C) { func MockDoChecksumCtx() context.Context { ctx := context.Background() - var gcLifeTimeLock sync.Mutex - var runningJobs int32 - var oriGCLifeTime string - - helper := gcLifeTimeManager{ - &gcLifeTimeLock, - &runningJobs, - &oriGCLifeTime, - } - return context.WithValue(ctx, &gcLifeTimeKey, helper) + manager := newManager() + return context.WithValue(ctx, &gcLifeTimeKey, manager) } func (s *restoreSuite) TestDoChecksum(c *C) { From a5ea50606e5e4676fe9d8b2312aaecb6445e9b35 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 2 Aug 2019 14:25:14 +0800 Subject: [PATCH 09/13] restore: fix bug in reviewing --- lightning/restore/restore.go | 12 +++++++-- lightning/restore/restore_test.go | 41 ++++++++++++++++++++++++++++--- 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 462acb2cd..e13af8d25 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -483,12 +483,15 @@ func newManager() *gcLifeTimeManager { return &gcLifeTimeManager{} } +// Pre- and post-condition: +// if m.runningJobs == 0, GC life time has not been increased. +// if m.runningJobs > 0, GC life time has been increased. +// m.runningJobs won't be negative(overflow) since index concurrency is relatively small func (m *gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error { m.runningJobsLock.Lock() defer m.runningJobsLock.Unlock() - m.runningJobs += 1 - if m.runningJobs == 1 { + if m.runningJobs == 0 { oriGCLifeTime, err := ObtainGCLifeTime(ctx, db) if err != nil { return err @@ -499,9 +502,14 @@ func (m *gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error { return err } } + m.runningJobs += 1 return nil } +// Pre- and post-condition: +// if m.runningJobs == 0, GC life time has been tried to recovered. If this try fails, a warning will be printed. +// if m.runningJobs > 0, GC life time has not been recovered. +// m.runningJobs won't minus to negative since removeOneJob follows a successful addOneJob. func (m *gcLifeTimeManager) removeOneJob(ctx context.Context, db *sql.DB) { m.runningJobsLock.Lock() defer m.runningJobsLock.Unlock() diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index bfd5f7613..596ff9459 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -178,7 +178,6 @@ func (s *restoreSuite) TestDoChecksumParallel(c *C) { mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). WithArgs("10m"). WillReturnResult(sqlmock.NewResult(2, 1)) - mock.ExpectClose() ctx := MockDoChecksumCtx() @@ -199,9 +198,45 @@ func (s *restoreSuite) TestDoChecksumParallel(c *C) { }() } wg.Wait() - db.Close() - err = mock.ExpectationsWereMet() + + c.Assert(mock.ExpectationsWereMet(), IsNil) + mock.ExpectClose() +} + +func (s *restoreSuite) TestIncreaseGCLifeTimeFail(c *C) { + db, mock, err := sqlmock.New() c.Assert(err, IsNil) + defer db.Close() + + for i := 0; i < 5; i++ { + mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) + mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WithArgs("100h0m0s"). + WillReturnError(errors.Annotate(context.Canceled, "update gc error")) + } + // This recover GC Life Time SQL should not be executed + mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WithArgs("10m"). + WillReturnResult(sqlmock.NewResult(1, 1)) + + ctx := MockDoChecksumCtx() + var wg sync.WaitGroup + wg.Add(5) + for i := 0; i < 5; i++ { + go func() { + _, err = DoChecksum(ctx, db, "`test`.`t`") + c.Assert(err, ErrorMatches, "update GC lifetime failed: update gc error: context canceled") + wg.Done() + }() + } + wg.Wait() + + // validate no more redundant GC recover + err = mock.ExpectationsWereMet() + c.Assert(err, ErrorMatches, "there is a remaining expectation.*\n.*\n.*\n.*\n.*\n.*\n.*") + + mock.ExpectClose() } func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) { From aed522661d52a38caaedd80c9d305473e47b4681 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 2 Aug 2019 15:36:22 +0800 Subject: [PATCH 10/13] Update lightning/restore/restore.go Co-Authored-By: kennytm --- lightning/restore/restore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index e13af8d25..ca56fd658 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -478,7 +478,7 @@ type gcLifeTimeManager struct { oriGCLifeTime string } -func newManager() *gcLifeTimeManager { +func newGCLifeTimeManager() *gcLifeTimeManager { // Default values of three member are enough to initialize this struct return &gcLifeTimeManager{} } From f724534c425440c6c338939b7ce066aea700f126 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 2 Aug 2019 15:53:30 +0800 Subject: [PATCH 11/13] restore: improve readability --- lightning/restore/restore.go | 2 +- lightning/restore/restore_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index ca56fd658..02d60ea53 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -549,7 +549,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { taskCh := make(chan task, rc.cfg.App.IndexConcurrency) defer close(taskCh) - manager := newManager() + manager := newGCLifeTimeManager() ctx2 := context.WithValue(ctx, &gcLifeTimeKey, manager) for i := 0; i < rc.cfg.App.IndexConcurrency; i++ { go func() { diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index 596ff9459..38f7c56a9 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -121,7 +121,7 @@ func (s *restoreSuite) TestErrorSummaries(c *C) { func MockDoChecksumCtx() context.Context { ctx := context.Background() - manager := newManager() + manager := newGCLifeTimeManager() return context.WithValue(ctx, &gcLifeTimeKey, manager) } From d4c7c168d344c1c7c789e9e84c26e76ff7a54401 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 5 Aug 2019 17:17:48 +0800 Subject: [PATCH 12/13] restore: refine mock expectation order --- lightning/restore/restore_test.go | 46 +++++++++++++++++-------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index 38f7c56a9..792b855ec 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -128,7 +128,6 @@ func MockDoChecksumCtx() context.Context { func (s *restoreSuite) TestDoChecksum(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - defer db.Close() mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) @@ -155,12 +154,14 @@ func (s *restoreSuite) TestDoChecksum(c *C) { TotalKVs: 7296873, TotalBytes: 357601387, }) + + c.Assert(db.Close(), IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) } func (s *restoreSuite) TestDoChecksumParallel(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - defer db.Close() mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) @@ -178,13 +179,17 @@ func (s *restoreSuite) TestDoChecksumParallel(c *C) { mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). WithArgs("10m"). WillReturnResult(sqlmock.NewResult(2, 1)) + mock.ExpectClose() ctx := MockDoChecksumCtx() + // db.Close() will close all connections from its idle pool, set it 1 to expect one close + db.SetMaxIdleConns(1) var wg sync.WaitGroup wg.Add(5) for i := 0; i < 5; i++ { go func() { + defer wg.Done() checksum, err := DoChecksum(ctx, db, "`test`.`t`") c.Assert(err, IsNil) c.Assert(*checksum, DeepEquals, RemoteChecksum{ @@ -194,19 +199,17 @@ func (s *restoreSuite) TestDoChecksumParallel(c *C) { TotalKVs: 7296873, TotalBytes: 357601387, }) - wg.Done() }() } wg.Wait() + c.Assert(db.Close(), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) - mock.ExpectClose() } func (s *restoreSuite) TestIncreaseGCLifeTimeFail(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - defer db.Close() for i := 0; i < 5; i++ { mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). @@ -215,10 +218,11 @@ func (s *restoreSuite) TestIncreaseGCLifeTimeFail(c *C) { WithArgs("100h0m0s"). WillReturnError(errors.Annotate(context.Canceled, "update gc error")) } - // This recover GC Life Time SQL should not be executed + // This recover GC Life Time SQL should not be executed in DoChecksum mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). WithArgs("10m"). WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectClose() ctx := MockDoChecksumCtx() var wg sync.WaitGroup @@ -232,17 +236,16 @@ func (s *restoreSuite) TestIncreaseGCLifeTimeFail(c *C) { } wg.Wait() - // validate no more redundant GC recover - err = mock.ExpectationsWereMet() - c.Assert(err, ErrorMatches, "there is a remaining expectation.*\n.*\n.*\n.*\n.*\n.*\n.*") + _, err = db.Exec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E", "10m") + c.Assert(err, IsNil) - mock.ExpectClose() + c.Assert(db.Close(), IsNil) + err = mock.ExpectationsWereMet() } func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - defer db.Close() mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("300h")) @@ -251,18 +254,19 @@ func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) { mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). WithArgs("300h"). WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectClose() ctx := MockDoChecksumCtx() _, err = DoChecksum(ctx, db, "`test`.`t`") c.Assert(err, ErrorMatches, "compute remote checksum failed: mock syntax error.*") + + c.Assert(db.Close(), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) - mock.ExpectClose() } func (s *restoreSuite) TestSetSessionConcurrencyVars(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - defer db.Close() mock.ExpectExec( `SET\s+`+ @@ -272,6 +276,7 @@ func (s *restoreSuite) TestSetSessionConcurrencyVars(c *C) { `SESSION tidb_checksum_table_concurrency = \?`). WithArgs(123, 456, 789, 543). WillReturnResult(sqlmock.NewResult(1, 4)) + mock.ExpectClose() ctx := context.Background() setSessionConcurrencyVars(ctx, db, config.DBStore{ @@ -281,8 +286,8 @@ func (s *restoreSuite) TestSetSessionConcurrencyVars(c *C) { ChecksumTableConcurrency: 543, }) + c.Assert(db.Close(), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) - mock.ExpectClose() } var _ = Suite(&tableRestoreSuite{}) @@ -454,7 +459,6 @@ func (s *tableRestoreSuite) TestInitializeColumns(c *C) { func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - defer db.Close() mock.ExpectQuery("SELECT.*tikv_gc_life_time.*"). WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) @@ -469,19 +473,20 @@ func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) { mock.ExpectExec("UPDATE.*tikv_gc_life_time.*"). WithArgs("10m"). WillReturnResult(sqlmock.NewResult(2, 1)) + mock.ExpectClose() ctx := MockDoChecksumCtx() err = s.tr.compareChecksum(ctx, db, verification.MakeKVChecksum(1234567, 12345, 1234567890)) c.Assert(err, IsNil) + c.Assert(db.Close(), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) - mock.ExpectClose() + } func (s *tableRestoreSuite) TestCompareChecksumFailure(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - defer db.Close() mock.ExpectQuery("SELECT.*tikv_gc_life_time.*"). WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) @@ -496,29 +501,30 @@ func (s *tableRestoreSuite) TestCompareChecksumFailure(c *C) { mock.ExpectExec("UPDATE.*tikv_gc_life_time.*"). WithArgs("10m"). WillReturnResult(sqlmock.NewResult(2, 1)) + mock.ExpectClose() ctx := MockDoChecksumCtx() err = s.tr.compareChecksum(ctx, db, verification.MakeKVChecksum(9876543, 54321, 1357924680)) c.Assert(err, ErrorMatches, "checksum mismatched.*") + c.Assert(db.Close(), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) - mock.ExpectClose() } func (s *tableRestoreSuite) TestAnalyzeTable(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - defer db.Close() mock.ExpectExec("ANALYZE TABLE `db`\\.`table`"). WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectClose() ctx := context.Background() err = s.tr.analyzeTable(ctx, db) c.Assert(err, IsNil) + c.Assert(db.Close(), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) - mock.ExpectClose() } func (s *tableRestoreSuite) TestImportKVSuccess(c *C) { From 844bb1f2431bed433ef4379eef32812436d81259 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 5 Aug 2019 17:27:10 +0800 Subject: [PATCH 13/13] restore: adjust detail --- lightning/restore/restore_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index 792b855ec..563a33144 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -240,7 +240,7 @@ func (s *restoreSuite) TestIncreaseGCLifeTimeFail(c *C) { c.Assert(err, IsNil) c.Assert(db.Close(), IsNil) - err = mock.ExpectationsWereMet() + c.Assert(mock.ExpectationsWereMet(), IsNil) } func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {