
Commit

Revert "restore: update and restore GCLifeTime once when parallel (#220
Browse files Browse the repository at this point in the history
…)"

This reverts commit 9f36067.
lance6716 committed Aug 7, 2019
1 parent 9f36067 commit 1bb77eb
Showing 2 changed files with 31 additions and 835 deletions.
107 changes: 31 additions & 76 deletions lightning/restore/restore.go
@@ -74,6 +74,7 @@ var (
requiredTiKVVersion = *semver.New("2.1.0")
)

var gcLifeTimeKey struct{}

func init() {
cfg := tidbcfg.GetGlobalConfig()
@@ -448,59 +449,6 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan
}
}

type gcLifeTimeManager struct {
runningJobsLock sync.Mutex
runningJobs int
oriGCLifeTime string
}

func newGCLifeTimeManager() *gcLifeTimeManager {
// The default values of the three members are enough to initialize this struct
return &gcLifeTimeManager{}
}

// Pre- and post-condition:
// if m.runningJobs == 0, GC life time has not been increased.
// if m.runningJobs > 0, GC life time has been increased.
// m.runningJobs won't overflow since index concurrency is relatively small
func (m *gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error {
m.runningJobsLock.Lock()
defer m.runningJobsLock.Unlock()

if m.runningJobs == 0 {
oriGCLifeTime, err := ObtainGCLifeTime(ctx, db)
if err != nil {
return err
}
m.oriGCLifeTime = oriGCLifeTime
err = increaseGCLifeTime(ctx, db)
if err != nil {
return err
}
}
m.runningJobs += 1
return nil
}

// Pre- and post-condition:
// if m.runningJobs == 0, an attempt has been made to restore the GC life time; if that attempt fails, a warning is printed.
// if m.runningJobs > 0, GC life time has not been restored yet.
// m.runningJobs won't go negative since every removeOneJob follows a successful addOneJob.
func (m *gcLifeTimeManager) removeOneJob(ctx context.Context, db *sql.DB, table string) {
m.runningJobsLock.Lock()
defer m.runningJobsLock.Unlock()

m.runningJobs -= 1
if m.runningJobs == 0 {
err := UpdateGCLifeTime(ctx, db, m.oriGCLifeTime)
if err != nil && common.ShouldLogError(err) {
common.AppLogger.Errorf("[%s] update tikv_gc_life_time error %v", table, errors.ErrorStack(err))
}
}
}

var gcLifeTimeKey struct{}

func (rc *RestoreController) restoreTables(ctx context.Context) error {
timer := time.Now()
var wg sync.WaitGroup
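
For context, the code removed above is a reference-counted guard: the first concurrent checksum job saves and raises the GC life time, and the last one to finish restores it. Below is a minimal, self-contained sketch of that idea; the obtain/raise/restore callbacks are stand-ins for ObtainGCLifeTime / increaseGCLifeTime / UpdateGCLifeTime, not lightning's real signatures.

```go
package main

import (
	"fmt"
	"sync"
)

// gcGuard mirrors the removed gcLifeTimeManager: the first job saves the
// original GC life time and raises it once; the last job to finish restores it.
type gcGuard struct {
	mu          sync.Mutex
	runningJobs int
	oriValue    string
}

func (g *gcGuard) addOneJob(obtain func() string, raise func()) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.runningJobs == 0 {
		g.oriValue = obtain() // save the original value exactly once
		raise()               // raise GC life time once for all parallel jobs
	}
	g.runningJobs++
}

func (g *gcGuard) removeOneJob(restore func(string)) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.runningJobs--
	if g.runningJobs == 0 {
		restore(g.oriValue) // the last job out restores the saved value
	}
}

func main() {
	gcLifeTime := "10m0s" // pretend this is tikv_gc_life_time
	g := &gcGuard{}

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // three "checksum jobs" running in parallel
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.addOneJob(
				func() string { return gcLifeTime },
				func() { gcLifeTime = "100h0m0s" },
			)
			defer g.removeOneJob(func(v string) { gcLifeTime = v })
			// ... the real code would run ADMIN CHECKSUM here ...
		}()
	}
	wg.Wait()
	fmt.Println("GC life time after all jobs:", gcLifeTime) // prints 10m0s again
}
```
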
@@ -516,9 +464,11 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
}
taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
defer close(taskCh)

manager := newGCLifeTimeManager()
ctx2 := context.WithValue(ctx, &gcLifeTimeKey, manager)
oriGCLifeTime, err := ObtainGCLifeTime(ctx, rc.tidbMgr.db)
if err != nil {
return err
}
ctx2 := context.WithValue(ctx, &gcLifeTimeKey, oriGCLifeTime)
for i := 0; i < rc.cfg.App.IndexConcurrency; i++ {
go func() {
for task := range taskCh {
@@ -1365,18 +1315,17 @@ func setSessionConcurrencyVars(ctx context.Context, db *sql.DB, dsn config.DBSto
func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, error) {
timer := time.Now()

var err error
manager, ok := ctx.Value(&gcLifeTimeKey).(*gcLifeTimeManager)
if !ok {
return nil, errors.New("No gcLifeTimeManager found in context, check context initialization")
}

if err = manager.addOneJob(ctx, db); err != nil {
return nil, err
ori, err := increaseGCLifeTime(ctx, db)
if err != nil {
return nil, errors.Trace(err)
}

// set it back finally
defer manager.removeOneJob(ctx, db, table)
defer func() {
err = UpdateGCLifeTime(ctx, db, ori)
if err != nil && common.ShouldLogError(err) {
common.AppLogger.Errorf("[%s] update tikv_gc_life_time error %v", table, errors.ErrorStack(err))
}
}()

// ADMIN CHECKSUM TABLE <table>,<table> example.
// mysql> admin checksum table test.t;
@@ -1398,19 +1347,25 @@ func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum,
return &cs, nil
}

func increaseGCLifeTime(ctx context.Context, db *sql.DB) (err error) {
func increaseGCLifeTime(ctx context.Context, db *sql.DB) (oriGCLifeTime string, err error) {
// checksum command usually takes a long time to execute,
// so we need to increase the gcLifeTime for this single transaction.

// try to get gcLifeTimeManager from context first.
// DoChecksum has ensured that this lookup succeeds.
manager, _ := ctx.Value(&gcLifeTimeKey).(*gcLifeTimeManager)
// try to get gcLifeTime from context first.
gcLifeTime, ok := ctx.Value(&gcLifeTimeKey).(string)
if !ok {
oriGCLifeTime, err = ObtainGCLifeTime(ctx, db)
if err != nil {
return "", err
}
} else {
oriGCLifeTime = gcLifeTime
}

var increaseGCLifeTime bool
if manager.oriGCLifeTime != "" {
ori, err := time.ParseDuration(manager.oriGCLifeTime)
if oriGCLifeTime != "" {
ori, err := time.ParseDuration(oriGCLifeTime)
if err != nil {
return errors.Trace(err)
return "", errors.Trace(err)
}
if ori < defaultGCLifeTime {
increaseGCLifeTime = true
@@ -1422,13 +1377,13 @@ func increaseGCLifeTime(ctx context.Context, db *sql.DB) (err error) {
if increaseGCLifeTime {
err = UpdateGCLifeTime(ctx, db, defaultGCLifeTime.String())
if err != nil {
return err
return "", errors.Trace(err)
}
}

failpoint.Inject("IncreaseGCUpdateDuration", func() {})

return nil
return oriGCLifeTime, nil
}
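
For contrast, here is a condensed sketch of the per-checksum flow this revert goes back to: every checksum call reads the current GC life time, raises it if it is shorter than the default, and restores it when done. The mysql.tidb / tikv_gc_life_time SQL and the 100h default are illustrative assumptions about what ObtainGCLifeTime / UpdateGCLifeTime do, not code copied from lightning.

```go
package sketch

import (
	"context"
	"database/sql"
	"time"
)

// doChecksumSketch shows the shape of the restored behaviour: obtain, maybe
// raise, run the checksum, then put the original GC life time back.
// Query text and the 100h value are assumptions for illustration only.
func doChecksumSketch(ctx context.Context, db *sql.DB, table string) error {
	const (
		obtainSQL = "SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'"
		updateSQL = "UPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'"
	)

	// 1. remember the original value (ObtainGCLifeTime in the real code)
	var ori string
	if err := db.QueryRowContext(ctx, obtainSQL).Scan(&ori); err != nil {
		return err
	}

	// 2. raise it only if it is shorter than the default (increaseGCLifeTime)
	const defaultGCLifeTime = 100 * time.Hour // assumed default
	if d, err := time.ParseDuration(ori); err == nil && d < defaultGCLifeTime {
		if _, err := db.ExecContext(ctx, updateSQL, defaultGCLifeTime.String()); err != nil {
			return err
		}
	}

	// 3. always restore the original value afterwards (UpdateGCLifeTime)
	defer db.ExecContext(ctx, updateSQL, ori)

	// 4. the long-running statement the whole dance exists for
	_, err := db.ExecContext(ctx, "ADMIN CHECKSUM TABLE "+table)
	return err
}
```
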

////////////////////////////////////////////////////////////////
