Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
This is an automated cherry-pick of #1089
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
glorv authored and ti-chi-bot committed May 27, 2021
1 parent 079faaf commit fd12d0a
Show file tree
Hide file tree
Showing 10 changed files with 334 additions and 80 deletions.
7 changes: 7 additions & 0 deletions go.mod1
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,18 @@ require (
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
<<<<<<< HEAD
github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8
github.com/pingcap/parser v0.0.0-20210325072920-0d17053a8a69
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 // indirect
github.com/pingcap/tidb v1.1.0-beta.0.20210416032353-4c49754750e5
=======
github.com/pingcap/kvproto v0.0.0-20210507074444-0ec2d0dc2e4b
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/parser v0.0.0-20210513020953-ae2c4497c07b
github.com/pingcap/tidb v1.1.0-beta.0.20210517044538-8ad868f801fc
>>>>>>> 179e15db (lightning/restore: support ingset multi ssts for same range (#1089))
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/pingcap/tipb v0.0.0-20210326161441-1164ca065d1b
github.com/prometheus/client_golang v1.5.1
Expand Down
6 changes: 6 additions & 0 deletions go.sum1
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,14 @@ github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
<<<<<<< HEAD
github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8 h1:t72qxPxunoKykkAuO5glpWGdoP+RmvKvX0lvmyFV0fI=
github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
=======
github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210507074444-0ec2d0dc2e4b h1:e42N26QQjVA/obDrFFapJ1YLB+j5aPQOh7R+cIGR9Bk=
github.com/pingcap/kvproto v0.0.0-20210507074444-0ec2d0dc2e4b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
>>>>>>> 179e15db (lightning/restore: support ingset multi ssts for same range (#1089))
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 h1:M+DNpOu/I3uDmwee6vcnoPd6GgSMqND4gxvDQ/W584U=
Expand Down
116 changes: 101 additions & 15 deletions pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,12 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/lightning/backend/kv"
"github.com/pingcap/br/pkg/lightning/common"
Expand Down Expand Up @@ -362,6 +365,7 @@ type local struct {

engineMemCacheSize int
localWriterMemCacheSize int64
supportMultiIngest bool
}

// connPool is a lazy pool of gRPC channels.
Expand Down Expand Up @@ -480,9 +484,54 @@ func NewLocalBackend(
localWriterMemCacheSize: int64(cfg.LocalWriterMemCacheSize),
}
local.conns.conns = make(map[uint64]*connPool)
if err = local.checkMultiIngestSupport(ctx, pdCli); err != nil {
return backend.MakeBackend(nil), err
}

return backend.MakeBackend(local), nil
}

<<<<<<< HEAD
=======
func (local *local) checkMultiIngestSupport(ctx context.Context, pdClient pd.Client) error {
stores, err := conn.GetAllTiKVStores(ctx, pdClient, conn.SkipTiFlash)
if err != nil {
return errors.Trace(err)
}
for _, s := range stores {
client, err := local.getImportClient(ctx, s.Id)
if err != nil {
return errors.Trace(err)
}
_, err = client.MultiIngest(ctx, &sst.MultiIngestRequest{})
if err != nil {
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
log.L().Info("multi ingest not support", zap.Any("unsupported store", s))
local.supportMultiIngest = false
return nil
}
}
return errors.Trace(err)
}
}

local.supportMultiIngest = true
log.L().Info("multi ingest support")
return nil
}

// rlock read locks a local file and returns the File instance if it exists.
func (local *local) rLockEngine(engineId uuid.UUID) *File {
if e, ok := local.engines.Load(engineId); ok {
engine := e.(*File)
engine.rLock()
return engine
}
return nil
}

>>>>>>> 179e15db (lightning/restore: support ingset multi ssts for same range (#1089))
// lock locks a local file and returns the File instance if it exists.
func (local *local) lockEngine(engineID uuid.UUID, state importMutexState) *File {
if e, ok := local.engines.Load(engineID); ok {
Expand Down Expand Up @@ -681,11 +730,11 @@ func (local *local) CloseEngine(ctx context.Context, engineUUID uuid.UUID) error
return engineFile.flushEngineWithoutLock(ctx)
}

func (local *local) getImportClient(ctx context.Context, peer *metapb.Peer) (sst.ImportSSTClient, error) {
func (local *local) getImportClient(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) {
local.conns.mu.Lock()
defer local.conns.mu.Unlock()

conn, err := local.getGrpcConnLocked(ctx, peer.GetStoreId())
conn, err := local.getGrpcConnLocked(ctx, storeID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -747,7 +796,7 @@ func (local *local) WriteToTiKV(
clients := make([]sst.ImportSST_WriteClient, 0, len(region.Region.GetPeers()))
requests := make([]*sst.WriteRequest, 0, len(region.Region.GetPeers()))
for _, peer := range region.Region.GetPeers() {
cli, err := local.getImportClient(ctx, peer)
cli, err := local.getImportClient(ctx, peer.StoreId)
if err != nil {
return nil, nil, stats, err
}
Expand Down Expand Up @@ -869,13 +918,13 @@ func (local *local) WriteToTiKV(
return leaderPeerMetas, remainRange, stats, nil
}

func (local *local) Ingest(ctx context.Context, meta *sst.SSTMeta, region *split.RegionInfo) (*sst.IngestResponse, error) {
func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *split.RegionInfo) (*sst.IngestResponse, error) {
leader := region.Leader
if leader == nil {
leader = region.Region.GetPeers()[0]
}

cli, err := local.getImportClient(ctx, leader)
cli, err := local.getImportClient(ctx, leader.StoreId)
if err != nil {
return nil, err
}
Expand All @@ -885,15 +934,30 @@ func (local *local) Ingest(ctx context.Context, meta *sst.SSTMeta, region *split
Peer: leader,
}

req := &sst.IngestRequest{
Context: reqCtx,
Sst: meta,
if !local.supportMultiIngest {
if len(metas) != 1 {
return nil, errors.New("batch ingest is not support")
}
req := &sst.IngestRequest{
Context: reqCtx,
Sst: metas[0],
}
resp, err := cli.Ingest(ctx, req)
return resp, errors.Trace(err)
}
<<<<<<< HEAD
resp, err := cli.Ingest(ctx, req)
if err != nil {
return nil, err
=======

req := &sst.MultiIngestRequest{
Context: reqCtx,
Ssts: metas,
>>>>>>> 179e15db (lightning/restore: support ingset multi ssts for same range (#1089))
}
return resp, nil
resp, err := cli.MultiIngest(ctx, req)
return resp, errors.Trace(err)
}

func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range {
Expand Down Expand Up @@ -1183,10 +1247,22 @@ loopWrite:
return nil, err
}

for _, meta := range metas {
if len(metas) == 0 {
return nil
}

batch := 1
if local.supportMultiIngest {
batch = len(metas)
}

for i := 0; i < len(metas); i += batch {
start := i * batch
end := utils.MinInt((i+1)*batch, len(metas))
ingestMetas := metas[start:end]
errCnt := 0
for errCnt < maxRetryTimes {
log.L().Debug("ingest meta", zap.Reflect("meta", meta))
log.L().Debug("ingest meta", zap.Reflect("meta", ingestMetas))
var resp *sst.IngestResponse
failpoint.Inject("FailIngestMeta", func(val failpoint.Value) {
// only inject the error once
Expand Down Expand Up @@ -1214,20 +1290,25 @@ loopWrite:
}
})
if resp == nil {
resp, err = local.Ingest(ctx, meta, region)
resp, err = local.Ingest(ctx, ingestMetas, region)
}
if err != nil {
if common.IsContextCanceledError(err) {
return nil, err
}
<<<<<<< HEAD
log.L().Warn("ingest failed", log.ShortError(err), log.ZapRedactReflect("meta", meta),
log.ZapRedactReflect("region", region))
=======
log.L().Warn("ingest failed", log.ShortError(err), logutil.SSTMetas(ingestMetas),
logutil.Region(region.Region), logutil.Leader(region.Leader))
>>>>>>> 179e15db (lightning/restore: support ingset multi ssts for same range (#1089))
errCnt++
continue
}
var retryTy retryType
var newRegion *split.RegionInfo
retryTy, newRegion, err = local.isIngestRetryable(ctx, resp, region, meta)
retryTy, newRegion, err = local.isIngestRetryable(ctx, resp, region, ingestMetas)
if common.IsContextCanceledError(err) {
return nil, err
}
Expand All @@ -1237,8 +1318,13 @@ loopWrite:
}
switch retryTy {
case retryNone:
<<<<<<< HEAD
log.L().Warn("ingest failed noretry", log.ShortError(err), log.ZapRedactReflect("meta", meta),
log.ZapRedactReflect("region", region))
=======
log.L().Warn("ingest failed noretry", log.ShortError(err), logutil.SSTMetas(ingestMetas),
logutil.Region(region.Region), logutil.Leader(region.Leader))
>>>>>>> 179e15db (lightning/restore: support ingset multi ssts for same range (#1089))
// met non-retryable error retry whole Write procedure
return remainRange, err
case retryWrite:
Expand Down Expand Up @@ -1575,7 +1661,7 @@ func (local *local) isIngestRetryable(
ctx context.Context,
resp *sst.IngestResponse,
region *split.RegionInfo,
meta *sst.SSTMeta,
metas []*sst.SSTMeta,
) (retryType, *split.RegionInfo, error) {
if resp.GetError() == nil {
return retryNone, nil, nil
Expand Down Expand Up @@ -1624,7 +1710,7 @@ func (local *local) isIngestRetryable(
if currentRegions := errPb.GetEpochNotMatch().GetCurrentRegions(); currentRegions != nil {
var currentRegion *metapb.Region
for _, r := range currentRegions {
if insideRegion(r, meta) {
if insideRegion(r, metas) {
currentRegion = r
break
}
Expand Down
24 changes: 16 additions & 8 deletions pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,21 @@ func (s *localSuite) TestIsIngestRetryable(c *C) {
},
},
}
meta := &sst.SSTMeta{
Range: &sst.Range{
Start: []byte{1},
End: []byte{2},
metas := []*sst.SSTMeta{
{
Range: &sst.Range{
Start: []byte{1},
End: []byte{2},
},
},
{
Range: &sst.Range{
Start: []byte{1, 1},
End: []byte{2},
},
},
}
retryType, newRegion, err := local.isIngestRetryable(ctx, resp, region, meta)
retryType, newRegion, err := local.isIngestRetryable(ctx, resp, region, metas)
c.Assert(retryType, Equals, retryWrite)
c.Assert(newRegion.Leader.Id, Equals, uint64(2))
c.Assert(err, NotNil)
Expand All @@ -470,18 +478,18 @@ func (s *localSuite) TestIsIngestRetryable(c *C) {
},
},
}
retryType, newRegion, err = local.isIngestRetryable(ctx, resp, region, meta)
retryType, newRegion, err = local.isIngestRetryable(ctx, resp, region, metas)
c.Assert(retryType, Equals, retryWrite)
c.Assert(newRegion.Region.RegionEpoch.Version, Equals, uint64(2))
c.Assert(err, NotNil)

resp.Error = &errorpb.Error{Message: "raft: proposal dropped"}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, meta)
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
c.Assert(retryType, Equals, retryWrite)
c.Assert(err, NotNil)

resp.Error = &errorpb.Error{Message: "unknown error"}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, meta)
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
c.Assert(retryType, Equals, retryNone)
c.Assert(err, ErrorMatches, "non-retryable error: unknown error")
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,13 @@ func beforeEnd(key []byte, end []byte) bool {
return bytes.Compare(key, end) < 0 || len(end) == 0
}

func insideRegion(region *metapb.Region, meta *sst.SSTMeta) bool {
rg := meta.GetRange()
return keyInsideRegion(region, rg.GetStart()) && keyInsideRegion(region, rg.GetEnd())
func insideRegion(region *metapb.Region, metas []*sst.SSTMeta) bool {
inside := true
for _, meta := range metas {
rg := meta.GetRange()
inside = inside && (keyInsideRegion(region, rg.GetStart()) && keyInsideRegion(region, rg.GetEnd()))
}
return inside
}

func keyInsideRegion(region *metapb.Region, key []byte) bool {
Expand Down
17 changes: 17 additions & 0 deletions pkg/logutil/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/google/uuid"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -160,6 +161,22 @@ func SSTMeta(sstMeta *import_sstpb.SSTMeta) zap.Field {
return zap.Object("sstMeta", zapSSTMetaMarshaler{sstMeta})
}

type zapSSTMetasMarshaler []*import_sstpb.SSTMeta

func (m zapSSTMetasMarshaler) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
for _, meta := range m {
if err := encoder.AppendObject(zapSSTMetaMarshaler{meta}); err != nil {
return errors.Trace(err)
}
}
return nil
}

// SSTMetas make the zap fields for SST metas.
func SSTMetas(sstMetas []*import_sstpb.SSTMeta) zap.Field {
return zap.Array("sstMetas", zapSSTMetasMarshaler(sstMetas))
}

type zapKeysMarshaler [][]byte

func (keys zapKeysMarshaler) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
Expand Down
Loading

0 comments on commit fd12d0a

Please sign in to comment.