Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
*: abstract the Importer communication into an interface (#215)
Browse files Browse the repository at this point in the history
* common: allow retry on ErrWriteConflictInTiDB (mysql error 8005)

* *: perform SwitchMode and Compact directly through ImportSSTService

This allows us to perform these actions without relying on Importer.

* *: abstracted *kv.Importer into kv.Backend

* common,kv: fix comments

* kv,restore: addressed comments

* restore: fix comments
  • Loading branch information
kennytm authored Jul 29, 2019
1 parent ae59251 commit af34025
Show file tree
Hide file tree
Showing 17 changed files with 1,745 additions and 922 deletions.
42 changes: 20 additions & 22 deletions cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"flag"
"fmt"
"net/http"
"os"
"path"
"strconv"
Expand Down Expand Up @@ -102,17 +103,15 @@ func run() error {
}

func compactCluster(ctx context.Context, cfg *config.Config) error {
importer, err := kv.NewImporter(ctx, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
if err != nil {
return errors.Trace(err)
}
defer importer.Close()

if err := importer.Compact(ctx, restore.FullLevelCompact); err != nil {
return errors.Trace(err)
}

return nil
return kv.ForAllStores(
ctx,
&http.Client{},
cfg.TiDB.PdAddr,
kv.StoreStateDisconnected,
func(c context.Context, store *kv.Store) error {
return kv.Compact(c, store.Address, restore.FullLevelCompact)
},
)
}

func switchMode(ctx context.Context, cfg *config.Config, mode string) error {
Expand All @@ -126,16 +125,15 @@ func switchMode(ctx context.Context, cfg *config.Config, mode string) error {
return errors.Errorf("invalid mode %s, must use %s or %s", mode, config.ImportMode, config.NormalMode)
}

importer, err := kv.NewImporter(ctx, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr)
if err != nil {
return errors.Trace(err)
}
defer importer.Close()

if err := importer.SwitchMode(ctx, m); err != nil {
return errors.Trace(err)
}
return nil
return kv.ForAllStores(
ctx,
&http.Client{},
cfg.TiDB.PdAddr,
kv.StoreStateDisconnected,
func(c context.Context, store *kv.Store) error {
return kv.SwitchMode(c, store.Address, m)
},
)
}

func checkpointRemove(ctx context.Context, cfg *config.Config, tableName string) error {
Expand Down Expand Up @@ -253,7 +251,7 @@ func checkpointDump(ctx context.Context, cfg *config.Config, dumpFolder string)
return nil
}

func unsafeCloseEngine(ctx context.Context, importer *kv.Importer, engine string) (*kv.ClosedEngine, error) {
func unsafeCloseEngine(ctx context.Context, importer kv.Backend, engine string) (*kv.ClosedEngine, error) {
if index := strings.LastIndexByte(engine, ':'); index >= 0 {
tableName := engine[:index]
engineID, err := strconv.Atoi(engine[index+1:])
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/satori/go.uuid v1.2.0
github.com/shurcooL/httpgzip v0.0.0-20190516014818-1c7afaae1203
go.uber.org/zap v1.10.0
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/text v0.3.2
google.golang.org/grpc v1.21.1
modernc.org/mathutil v1.0.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAG
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
2 changes: 1 addition & 1 deletion lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func IsRetryableError(err error) bool {
case *mysql.MySQLError:
switch nerr.Number {
// ErrLockDeadlock can retry to commit while meet deadlock
case tmysql.ErrUnknown, tmysql.ErrLockDeadlock, tmysql.ErrPDServerTimeout, tmysql.ErrTiKVServerTimeout, tmysql.ErrTiKVServerBusy, tmysql.ErrResolveLockTimeout, tmysql.ErrRegionUnavailable:
case tmysql.ErrUnknown, tmysql.ErrLockDeadlock, tmysql.ErrWriteConflictInTiDB, tmysql.ErrPDServerTimeout, tmysql.ErrTiKVServerTimeout, tmysql.ErrTiKVServerBusy, tmysql.ErrResolveLockTimeout, tmysql.ErrRegionUnavailable:
return true
default:
return false
Expand Down
1 change: 1 addition & 0 deletions lightning/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (s *utilSuite) TestIsRetryableError(c *C) {
c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrTiKVServerBusy}), IsTrue)
c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrResolveLockTimeout}), IsTrue)
c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrRegionUnavailable}), IsTrue)
c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrWriteConflictInTiDB}), IsTrue)

// gRPC Errors
c.Assert(common.IsRetryableError(status.Error(codes.Canceled, "")), IsFalse)
Expand Down
Loading

0 comments on commit af34025

Please sign in to comment.