Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Detect duplicate data from TiKV #1144

Merged
merged 26 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 19 additions & 20 deletions pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ import (

"golang.org/x/sync/errgroup"

split "github.com/pingcap/br/pkg/restore"
"github.com/pingcap/br/pkg/lightning/backend/kv"
"github.com/pingcap/br/pkg/lightning/common"
"github.com/pingcap/br/pkg/lightning/log"
"github.com/pingcap/br/pkg/logutil"
"github.com/pingcap/br/pkg/restore"
kennytm marked this conversation as resolved.
Show resolved Hide resolved

"github.com/cockroachdb/pebble"
"github.com/pingcap/errors"
Expand All @@ -33,7 +37,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/ranger"
Expand All @@ -42,11 +46,6 @@ import (
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

backendkv "github.com/pingcap/br/pkg/lightning/backend/kv"
"github.com/pingcap/br/pkg/lightning/common"
"github.com/pingcap/br/pkg/lightning/log"
"github.com/pingcap/br/pkg/logutil"
)

const (
Expand All @@ -56,15 +55,15 @@ const (

type DuplicateRequest struct {
tableID int64
start kv.Key
end kv.Key
start tidbkv.Key
end tidbkv.Key
indexInfo *model.IndexInfo
}

type DuplicateManager struct {
// TODO: Remote the member `db` and store the result in another place.
db *pebble.DB
splitCli split.SplitClient
splitCli restore.SplitClient
regionConcurrency int
connPool common.GRPCConns
tls *common.TLS
Expand All @@ -74,7 +73,7 @@ type DuplicateManager struct {

func NewDuplicateManager(
db *pebble.DB,
splitCli split.SplitClient,
splitCli restore.SplitClient,
ts uint64,
tls *common.TLS,
regionConcurrency int) (*DuplicateManager, error) {
Expand All @@ -96,7 +95,7 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromTiKV(ctx context.Contex
return err
}

decoder, err := backendkv.NewTableKVDecoder(tbl, &backendkv.SessionOptions{
decoder, err := kv.NewTableKVDecoder(tbl, &kv.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
})
if err != nil {
Expand All @@ -119,7 +118,7 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromTiKV(ctx context.Contex
}

func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context,
decoder *backendkv.TableKVDecoder,
decoder *kv.TableKVDecoder,
req *DuplicateRequest) error {
startKey := codec.EncodeBytes([]byte{}, req.start)
endKey := codec.EncodeBytes([]byte{}, req.end)
Expand All @@ -137,9 +136,9 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context,
if tryTimes > maxRetryTimes {
return errors.Errorf("retry time exceed limit")
}
unfinishedRegions := make([]*split.RegionInfo, 0)
unfinishedRegions := make([]*restore.RegionInfo, 0)
waitingClients := make([]import_sstpb.ImportSST_DuplicateDetectClient, 0)
watingRegions := make([]*split.RegionInfo, 0)
watingRegions := make([]*restore.RegionInfo, 0)
for idx, region := range regions {
if len(waitingClients) > manager.regionConcurrency {
r := regions[idx:]
Expand Down Expand Up @@ -247,7 +246,7 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context,
func (manager *DuplicateManager) storeDuplicateData(
ctx context.Context,
resp *import_sstpb.DuplicateDetectResponse,
decoder *backendkv.TableKVDecoder,
decoder *kv.TableKVDecoder,
req *DuplicateRequest,
) ([][]byte, error) {
opts := &pebble.WriteOptions{Sync: false}
Expand Down Expand Up @@ -307,14 +306,14 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromLocalIndex(
tbl table.Table,
db *pebble.DB,
) error {
decoder, err := backendkv.NewTableKVDecoder(tbl, &backendkv.SessionOptions{
decoder, err := kv.NewTableKVDecoder(tbl, &kv.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
})
if err != nil {
return err
}
handles := make([][]byte, 0)
allRanges := make([]kv.KeyRange, 0)
allRanges := make([]tidbkv.KeyRange, 0)
for _, indexInfo := range tbl.Meta().Indices {
if indexInfo.State != model.StatePublic {
continue
Expand Down Expand Up @@ -433,7 +432,7 @@ func (manager *DuplicateManager) getValues(

func (manager *DuplicateManager) getValuesFromRegion(
ctx context.Context,
region *split.RegionInfo,
region *restore.RegionInfo,
handles [][]byte,
) error {
kvclient, err := manager.getKvClient(ctx, region.Leader)
Expand Down Expand Up @@ -498,7 +497,7 @@ func (manager *DuplicateManager) getValuesFromRegion(
}

func (manager *DuplicateManager) getDuplicateStream(ctx context.Context,
region *split.RegionInfo,
region *restore.RegionInfo,
start []byte, end []byte) (import_sstpb.ImportSST_DuplicateDetectClient, error) {
leader := region.Leader
if leader == nil {
Expand Down
8 changes: 3 additions & 5 deletions pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@ import (
"sync"
"time"

"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/util/ranger"

"github.com/pingcap/parser/mysql"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/sstable"
"github.com/coreos/go-semver/semver"
Expand All @@ -47,10 +42,13 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/ranger"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
Expand Down