
Commit b9b6a31

add some node
Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
Little-Wallace committed May 27, 2021
1 parent 34b2150 commit b9b6a31
Showing 5 changed files with 100 additions and 54 deletions.
14 changes: 0 additions & 14 deletions pkg/lightning/backend/local/local.go
@@ -714,20 +714,6 @@ func (e *File) loadEngineMeta() {
zap.Int64("size", e.TotalSize.Load()))
}

type gRPCConns struct {
mu sync.Mutex
conns map[uint64]*connPool
}

func (conns *gRPCConns) Close() {
conns.mu.Lock()
defer conns.mu.Unlock()

for _, cp := range conns.conns {
cp.Close()
}
}

type local struct {
engines sync.Map // sync version of map[uuid.UUID]*File

13 changes: 13 additions & 0 deletions pkg/lightning/common/conn.go
@@ -1,3 +1,16 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
13 changes: 13 additions & 0 deletions pkg/lightning/restore/checksum.go
@@ -1,3 +1,16 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package restore

import (
110 changes: 73 additions & 37 deletions pkg/lightning/restore/duplicate.go
@@ -1,11 +1,23 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package restore

import (
"bytes"
"context"
"github.com/docker/go-units"
backendkv "github.com/pingcap/br/pkg/lightning/backend/kv"
"github.com/pingcap/br/pkg/lightning/checkpoints"
"github.com/pingcap/br/pkg/lightning/common"
"github.com/pingcap/br/pkg/lightning/log"
"github.com/pingcap/br/pkg/logutil"
@@ -24,8 +36,6 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"math"
"os"
"path/filepath"
"sort"
"strconv"
"sync"
@@ -40,13 +50,13 @@ import (
const (
dialTimeout = 5 * time.Second
maxRetryTimes = 5
defaultRetryBackoffTime = 3 * time.Second
defaultRetryBackoffTime = 10 * time.Second

gRPCKeepAliveTime = 10 * time.Second
gRPCKeepAliveTimeout = 3 * time.Second
gRPCBackOffMaxDelay = 3 * time.Second
defaultEngineMemCacheSize = 512 * units.MiB
maxWriteBatchSize = 256
maxScanRegionSize = 256
)

type DuplicateRequest struct {
@@ -124,10 +134,18 @@ func (manager *DuplicateManager) DuplicateTable(ctx context.Context, tbl table.T
return err
}
var wg sync.WaitGroup
var tableErr common.OnceError
rpcctx, cancel := context.WithCancel(ctx)
defer cancel()
for _, r := range reqs {
wg.Add(1)
go func(req *DuplicateRequest) {
manager.sendRequestToTiKV(ctx, decoder, req)
err := manager.sendRequestToTiKV(rpcctx, decoder, req)
if err != nil {
log.L().Error("error occurred when collecting duplicate data from TiKV", zap.Error(err))
tableErr.Set(err)
cancel()
}
wg.Done()
}(r)
}
@@ -136,26 +154,29 @@ func (manager *DuplicateManager) DuplicateTable(ctx context.Context, tbl table.T
}

func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context, decoder *backendkv.TableKVDecoder, req *DuplicateRequest) error {
rpcctx, cancel := context.WithCancel(ctx)
defer cancel()
startKey := codec.EncodeBytes([]byte{}, req.start)
endKey := codec.EncodeBytes([]byte{}, req.end)

regions, err := paginateScanRegion(rpcctx, manager.pdClient, startKey, endKey, 128)
regions, err := paginateScanRegion(ctx, manager.pdClient, startKey, endKey)
if err != nil {
return err
}
tryTimes := 0
for {
if len(regions) == 0 {
break
}
if tryTimes > maxRetryTimes {
return errors.Errorf("retry times exceed limit")
}
unfinishedRegions := make([]*pd.Region, len(regions))
waitingClients := make([]sst.ImportSST_DuplicateDetectClient, len(regions))
watingRegions := make([]*pd.Region, len(regions))
for _, region := range regions {
for idx, region := range regions {
if len(waitingClients) > manager.regionConcurrency {
unfinishedRegions = append(unfinishedRegions, region)
continue
r := regions[idx:]
unfinishedRegions = append(unfinishedRegions, r...)
break
}
_, start, _ := codec.DecodeBytes(region.Meta.StartKey, []byte{})
_, end, _ := codec.DecodeBytes(region.Meta.EndKey, []byte{})
@@ -168,7 +189,7 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context, decoder

cli, err := manager.getDuplicateStream(ctx, region, start, end, req.indexInfo == nil)
if err != nil {
r, err := manager.pdClient.GetRegionByID(rpcctx, region.Meta.GetId())
r, err := manager.pdClient.GetRegionByID(ctx, region.Meta.GetId())
if err != nil {
unfinishedRegions = append(unfinishedRegions, region)
} else {
@@ -185,7 +206,7 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context, decoder
for {
resp, err := cli.Recv()
if err != nil || resp.GetKeyError() != nil {
r, err := manager.pdClient.GetRegionByID(rpcctx, region.Meta.GetId())
r, err := manager.pdClient.GetRegionByID(ctx, region.Meta.GetId())
if err != nil {
unfinishedRegions = append(unfinishedRegions, region)
} else {
@@ -209,7 +230,7 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context, decoder
logutil.Region(region.Meta), logutil.Leader(region.Leader),
zap.String("RegionError", resp.GetRegionError().GetMessage()))

r, err := paginateScanRegion(rpcctx, manager.pdClient, watingRegions[idx].Meta.GetStartKey(), watingRegions[idx].Meta.GetEndKey(), 128)
r, err := paginateScanRegion(ctx, manager.pdClient, watingRegions[idx].Meta.GetStartKey(), watingRegions[idx].Meta.GetEndKey())
if err != nil {
unfinishedRegions = append(unfinishedRegions, watingRegions[idx])
} else {
@@ -220,9 +241,15 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context, decoder
if len(resp.Pairs) == 0 {
break
}
manager.storeDuplicateData(resp, decoder, req)
if err := manager.storeDuplicateData(resp, decoder, req); err != nil {
return err
}
}
}
if len(unfinishedRegions) == len(regions) {
tryTimes += 1
time.Sleep(defaultRetryBackoffTime)
}
regions = unfinishedRegions
}
return nil
@@ -397,32 +424,41 @@ func buildIndexRequest(tableID int64, indexInfo *model.IndexInfo) ([]*DuplicateR
}

func paginateScanRegion(
ctx context.Context, client pd.Client, startKey, endKey []byte, limit int,
ctx context.Context, client pd.Client, start, end []byte,
) ([]*pd.Region, error) {
if len(endKey) != 0 && bytes.Compare(startKey, endKey) >= 0 {
if len(end) != 0 && bytes.Compare(start, end) >= 0 {
return nil, errors.Errorf("startKey > endKey when paginating scan region")
}

var regions []*pd.Region
for {
batch, err := client.ScanRegions(ctx, startKey, endKey, limit)
if err != nil {
return nil, errors.Trace(err)
}
regions = append(regions, batch...)
if len(batch) < limit {
// No more region
break
var globalErr error
for i := 0; i < maxRetryTimes; i++ {
startKey := start
endKey := end
var regions []*pd.Region
for {
batch, err := client.ScanRegions(ctx, startKey, endKey, maxScanRegionSize)
if err != nil {
globalErr = err
break
}
regions = append(regions, batch...)
if len(batch) < maxScanRegionSize {
// No more region
break
}
startKey = batch[len(batch)-1].Meta.GetEndKey()
if len(startKey) == 0 ||
(len(endKey) > 0 && bytes.Compare(startKey, endKey) >= 0) {
// All key space have scanned
break
}
}
startKey = batch[len(batch)-1].Meta.GetEndKey()
if len(startKey) == 0 ||
(len(endKey) > 0 && bytes.Compare(startKey, endKey) >= 0) {
// All key space have scanned
break
if globalErr == nil {
sort.Slice(regions, func(i, j int) bool {
return bytes.Compare(regions[i].Meta.StartKey, regions[j].Meta.StartKey) < 0
})
return regions, nil
}
}
sort.Slice(regions, func(i, j int) bool {
return bytes.Compare(regions[i].Meta.StartKey, regions[j].Meta.StartKey) < 0
})
return regions, nil
return nil, globalErr
}
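The bulk of the duplicate.go change rewrites paginateScanRegion: the limit parameter is replaced by the maxScanRegionSize constant, and the paginated ScanRegions loop is wrapped in a retry loop bounded by maxRetryTimes, so a failed scan is re-run from the original start key instead of aborting immediately. Below is a minimal, self-contained sketch of that shape. Region, RegionScanner, and the package name are hypothetical stand-ins for the pd types, and unlike the diff the sketch clears the recorded error at the top of each attempt so a later successful pass can still return a result.

```go
package regionscan

import (
	"bytes"
	"context"
	"fmt"
	"sort"
)

// Region and RegionScanner are hypothetical stand-ins for the pd.Region and
// pd.Client types used by the real paginateScanRegion.
type Region struct {
	StartKey []byte
	EndKey   []byte
}

type RegionScanner interface {
	// ScanRegions returns at most limit regions overlapping [startKey, endKey).
	ScanRegions(ctx context.Context, startKey, endKey []byte, limit int) ([]*Region, error)
}

const (
	maxRetryTimes     = 5
	maxScanRegionSize = 256
)

// paginateScanRegion pages through ScanRegions in batches and retries the whole
// scan up to maxRetryTimes; only a pass that completes without error returns a
// sorted result.
func paginateScanRegion(ctx context.Context, client RegionScanner, start, end []byte) ([]*Region, error) {
	if len(end) != 0 && bytes.Compare(start, end) >= 0 {
		return nil, fmt.Errorf("start key must be smaller than end key")
	}

	var lastErr error
	for attempt := 0; attempt < maxRetryTimes; attempt++ {
		lastErr = nil
		startKey, endKey := start, end
		var regions []*Region
		for {
			batch, err := client.ScanRegions(ctx, startKey, endKey, maxScanRegionSize)
			if err != nil {
				lastErr = err
				break
			}
			regions = append(regions, batch...)
			if len(batch) < maxScanRegionSize {
				break // a short batch means there are no more regions
			}
			startKey = batch[len(batch)-1].EndKey
			if len(startKey) == 0 || (len(endKey) > 0 && bytes.Compare(startKey, endKey) >= 0) {
				break // the whole key range has been covered
			}
		}
		if lastErr == nil {
			sort.Slice(regions, func(i, j int) bool {
				return bytes.Compare(regions[i].StartKey, regions[j].StartKey) < 0
			})
			return regions, nil
		}
	}
	return nil, lastErr
}
```

Restarting from the original start key keeps each attempt simple at the cost of re-scanning regions already fetched during the failed pass.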
4 changes: 1 addition & 3 deletions pkg/lightning/restore/restore.go
@@ -185,6 +185,7 @@ type Controller struct {

// commit ts for local and importer backend
ts uint64
duplicateManager *DuplicateManager
}

func NewRestoreController(
@@ -2153,7 +2154,6 @@ type TableRestore struct {
dbInfo *checkpoints.TidbDBInfo
tableInfo *checkpoints.TidbTableInfo
tableMeta *mydump.MDTableMeta
manager *DuplicateManager
encTable table.Table
alloc autoid.Allocators
logger log.Logger
@@ -2165,7 +2165,6 @@ func NewTableRestore(
dbInfo *checkpoints.TidbDBInfo,
tableInfo *checkpoints.TidbTableInfo,
cp *checkpoints.TableCheckpoint,
manager *DuplicateManager,
) (*TableRestore, error) {
idAlloc := kv.NewPanickingAllocators(cp.AllocBase)
tbl, err := tables.TableFromMeta(idAlloc, tableInfo.Core)
@@ -2178,7 +2177,6 @@ func NewTableRestore(
dbInfo: dbInfo,
tableInfo: tableInfo,
tableMeta: tableMeta,
manager: manager,
encTable: tbl,
alloc: idAlloc,
logger: log.With(zap.String("table", tableName)),
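The DuplicateTable change in duplicate.go (the @@ -124,10 +134,18 @@ hunk above) switches from fire-and-forget goroutines to a fan-out that records the first failure through common.OnceError and cancels a shared context so the remaining requests can stop early. Below is a small standalone sketch of that pattern. Request, onceError, runAll, and handle are hypothetical stand-ins, and the return value after wg.Wait() is assumed, since that part of the function is not visible in the diff.

```go
package fanout

import (
	"context"
	"sync"
)

// Request stands in for DuplicateRequest; onceError stands in for the
// common.OnceError value the diff uses to remember the first failure.
type Request struct {
	// fields elided
}

type onceError struct {
	mu  sync.Mutex
	err error
}

// Set records err only if no error has been recorded yet.
func (o *onceError) Set(err error) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.err == nil {
		o.err = err
	}
}

// Get returns the first recorded error, or nil.
func (o *onceError) Get() error {
	o.mu.Lock()
	defer o.mu.Unlock()
	return o.err
}

// runAll starts one worker per request under a shared cancelable context,
// keeps only the first error, and cancels the context so the other workers
// can bail out early; this mirrors the shape DuplicateTable now uses around
// sendRequestToTiKV.
func runAll(ctx context.Context, reqs []*Request, handle func(context.Context, *Request) error) error {
	var wg sync.WaitGroup
	var firstErr onceError
	rpcctx, cancel := context.WithCancel(ctx)
	defer cancel()

	for _, r := range reqs {
		wg.Add(1)
		go func(req *Request) {
			defer wg.Done()
			if err := handle(rpcctx, req); err != nil {
				firstErr.Set(err)
				cancel() // one failure stops the whole table
			}
		}(r)
	}
	wg.Wait()
	return firstErr.Get()
}
```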
