Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

lightning/restore: support ingset multi ssts for same range (#1089) #1150

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions go.mod1
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ require (
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8
github.com/pingcap/kvproto v0.0.0-20210526125828-7790f94ac2ad
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/parser v0.0.0-20210325072920-0d17053a8a69
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 // indirect
github.com/pingcap/tidb v1.1.0-beta.0.20210416032353-4c49754750e5
github.com/pingcap/tidb v1.1.0-beta.0.20210526134734-25b427673b49
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/pingcap/tipb v0.0.0-20210326161441-1164ca065d1b
github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0
Expand All @@ -48,9 +48,11 @@ require (
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20210324051608-47abb6519492
golang.org/x/text v0.3.5
golang.org/x/text v0.3.6
google.golang.org/api v0.22.0
google.golang.org/grpc v1.27.1
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
honnef.co/go/tools v0.1.4 // indirect
modernc.org/mathutil v1.2.2
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0
)
Expand Down
19 changes: 15 additions & 4 deletions go.sum1
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUM
github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pingcap/badger v1.5.1-0.20200908111422-2e78ee155d19 h1:IXpGy7y9HyoShAFmzW2OPF0xCA5EOoSTyZHwsgYk9Ro=
github.com/pingcap/badger v1.5.1-0.20200908111422-2e78ee155d19/go.mod h1:LyrqUOHZrUDf9oGi1yoz1+qw9ckSIhQb5eMa1acOLNQ=
github.com/pingcap/br v5.0.0-nightly.0.20210319135924-c01fcc757681+incompatible/go.mod h1:ymVmo50lQydxib0tmK5hHk4oteB7hZ0IMCArunwy3UQ=
github.com/pingcap/br v5.0.1-0.20210416053351-e5ca378903a1+incompatible/go.mod h1:ymVmo50lQydxib0tmK5hHk4oteB7hZ0IMCArunwy3UQ=
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc=
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc=
Expand All @@ -474,25 +474,30 @@ github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8 h1:t72qxPxunoKykkAuO5glpWGdoP+RmvKvX0lvmyFV0fI=
github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210526125828-7790f94ac2ad h1:2Zu34Pqg8LGGGTZ6h+TFKWmNciQyPIOPoY6CG2o3Lxk=
github.com/pingcap/kvproto v0.0.0-20210526125828-7790f94ac2ad/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 h1:M+DNpOu/I3uDmwee6vcnoPd6GgSMqND4gxvDQ/W584U=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/parser v0.0.0-20210325072920-0d17053a8a69 h1:NbBYs3yBHZYmVZFVzAgZoje6jEWnTYprp5D5ELMtXyI=
github.com/pingcap/parser v0.0.0-20210325072920-0d17053a8a69/go.mod h1:GbEr2PgY72/4XqPZzmzstlOU/+il/wrjeTNFs6ihsSE=
github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI=
github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI=
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 h1:A9KL9R+lWSVPH8IqUuH1QSTRJ5FGoY1bT2IcfPKsWD8=
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8=
github.com/pingcap/tidb v1.1.0-beta.0.20210416032353-4c49754750e5 h1:G8vUpPdDXQZhcekOfr2h39nMXBwTPXHDNmySNlyaX4c=
github.com/pingcap/tidb v1.1.0-beta.0.20210416032353-4c49754750e5/go.mod h1:0YlKVIfp9jMoi6qK/2QWq1bwC0//JqzkfnpthK5oNcc=
github.com/pingcap/tidb v1.1.0-beta.0.20210526134734-25b427673b49 h1:MahHTshw1HAIP+43UrCRcKtKgMxSFySPWysJqRiTuyM=
github.com/pingcap/tidb v1.1.0-beta.0.20210526134734-25b427673b49/go.mod h1:7JalxiNiujvDKRHzFp5U9rfjVm++T70ORT20t6IFlRE=
github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:OzFN8H0EDMMqeulPhPMw2i2JaiZWOKFQ7zdRPhENNgo=
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible h1:ceznmu/lLseGHP/jKyOa/3u/5H3wtLLLqkH2V3ssSjg=
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20210326161441-1164ca065d1b h1:sZHSH0mh8PcRbmZlsIqP7CEwnfFuBpmkGt5i9JStLWA=
github.com/pingcap/tipb v0.0.0-20210326161441-1164ca065d1b/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo=
github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1 h1:Kcp3jIcQrqG+pT1JQ0oWyRncVKQtDgnMFzRt3zJBaBo=
github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -826,6 +831,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down Expand Up @@ -936,6 +943,8 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
Expand Down Expand Up @@ -974,6 +983,8 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o=
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
honnef.co/go/tools v0.1.4 h1:SadWOkti5uVN1FAMgxn165+Mw00fuQKyk4Gyn/inxNQ=
honnef.co/go/tools v0.1.4/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
modernc.org/mathutil v1.2.2 h1:+yFk8hBprV+4c0U9GjFtL+dV3N8hOJ8JCituQcMShFY=
modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
Expand Down
1 change: 0 additions & 1 deletion pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ func sendChecksumRequest(
if err != nil {
return nil, errors.Trace(err)
}
res.Fetch(ctx)
defer func() {
if err1 := res.Close(); err1 != nil {
err = err1
Expand Down
3 changes: 0 additions & 3 deletions pkg/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,6 @@ func (t *transaction) Delete(k kv.Key) error {
return t.kvMemBuf.Delete(k)
}

// SetAssertion implements the kv.Transaction interface.
func (t *transaction) SetAssertion(kv.Key, kv.AssertionType) {}

func (t *transaction) GetUnionStore() kv.UnionStore {
return &t.kvUnionStore
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,6 @@ func (t *transaction) Set(k kv.Key, v []byte) error {
return t.kvMemBuf.Set(k, v)
}

// SetAssertion implements the kv.Transaction interface
func (t *transaction) SetAssertion(kv.Key, kv.AssertionType) {}

func (t *transaction) GetUnionStore() kv.UnionStore {
return &t.kvUnionStore
}
Expand Down
102 changes: 80 additions & 22 deletions pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,12 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/lightning/backend/kv"
"github.com/pingcap/br/pkg/lightning/common"
Expand All @@ -64,6 +67,7 @@ import (
"github.com/pingcap/br/pkg/lightning/metric"
"github.com/pingcap/br/pkg/lightning/tikv"
"github.com/pingcap/br/pkg/lightning/worker"
"github.com/pingcap/br/pkg/logutil"
split "github.com/pingcap/br/pkg/restore"
"github.com/pingcap/br/pkg/utils"
"github.com/pingcap/br/pkg/version"
Expand Down Expand Up @@ -362,6 +366,7 @@ type local struct {

engineMemCacheSize int
localWriterMemCacheSize int64
supportMultiIngest bool
}

// connPool is a lazy pool of gRPC channels.
Expand Down Expand Up @@ -480,9 +485,41 @@ func NewLocalBackend(
localWriterMemCacheSize: int64(cfg.LocalWriterMemCacheSize),
}
local.conns.conns = make(map[uint64]*connPool)
if err = local.checkMultiIngestSupport(ctx, pdCli); err != nil {
return backend.MakeBackend(nil), err
}

return backend.MakeBackend(local), nil
}

func (local *local) checkMultiIngestSupport(ctx context.Context, pdClient pd.Client) error {
stores, err := conn.GetAllTiKVStores(ctx, pdClient, conn.SkipTiFlash)
if err != nil {
return errors.Trace(err)
}
for _, s := range stores {
client, err := local.getImportClient(ctx, s.Id)
if err != nil {
return errors.Trace(err)
}
_, err = client.MultiIngest(ctx, &sst.MultiIngestRequest{})
if err != nil {
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
log.L().Info("multi ingest not support", zap.Any("unsupported store", s))
local.supportMultiIngest = false
return nil
}
}
return errors.Trace(err)
}
}

local.supportMultiIngest = true
log.L().Info("multi ingest support")
return nil
}

// lock locks a local file and returns the File instance if it exists.
func (local *local) lockEngine(engineID uuid.UUID, state importMutexState) *File {
if e, ok := local.engines.Load(engineID); ok {
Expand Down Expand Up @@ -681,11 +718,11 @@ func (local *local) CloseEngine(ctx context.Context, engineUUID uuid.UUID) error
return engineFile.flushEngineWithoutLock(ctx)
}

func (local *local) getImportClient(ctx context.Context, peer *metapb.Peer) (sst.ImportSSTClient, error) {
func (local *local) getImportClient(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) {
local.conns.mu.Lock()
defer local.conns.mu.Unlock()

conn, err := local.getGrpcConnLocked(ctx, peer.GetStoreId())
conn, err := local.getGrpcConnLocked(ctx, storeID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -747,7 +784,7 @@ func (local *local) WriteToTiKV(
clients := make([]sst.ImportSST_WriteClient, 0, len(region.Region.GetPeers()))
requests := make([]*sst.WriteRequest, 0, len(region.Region.GetPeers()))
for _, peer := range region.Region.GetPeers() {
cli, err := local.getImportClient(ctx, peer)
cli, err := local.getImportClient(ctx, peer.StoreId)
if err != nil {
return nil, nil, stats, err
}
Expand Down Expand Up @@ -869,13 +906,13 @@ func (local *local) WriteToTiKV(
return leaderPeerMetas, remainRange, stats, nil
}

func (local *local) Ingest(ctx context.Context, meta *sst.SSTMeta, region *split.RegionInfo) (*sst.IngestResponse, error) {
func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *split.RegionInfo) (*sst.IngestResponse, error) {
leader := region.Leader
if leader == nil {
leader = region.Region.GetPeers()[0]
}

cli, err := local.getImportClient(ctx, leader)
cli, err := local.getImportClient(ctx, leader.StoreId)
if err != nil {
return nil, err
}
Expand All @@ -885,15 +922,24 @@ func (local *local) Ingest(ctx context.Context, meta *sst.SSTMeta, region *split
Peer: leader,
}

req := &sst.IngestRequest{
Context: reqCtx,
Sst: meta,
if !local.supportMultiIngest {
if len(metas) != 1 {
return nil, errors.New("batch ingest is not support")
}
req := &sst.IngestRequest{
Context: reqCtx,
Sst: metas[0],
}
resp, err := cli.Ingest(ctx, req)
return resp, errors.Trace(err)
}
resp, err := cli.Ingest(ctx, req)
if err != nil {
return nil, err

req := &sst.MultiIngestRequest{
Context: reqCtx,
Ssts: metas,
}
return resp, nil
resp, err := cli.MultiIngest(ctx, req)
return resp, errors.Trace(err)
}

func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range {
Expand Down Expand Up @@ -1183,10 +1229,22 @@ loopWrite:
return nil, err
}

for _, meta := range metas {
if len(metas) == 0 {
return nil, nil
}

batch := 1
if local.supportMultiIngest {
batch = len(metas)
}

for i := 0; i < len(metas); i += batch {
start := i * batch
end := utils.MinInt((i+1)*batch, len(metas))
ingestMetas := metas[start:end]
errCnt := 0
for errCnt < maxRetryTimes {
log.L().Debug("ingest meta", zap.Reflect("meta", meta))
log.L().Debug("ingest meta", zap.Reflect("meta", ingestMetas))
var resp *sst.IngestResponse
failpoint.Inject("FailIngestMeta", func(val failpoint.Value) {
// only inject the error once
Expand Down Expand Up @@ -1214,20 +1272,20 @@ loopWrite:
}
})
if resp == nil {
resp, err = local.Ingest(ctx, meta, region)
resp, err = local.Ingest(ctx, ingestMetas, region)
}
if err != nil {
if common.IsContextCanceledError(err) {
return nil, err
}
log.L().Warn("ingest failed", log.ShortError(err), log.ZapRedactReflect("meta", meta),
log.ZapRedactReflect("region", region))
log.L().Warn("ingest failed", log.ShortError(err), logutil.SSTMetas(ingestMetas),
logutil.Region(region.Region), logutil.Leader(region.Leader))
errCnt++
continue
}
var retryTy retryType
var newRegion *split.RegionInfo
retryTy, newRegion, err = local.isIngestRetryable(ctx, resp, region, meta)
retryTy, newRegion, err = local.isIngestRetryable(ctx, resp, region, ingestMetas)
if common.IsContextCanceledError(err) {
return nil, err
}
Expand All @@ -1237,8 +1295,8 @@ loopWrite:
}
switch retryTy {
case retryNone:
log.L().Warn("ingest failed noretry", log.ShortError(err), log.ZapRedactReflect("meta", meta),
log.ZapRedactReflect("region", region))
log.L().Warn("ingest failed noretry", log.ShortError(err), logutil.SSTMetas(ingestMetas),
logutil.Region(region.Region), logutil.Leader(region.Leader))
// met non-retryable error retry whole Write procedure
return remainRange, err
case retryWrite:
Expand Down Expand Up @@ -1575,7 +1633,7 @@ func (local *local) isIngestRetryable(
ctx context.Context,
resp *sst.IngestResponse,
region *split.RegionInfo,
meta *sst.SSTMeta,
metas []*sst.SSTMeta,
) (retryType, *split.RegionInfo, error) {
if resp.GetError() == nil {
return retryNone, nil, nil
Expand Down Expand Up @@ -1624,7 +1682,7 @@ func (local *local) isIngestRetryable(
if currentRegions := errPb.GetEpochNotMatch().GetCurrentRegions(); currentRegions != nil {
var currentRegion *metapb.Region
for _, r := range currentRegions {
if insideRegion(r, meta) {
if insideRegion(r, metas) {
currentRegion = r
break
}
Expand Down
24 changes: 16 additions & 8 deletions pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,21 @@ func (s *localSuite) TestIsIngestRetryable(c *C) {
},
},
}
meta := &sst.SSTMeta{
Range: &sst.Range{
Start: []byte{1},
End: []byte{2},
metas := []*sst.SSTMeta{
{
Range: &sst.Range{
Start: []byte{1},
End: []byte{2},
},
},
{
Range: &sst.Range{
Start: []byte{1, 1},
End: []byte{2},
},
},
}
retryType, newRegion, err := local.isIngestRetryable(ctx, resp, region, meta)
retryType, newRegion, err := local.isIngestRetryable(ctx, resp, region, metas)
c.Assert(retryType, Equals, retryWrite)
c.Assert(newRegion.Leader.Id, Equals, uint64(2))
c.Assert(err, NotNil)
Expand All @@ -470,18 +478,18 @@ func (s *localSuite) TestIsIngestRetryable(c *C) {
},
},
}
retryType, newRegion, err = local.isIngestRetryable(ctx, resp, region, meta)
retryType, newRegion, err = local.isIngestRetryable(ctx, resp, region, metas)
c.Assert(retryType, Equals, retryWrite)
c.Assert(newRegion.Region.RegionEpoch.Version, Equals, uint64(2))
c.Assert(err, NotNil)

resp.Error = &errorpb.Error{Message: "raft: proposal dropped"}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, meta)
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
c.Assert(retryType, Equals, retryWrite)
c.Assert(err, NotNil)

resp.Error = &errorpb.Error{Message: "unknown error"}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, meta)
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
c.Assert(retryType, Equals, retryNone)
c.Assert(err, ErrorMatches, "non-retryable error: unknown error")
}
Expand Down
Loading