Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
make ingest compatible with old tikv
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed May 7, 2021
1 parent 3bf39ff commit 5e5d384
Show file tree
Hide file tree
Showing 9 changed files with 324 additions and 134 deletions.
4 changes: 1 addition & 3 deletions go.mod1
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8
github.com/pingcap/kvproto v0.0.0-20210507074444-0ec2d0dc2e4b
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/parser v0.0.0-20210330190622-f959a136fc19
github.com/pingcap/tidb v1.1.0-beta.0.20210419040752-76ba3c84acbc
Expand Down Expand Up @@ -55,5 +55,3 @@ require (
)

replace cloud.google.com/go/storage => github.com/3pointer/google-cloud-go/storage v1.6.1-0.20210108125931-b59bfa0720b2

replace github.com/pingcap/kvproto => github.com/Little-Wallace/kvproto v0.0.0-20210430082408-3c390efa277e
4 changes: 2 additions & 2 deletions go.sum1
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ github.com/HdrHistogram/hdrhistogram-go v0.9.0/go.mod h1:nxrse8/Tzg2tg3DZcZjm6qE
github.com/Jeffail/gabs/v2 v2.5.1 h1:ANfZYjpMlfTTKebycu4X1AgkVWumFVDYQl7JwOr4mDk=
github.com/Jeffail/gabs/v2 v2.5.1/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/Little-Wallace/kvproto v0.0.0-20210430082408-3c390efa277e h1:QvMdPK33NFmes0xLpDC4U6fg+0vyKZ3pT28VaeIgJGE=
github.com/Little-Wallace/kvproto v0.0.0-20210430082408-3c390efa277e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
Expand Down Expand Up @@ -454,6 +452,8 @@ github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLy
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8 h1:t72qxPxunoKykkAuO5glpWGdoP+RmvKvX0lvmyFV0fI=
github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210507074444-0ec2d0dc2e4b h1:e42N26QQjVA/obDrFFapJ1YLB+j5aPQOh7R+cIGR9Bk=
github.com/pingcap/kvproto v0.0.0-20210507074444-0ec2d0dc2e4b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
185 changes: 119 additions & 66 deletions pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io"
"math"
"os"
Expand Down Expand Up @@ -750,6 +752,7 @@ type local struct {

engineMemCacheSize int
localWriterMemCacheSize int64
supportMultiIngest bool
}

// connPool is a lazy pool of gRPC channels.
Expand Down Expand Up @@ -868,9 +871,39 @@ func NewLocalBackend(
localWriterMemCacheSize: int64(cfg.LocalWriterMemCacheSize),
}
local.conns.conns = make(map[uint64]*connPool)
if err = local.checkMultiIngestSupport(ctx, pdCli); err != nil {
return backend.MakeBackend(nil), err
}

return backend.MakeBackend(local), nil
}

func (local *local) checkMultiIngestSupport(ctx context.Context, pdClient pd.Client) error {
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return errors.Trace(err)
}
for _, s := range stores {
client, err := local.getImportClient(ctx, s.Id)
if err != nil {
return errors.Trace(err)
}
_, err = client.MultiIngest(ctx, &sst.MultiIngestRequest{})
if err != nil {
if s, ok := status.FromError(err); ok {
if s.Code() == codes.Unimplemented {
local.supportMultiIngest = false
return nil
}
}
return errors.Trace(err)
}
}

local.supportMultiIngest = true
return nil
}

// rlock read locks a local file and returns the File instance if it exists.
func (local *local) rLockEngine(engineId uuid.UUID) *File {
if e, ok := local.engines.Load(engineId); ok {
Expand Down Expand Up @@ -1134,11 +1167,11 @@ func (local *local) CloseEngine(ctx context.Context, engineUUID uuid.UUID) error
return engineFile.ingestErr.Get()
}

func (local *local) getImportClient(ctx context.Context, peer *metapb.Peer) (sst.ImportSSTClient, error) {
func (local *local) getImportClient(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) {
local.conns.mu.Lock()
defer local.conns.mu.Unlock()

conn, err := local.getGrpcConnLocked(ctx, peer.GetStoreId())
conn, err := local.getGrpcConnLocked(ctx, storeID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1200,7 +1233,7 @@ func (local *local) WriteToTiKV(
clients := make([]sst.ImportSST_WriteClient, 0, len(region.Region.GetPeers()))
requests := make([]*sst.WriteRequest, 0, len(region.Region.GetPeers()))
for _, peer := range region.Region.GetPeers() {
cli, err := local.getImportClient(ctx, peer)
cli, err := local.getImportClient(ctx, peer.StoreId)
if err != nil {
return nil, Range{}, stats, err
}
Expand Down Expand Up @@ -1328,7 +1361,7 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *sp
leader = region.Region.GetPeers()[0]
}

cli, err := local.getImportClient(ctx, leader)
cli, err := local.getImportClient(ctx, leader.StoreId)
if err != nil {
return nil, err
}
Expand All @@ -1338,15 +1371,24 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *sp
Peer: leader,
}

if !local.supportMultiIngest {
if len(metas) > 0 {
return nil, errors.New("batch ingest is not support")
}
req := &sst.IngestRequest{
Context: reqCtx,
Sst: metas[0],
}
resp, err := cli.Ingest(ctx, req)
return resp, errors.Trace(err)
}

req := &sst.MultiIngestRequest{
Context: reqCtx,
Ssts: metas,
}
resp, err := cli.MultiIngest(ctx, req)
if err != nil {
return nil, errors.Trace(err)
}
return resp, nil
return resp, errors.Trace(err)
}

func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range {
Expand Down Expand Up @@ -1647,72 +1689,83 @@ loopWrite:
return errors.New("sst metas is empty")
}

errCnt := 0
for errCnt < maxRetryTimes {
log.L().Debug("ingest meta", zap.Reflect("metas", metas))
var resp *sst.IngestResponse
failpoint.Inject("FailIngestMeta", func(val failpoint.Value) {
// only inject the error once
switch val.(string) {
case "notleader":
resp = &sst.IngestResponse{
Error: &errorpb.Error{
NotLeader: &errorpb.NotLeader{
RegionId: region.Region.Id,
Leader: region.Leader,
batch := 1
if local.supportMultiIngest {
batch = len(metas)
}

for i := 0; i < len(metas); i += batch {
start := i * batch
end := utils.MinInt((i+1)*batch, len(metas))
ingestMetas := metas[start:end]
errCnt := 0
for errCnt < maxRetryTimes {
log.L().Debug("ingest meta", zap.Reflect("meta", ingestMetas))
var resp *sst.IngestResponse
failpoint.Inject("FailIngestMeta", func(val failpoint.Value) {
// only inject the error once
switch val.(string) {
case "notleader":
resp = &sst.IngestResponse{
Error: &errorpb.Error{
NotLeader: &errorpb.NotLeader{
RegionId: region.Region.Id,
Leader: region.Leader,
},
},
},
}
case "epochnotmatch":
resp = &sst.IngestResponse{
Error: &errorpb.Error{
EpochNotMatch: &errorpb.EpochNotMatch{
CurrentRegions: []*metapb.Region{region.Region},
}
case "epochnotmatch":
resp = &sst.IngestResponse{
Error: &errorpb.Error{
EpochNotMatch: &errorpb.EpochNotMatch{
CurrentRegions: []*metapb.Region{region.Region},
},
},
},
}
}
if resp != nil {
err = nil
}
})
if resp == nil {
resp, err = local.Ingest(ctx, ingestMetas, region)
}
if resp != nil {
err = nil
if err != nil {
if common.IsContextCanceledError(err) {
return err
}
log.L().Warn("ingest failed", log.ShortError(err), logutil.SSTMetas(ingestMetas),
logutil.Region(region.Region), logutil.Leader(region.Leader))
errCnt++
continue
}
})
if resp == nil {
resp, err = local.Ingest(ctx, metas, region)
}
if err != nil {

var retryTy retryType
var newRegion *split.RegionInfo
retryTy, newRegion, err = local.isIngestRetryable(ctx, resp, region, ingestMetas)
if common.IsContextCanceledError(err) {
return err
}
log.L().Warn("ingest failed", log.ShortError(err), logutil.SSTMetas(metas),
logutil.Region(region.Region), logutil.Leader(region.Leader))
errCnt++
continue
}

var retryTy retryType
var newRegion *split.RegionInfo
retryTy, newRegion, err = local.isIngestRetryable(ctx, resp, region, metas[0])
if common.IsContextCanceledError(err) {
return err
}
if err == nil {
// ingest next meta
break
}
switch retryTy {
case retryNone:
log.L().Warn("ingest failed noretry", log.ShortError(err), logutil.SSTMetas(metas),
logutil.Region(region.Region), logutil.Leader(region.Leader))
// met non-retryable error retry whole Write procedure
return err
case retryWrite:
region = newRegion
continue loopWrite
case retryIngest:
region = newRegion
continue
if err == nil {
// ingest next meta
break
}
switch retryTy {
case retryNone:
log.L().Warn("ingest failed noretry", log.ShortError(err), logutil.SSTMetas(ingestMetas),
logutil.Region(region.Region), logutil.Leader(region.Leader))
// met non-retryable error retry whole Write procedure
return err
case retryWrite:
region = newRegion
continue loopWrite
case retryIngest:
region = newRegion
continue
}
}
}

if err != nil {
log.L().Warn("write and ingest region, will retry import full range", log.ShortError(err),
logutil.Region(region.Region), logutil.Key("start", start),
Expand Down Expand Up @@ -2124,7 +2177,7 @@ func (local *local) isIngestRetryable(
ctx context.Context,
resp *sst.IngestResponse,
region *split.RegionInfo,
meta *sst.SSTMeta,
metas []*sst.SSTMeta,
) (retryType, *split.RegionInfo, error) {
if resp.GetError() == nil {
return retryNone, nil, nil
Expand Down Expand Up @@ -2173,7 +2226,7 @@ func (local *local) isIngestRetryable(
if currentRegions := errPb.GetEpochNotMatch().GetCurrentRegions(); currentRegions != nil {
var currentRegion *metapb.Region
for _, r := range currentRegions {
if insideRegion(r, meta) {
if insideRegion(r, metas) {
currentRegion = r
break
}
Expand Down
24 changes: 16 additions & 8 deletions pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,21 @@ func (s *localSuite) TestIsIngestRetryable(c *C) {
},
},
}
meta := &sst.SSTMeta{
Range: &sst.Range{
Start: []byte{1},
End: []byte{2},
metas := []*sst.SSTMeta{
{
Range: &sst.Range{
Start: []byte{1},
End: []byte{2},
},
},
{
Range: &sst.Range{
Start: []byte{1, 1},
End: []byte{2},
},
},
}
retryType, newRegion, err := local.isIngestRetryable(ctx, resp, region, meta)
retryType, newRegion, err := local.isIngestRetryable(ctx, resp, region, metas)
c.Assert(retryType, Equals, retryWrite)
c.Assert(newRegion.Leader.Id, Equals, uint64(2))
c.Assert(err, NotNil)
Expand All @@ -488,18 +496,18 @@ func (s *localSuite) TestIsIngestRetryable(c *C) {
},
},
}
retryType, newRegion, err = local.isIngestRetryable(ctx, resp, region, meta)
retryType, newRegion, err = local.isIngestRetryable(ctx, resp, region, metas)
c.Assert(retryType, Equals, retryWrite)
c.Assert(newRegion.Region.RegionEpoch.Version, Equals, uint64(2))
c.Assert(err, NotNil)

resp.Error = &errorpb.Error{Message: "raft: proposal dropped"}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, meta)
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
c.Assert(retryType, Equals, retryWrite)
c.Assert(err, NotNil)

resp.Error = &errorpb.Error{Message: "unknown error"}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, meta)
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
c.Assert(retryType, Equals, retryNone)
c.Assert(err, ErrorMatches, "non-retryable error: unknown error")
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,13 @@ func beforeEnd(key []byte, end []byte) bool {
return bytes.Compare(key, end) < 0 || len(end) == 0
}

func insideRegion(region *metapb.Region, meta *sst.SSTMeta) bool {
rg := meta.GetRange()
return keyInsideRegion(region, rg.GetStart()) && keyInsideRegion(region, rg.GetEnd())
func insideRegion(region *metapb.Region, metas []*sst.SSTMeta) bool {
inside := true
for _, meta := range metas {
rg := meta.GetRange()
inside = inside && (keyInsideRegion(region, rg.GetStart()) && keyInsideRegion(region, rg.GetEnd()))
}
return inside
}

func keyInsideRegion(region *metapb.Region, key []byte) bool {
Expand Down
Loading

0 comments on commit 5e5d384

Please sign in to comment.