Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: local backend support keyspace #40628

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b51cc89
lightning: local backend support keyspace
iosmanthus Dec 13, 2022
c1a83d4
Merge branch 'master' of github.com:pingcap/tidb into keyspace-lightn…
iosmanthus Jan 16, 2023
3436e4b
fix checksum in sql2kv
iosmanthus Jan 17, 2023
ee9f20a
Merge branch 'master' of github.com:pingcap/tidb into keyspace-lightn…
iosmanthus Jan 28, 2023
dbe57d6
address comments from tangenta
iosmanthus Jan 28, 2023
e210ff1
Merge branch 'master' of github.com:pingcap/tidb into keyspace-lightn…
iosmanthus Jan 28, 2023
bed74f2
fix lint
iosmanthus Jan 28, 2023
e8bc6e4
fix goimports
iosmanthus Jan 28, 2023
c1ed3b6
fix lint
iosmanthus Jan 28, 2023
e8496a3
return empty keyspace name while db is nil
iosmanthus Jan 29, 2023
cf85f86
Merge branch 'master' of github.com:pingcap/tidb into keyspace-lightn…
iosmanthus Jan 29, 2023
d77cdb1
fix default codec for lightning test
iosmanthus Jan 30, 2023
d29c443
fix default codec for TestChunkRestoreSuite
iosmanthus Jan 31, 2023
6386200
Merge branch 'master' of github.com:pingcap/tidb into keyspace-lightn…
iosmanthus Jan 31, 2023
3a37ce7
fix lint
iosmanthus Jan 31, 2023
b7a9c10
fix unit test panic
iosmanthus Jan 31, 2023
77f1a5b
make bazel_prepare
iosmanthus Jan 31, 2023
7133d1e
revert .gitignore for test
iosmanthus Jan 31, 2023
53050be
fix lint
iosmanthus Jan 31, 2023
bb9057f
make bazel_prepare
iosmanthus Jan 31, 2023
8fb89ef
fix bazel ignore path
iosmanthus Jan 31, 2023
db76add
Merge branch 'master' into keyspace-lightning-local-backend
iosmanthus Jan 31, 2023
96426c3
fix bazel ignore path
iosmanthus Jan 31, 2023
01c327f
revert .gitignore for test
iosmanthus Jan 31, 2023
ce84419
Merge branch 'master' into keyspace-lightning-local-backend
iosmanthus Jan 31, 2023
ac35029
Merge branch 'master' into keyspace-lightning-local-backend
iosmanthus Jan 31, 2023
4d99d37
Merge branch 'master' into keyspace-lightning-local-backend
ti-chi-bot Feb 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ go_test(
"//br/pkg/restore/split",
"//br/pkg/utils",
"//ddl",
"//keyspace",
"//kv",
"//parser",
"//parser/ast",
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ type DuplicateManager struct {
tableName string
splitCli split.SplitClient
tikvCli *tikv.KVStore
tikvCodec tikv.Codec
errorMgr *errormanager.ErrorManager
decoder *kv.TableKVDecoder
logger log.Logger
Expand All @@ -401,6 +402,7 @@ func NewDuplicateManager(
tableName string,
splitCli split.SplitClient,
tikvCli *tikv.KVStore,
tikvCodec tikv.Codec,
errMgr *errormanager.ErrorManager,
sessOpts *kv.SessionOptions,
concurrency int,
Expand All @@ -417,6 +419,7 @@ func NewDuplicateManager(
tableName: tableName,
splitCli: splitCli,
tikvCli: tikvCli,
tikvCodec: tikvCodec,
errorMgr: errMgr,
decoder: decoder,
logger: logger,
Expand All @@ -439,6 +442,10 @@ func (m *DuplicateManager) RecordDataConflictError(ctx context.Context, stream D
if err != nil {
return errors.Trace(err)
}
key, err = m.tikvCodec.DecodeKey(key)
if err != nil {
return errors.Trace(err)
}
m.hasDupe.Store(true)

h, err := m.decoder.DecodeHandleFromRowKey(key)
Expand Down Expand Up @@ -504,6 +511,10 @@ func (m *DuplicateManager) RecordIndexConflictError(ctx context.Context, stream
if err != nil {
return errors.Trace(err)
}
key, err = m.tikvCodec.DecodeKey(key)
if err != nil {
return errors.Trace(err)
}
m.hasDupe.Store(true)

h, err := m.decoder.DecodeHandleFromIndex(indexInfo, key, val)
Expand Down Expand Up @@ -581,6 +592,11 @@ func (m *DuplicateManager) buildDupTasks() ([]dupTask, error) {
putToTaskFunc(ranges, indexInfo)
})
}

// Encode all the tasks
for i := range tasks {
tasks[i].StartKey, tasks[i].EndKey = m.tikvCodec.EncodeRange(tasks[i].StartKey, tasks[i].EndKey)
}
return tasks, nil
}

Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/local/duplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -52,7 +53,7 @@ func TestBuildDupTask(t *testing.T) {
{&lkv.SessionOptions{IndexID: info.Indices[1].ID}, false},
}
for _, tc := range testCases {
dupMgr, err := local.NewDuplicateManager(tbl, "t", nil, nil, nil,
dupMgr, err := local.NewDuplicateManager(tbl, "t", nil, nil, keyspace.CodecV1, nil,
tc.sessOpt, 4, atomic.NewBool(false), log.FromContext(context.Background()))
require.NoError(t, err)
tasks, err := local.BuildDuplicateTaskForTest(dupMgr)
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/hack"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -1045,6 +1046,8 @@ type Writer struct {
batchSize int64

lastMetaSeq int32

tikvCodec tikv.Codec
}

func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
Expand Down Expand Up @@ -1127,6 +1130,10 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [
return errorEngineClosed
}

for i := range kvs {
kvs[i].Key = w.tikvCodec.EncodeKey(kvs[i].Key)
}

w.Lock()
defer w.Unlock()

Expand Down
54 changes: 35 additions & 19 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,13 @@ func checkTiFlashVersion(ctx context.Context, g glue.Glue, checkCtx *backend.Che
type local struct {
engines sync.Map // sync version of map[uuid.UUID]*Engine

pdCtl *pdutil.PdController
splitCli split.SplitClient
tikvCli *tikvclient.KVStore
tls *common.TLS
pdAddr string
g glue.Glue
pdCtl *pdutil.PdController
splitCli split.SplitClient
tikvCli *tikvclient.KVStore
tls *common.TLS
pdAddr string
g glue.Glue
tikvCodec tikvclient.Codec

localStoreDir string

Expand Down Expand Up @@ -419,6 +420,7 @@ func NewLocalBackend(
g glue.Glue,
maxOpenFiles int,
errorMgr *errormanager.ErrorManager,
keyspaceName string,
) (backend.Backend, error) {
localFile := cfg.TikvImporter.SortedKVDir
rangeConcurrency := cfg.TikvImporter.RangeConcurrency
Expand Down Expand Up @@ -460,8 +462,19 @@ func NewLocalBackend(
if err != nil {
return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
}
rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()))
pdCliForTiKV := tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())

var pdCliForTiKV *tikvclient.CodecPDClient
if keyspaceName == "" {
pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())
} else {
pdCliForTiKV, err = tikvclient.NewCodecPDClientWithKeyspace(tikvclient.ModeTxn, pdCtl.GetPDClient(), keyspaceName)
if err != nil {
return backend.MakeBackend(nil), common.ErrCreatePDClient.Wrap(err).GenWithStackByArgs()
}
}

tikvCodec := pdCliForTiKV.GetCodec()
rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec))
tikvCli, err := tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
if err != nil {
return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
Expand All @@ -484,13 +497,14 @@ func NewLocalBackend(
LastAlloc = alloc
}
local := &local{
engines: sync.Map{},
pdCtl: pdCtl,
splitCli: splitCli,
tikvCli: tikvCli,
tls: tls,
pdAddr: cfg.TiDB.PdAddr,
g: g,
engines: sync.Map{},
pdCtl: pdCtl,
splitCli: splitCli,
tikvCli: tikvCli,
tls: tls,
pdAddr: cfg.TiDB.PdAddr,
g: g,
tikvCodec: tikvCodec,

localStoreDir: localFile,
rangeConcurrency: worker.NewPool(ctx, rangeConcurrency, "range"),
Expand Down Expand Up @@ -975,6 +989,7 @@ func (local *local) WriteToTiKV(
Start: firstKey,
End: lastKey,
},
ApiVersion: local.tikvCodec.GetAPIVersion(),
}

leaderID := region.Leader.GetId()
Expand Down Expand Up @@ -1676,7 +1691,7 @@ func (local *local) CollectLocalDuplicateRows(ctx context.Context, tbl table.Tab
}()

atomicHasDupe := atomic.NewBool(false)
duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli,
duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec,
local.errorMgr, opts, local.dupeConcurrency, atomicHasDupe, log.FromContext(ctx))
if err != nil {
return false, errors.Trace(err)
Expand All @@ -1694,7 +1709,7 @@ func (local *local) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Ta
}()

atomicHasDupe := atomic.NewBool(false)
duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli,
duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec,
local.errorMgr, opts, local.dupeConcurrency, atomicHasDupe, log.FromContext(ctx))
if err != nil {
return false, errors.Trace(err)
Expand Down Expand Up @@ -1908,16 +1923,17 @@ func (local *local) LocalWriter(ctx context.Context, cfg *backend.LocalWriterCon
return nil, errors.Errorf("could not find engine for %s", engineUUID.String())
}
engine := e.(*Engine)
return openLocalWriter(cfg, engine, local.localWriterMemCacheSize, local.bufferPool.NewBuffer())
return openLocalWriter(cfg, engine, local.tikvCodec, local.localWriterMemCacheSize, local.bufferPool.NewBuffer())
}

func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) {
func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, tikvCodec tikvclient.Codec, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) {
w := &Writer{
engine: engine,
memtableSizeLimit: cacheSize,
kvBuffer: kvBuffer,
isKVSorted: cfg.IsKVSorted,
isWriteBatchSorted: true,
tikvCodec: tikvCodec,
}
// pre-allocate a long enough buffer to avoid a lot of runtime.growslice
// this can help save about 3% of CPU.
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/keyspace"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -332,7 +333,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
pool := membuf.NewPool()
defer pool.Destroy()
kvBuffer := pool.NewBuffer()
w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024, kvBuffer)
w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, keyspace.CodecV1, 1024, kvBuffer)
require.NoError(t, err)

ctx := context.Background()
Expand Down Expand Up @@ -1290,6 +1291,7 @@ func TestCheckPeersBusy(t *testing.T) {
bufferPool: membuf.NewPool(),
supportMultiIngest: true,
shouldCheckWriteStall: true,
tikvCodec: keyspace.CodecV1,
}

db, tmpPath := makePebbleDB(t, nil)
Expand Down
40 changes: 40 additions & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,38 @@ var (
taskCfgRecorderKey = "taskCfgRecorderKey"
)

func getKeyspaceName(g glue.Glue) (string, error) {
db, err := g.GetDB()
if err != nil {
return "", err
}
if db == nil {
return "", nil
}

rows, err := db.Query("show config where Type = 'tidb' and name = 'keyspace-name'")
if err != nil {
return "", err
}
//nolint: errcheck
defer rows.Close()

var (
_type string
_instance string
_name string
value string
)
if rows.Next() {
err = rows.Scan(&_type, &_instance, &_name, &value)
if err != nil {
return "", err
}
}

return value, rows.Err()
}

func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *options) (err error) {
build.LogInfo(build.Lightning)
o.logger.Info("cfg", zap.Stringer("cfg", taskCfg))
Expand Down Expand Up @@ -541,6 +573,13 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
dbMetas := mdl.GetDatabases()
web.BroadcastInitProgress(dbMetas)

keyspaceName, err := getKeyspaceName(g)
if err != nil {
o.logger.Error("fail to get keyspace name", zap.Error(err))
return errors.Trace(err)
}
o.logger.Info("acquired keyspace name", zap.String("keyspaceName", keyspaceName))

param := &restore.ControllerParam{
DBMetas: dbMetas,
Status: &l.status,
Expand All @@ -550,6 +589,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
CheckpointStorage: o.checkpointStorage,
CheckpointName: o.checkpointName,
DupIndicator: o.dupIndicator,
KeyspaceName: keyspaceName,
}

var procedure *restore.Controller
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_library(
"//br/pkg/version/build",
"//ddl",
"//errno",
"//keyspace",
"//kv",
"//meta/autoid",
"//parser",
Expand Down
Loading