Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
lightning/restore: support ingset multi ssts for same range (#1089) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 28, 2021
1 parent cb1c784 commit e81f002
Show file tree
Hide file tree
Showing 13 changed files with 324 additions and 106 deletions.
12 changes: 7 additions & 5 deletions go.mod1
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ require (
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8
github.com/pingcap/kvproto v0.0.0-20210526125828-7790f94ac2ad
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/parser v0.0.0-20210325072920-0d17053a8a69
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 // indirect
github.com/pingcap/tidb v1.1.0-beta.0.20210416032353-4c49754750e5
github.com/pingcap/tidb v1.1.0-beta.0.20210528083935-67f3d1d2a914
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/pingcap/tipb v0.0.0-20210326161441-1164ca065d1b
github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0
Expand All @@ -48,9 +48,11 @@ require (
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20210324051608-47abb6519492
golang.org/x/text v0.3.5
golang.org/x/text v0.3.6
google.golang.org/api v0.22.0
google.golang.org/grpc v1.27.1
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
honnef.co/go/tools v0.1.4 // indirect
modernc.org/mathutil v1.2.2
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0
)
Expand Down
24 changes: 17 additions & 7 deletions go.sum1
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,8 @@ github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 h1:7KAv7KMGTTqSmYZtNdc
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI=
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k=
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8=
github.com/ngaut/unistore v0.0.0-20210310131351-7ad6a204de87 h1:lVRrhmqIT2zMbmoalrgxQLwWzFd3VtFaaWy0fnMwPro=
github.com/ngaut/unistore v0.0.0-20210310131351-7ad6a204de87/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4=
github.com/ngaut/unistore v0.0.0-20210528070817-0a50b11eddfd h1:keEuTXvRjgG3GxwheNAokuMhXFe/d3ea+leB8xwaixA=
github.com/ngaut/unistore v0.0.0-20210528070817-0a50b11eddfd/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4=
github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
Expand Down Expand Up @@ -448,7 +448,7 @@ github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUM
github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pingcap/badger v1.5.1-0.20200908111422-2e78ee155d19 h1:IXpGy7y9HyoShAFmzW2OPF0xCA5EOoSTyZHwsgYk9Ro=
github.com/pingcap/badger v1.5.1-0.20200908111422-2e78ee155d19/go.mod h1:LyrqUOHZrUDf9oGi1yoz1+qw9ckSIhQb5eMa1acOLNQ=
github.com/pingcap/br v5.0.0-nightly.0.20210319135924-c01fcc757681+incompatible/go.mod h1:ymVmo50lQydxib0tmK5hHk4oteB7hZ0IMCArunwy3UQ=
github.com/pingcap/br v5.0.1-0.20210416053351-e5ca378903a1+incompatible/go.mod h1:ymVmo50lQydxib0tmK5hHk4oteB7hZ0IMCArunwy3UQ=
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc=
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc=
Expand All @@ -474,25 +474,29 @@ github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8 h1:t72qxPxunoKykkAuO5glpWGdoP+RmvKvX0lvmyFV0fI=
github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210526125828-7790f94ac2ad h1:2Zu34Pqg8LGGGTZ6h+TFKWmNciQyPIOPoY6CG2o3Lxk=
github.com/pingcap/kvproto v0.0.0-20210526125828-7790f94ac2ad/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 h1:M+DNpOu/I3uDmwee6vcnoPd6GgSMqND4gxvDQ/W584U=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/parser v0.0.0-20210325072920-0d17053a8a69 h1:NbBYs3yBHZYmVZFVzAgZoje6jEWnTYprp5D5ELMtXyI=
github.com/pingcap/parser v0.0.0-20210325072920-0d17053a8a69/go.mod h1:GbEr2PgY72/4XqPZzmzstlOU/+il/wrjeTNFs6ihsSE=
github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI=
github.com/pingcap/sysutil v0.0.0-20210221112134-a07bda3bde99/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI=
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 h1:A9KL9R+lWSVPH8IqUuH1QSTRJ5FGoY1bT2IcfPKsWD8=
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8=
github.com/pingcap/tidb v1.1.0-beta.0.20210416032353-4c49754750e5 h1:G8vUpPdDXQZhcekOfr2h39nMXBwTPXHDNmySNlyaX4c=
github.com/pingcap/tidb v1.1.0-beta.0.20210416032353-4c49754750e5/go.mod h1:0YlKVIfp9jMoi6qK/2QWq1bwC0//JqzkfnpthK5oNcc=
github.com/pingcap/tidb v1.1.0-beta.0.20210528083935-67f3d1d2a914 h1:15V7wHJIJt9egOJkq5mZLNLbRb5iQt6mm0FvQiuf6Q4=
github.com/pingcap/tidb v1.1.0-beta.0.20210528083935-67f3d1d2a914/go.mod h1:Zh6Tn1nKO2QY1xjGlg7DuVeD1Kbmt42p5L03v1+9B5I=
github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:OzFN8H0EDMMqeulPhPMw2i2JaiZWOKFQ7zdRPhENNgo=
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible h1:ceznmu/lLseGHP/jKyOa/3u/5H3wtLLLqkH2V3ssSjg=
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20210326161441-1164ca065d1b h1:sZHSH0mh8PcRbmZlsIqP7CEwnfFuBpmkGt5i9JStLWA=
github.com/pingcap/tipb v0.0.0-20210326161441-1164ca065d1b/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo=
github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1 h1:Kcp3jIcQrqG+pT1JQ0oWyRncVKQtDgnMFzRt3zJBaBo=
github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -826,6 +830,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down Expand Up @@ -936,6 +942,8 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
Expand Down Expand Up @@ -974,6 +982,8 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o=
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
honnef.co/go/tools v0.1.4 h1:SadWOkti5uVN1FAMgxn165+Mw00fuQKyk4Gyn/inxNQ=
honnef.co/go/tools v0.1.4/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
modernc.org/mathutil v1.2.2 h1:+yFk8hBprV+4c0U9GjFtL+dV3N8hOJ8JCituQcMShFY=
modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
Expand Down
1 change: 0 additions & 1 deletion pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ func sendChecksumRequest(
if err != nil {
return nil, errors.Trace(err)
}
res.Fetch(ctx)
defer func() {
if err1 := res.Close(); err1 != nil {
err = err1
Expand Down
3 changes: 0 additions & 3 deletions pkg/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,6 @@ func (t *transaction) Delete(k kv.Key) error {
return t.kvMemBuf.Delete(k)
}

// SetAssertion implements the kv.Transaction interface.
func (t *transaction) SetAssertion(kv.Key, kv.AssertionType) {}

func (t *transaction) GetUnionStore() kv.UnionStore {
return &t.kvUnionStore
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,6 @@ func (t *transaction) Set(k kv.Key, v []byte) error {
return t.kvMemBuf.Set(k, v)
}

// SetAssertion implements the kv.Transaction interface
func (t *transaction) SetAssertion(kv.Key, kv.AssertionType) {}

func (t *transaction) GetUnionStore() kv.UnionStore {
return &t.kvUnionStore
}
Expand Down
102 changes: 80 additions & 22 deletions pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,12 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/lightning/backend/kv"
"github.com/pingcap/br/pkg/lightning/common"
Expand All @@ -64,6 +67,7 @@ import (
"github.com/pingcap/br/pkg/lightning/metric"
"github.com/pingcap/br/pkg/lightning/tikv"
"github.com/pingcap/br/pkg/lightning/worker"
"github.com/pingcap/br/pkg/logutil"
split "github.com/pingcap/br/pkg/restore"
"github.com/pingcap/br/pkg/utils"
"github.com/pingcap/br/pkg/version"
Expand Down Expand Up @@ -362,6 +366,7 @@ type local struct {

engineMemCacheSize int
localWriterMemCacheSize int64
supportMultiIngest bool
}

// connPool is a lazy pool of gRPC channels.
Expand Down Expand Up @@ -480,9 +485,41 @@ func NewLocalBackend(
localWriterMemCacheSize: int64(cfg.LocalWriterMemCacheSize),
}
local.conns.conns = make(map[uint64]*connPool)
if err = local.checkMultiIngestSupport(ctx, pdCli); err != nil {
return backend.MakeBackend(nil), err
}

return backend.MakeBackend(local), nil
}

func (local *local) checkMultiIngestSupport(ctx context.Context, pdClient pd.Client) error {
stores, err := conn.GetAllTiKVStores(ctx, pdClient, conn.SkipTiFlash)
if err != nil {
return errors.Trace(err)
}
for _, s := range stores {
client, err := local.getImportClient(ctx, s.Id)
if err != nil {
return errors.Trace(err)
}
_, err = client.MultiIngest(ctx, &sst.MultiIngestRequest{})
if err != nil {
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
log.L().Info("multi ingest not support", zap.Any("unsupported store", s))
local.supportMultiIngest = false
return nil
}
}
return errors.Trace(err)
}
}

local.supportMultiIngest = true
log.L().Info("multi ingest support")
return nil
}

// lock locks a local file and returns the File instance if it exists.
func (local *local) lockEngine(engineID uuid.UUID, state importMutexState) *File {
if e, ok := local.engines.Load(engineID); ok {
Expand Down Expand Up @@ -681,11 +718,11 @@ func (local *local) CloseEngine(ctx context.Context, engineUUID uuid.UUID) error
return engineFile.flushEngineWithoutLock(ctx)
}

func (local *local) getImportClient(ctx context.Context, peer *metapb.Peer) (sst.ImportSSTClient, error) {
func (local *local) getImportClient(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) {
local.conns.mu.Lock()
defer local.conns.mu.Unlock()

conn, err := local.getGrpcConnLocked(ctx, peer.GetStoreId())
conn, err := local.getGrpcConnLocked(ctx, storeID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -747,7 +784,7 @@ func (local *local) WriteToTiKV(
clients := make([]sst.ImportSST_WriteClient, 0, len(region.Region.GetPeers()))
requests := make([]*sst.WriteRequest, 0, len(region.Region.GetPeers()))
for _, peer := range region.Region.GetPeers() {
cli, err := local.getImportClient(ctx, peer)
cli, err := local.getImportClient(ctx, peer.StoreId)
if err != nil {
return nil, nil, stats, err
}
Expand Down Expand Up @@ -869,13 +906,13 @@ func (local *local) WriteToTiKV(
return leaderPeerMetas, remainRange, stats, nil
}

func (local *local) Ingest(ctx context.Context, meta *sst.SSTMeta, region *split.RegionInfo) (*sst.IngestResponse, error) {
func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *split.RegionInfo) (*sst.IngestResponse, error) {
leader := region.Leader
if leader == nil {
leader = region.Region.GetPeers()[0]
}

cli, err := local.getImportClient(ctx, leader)
cli, err := local.getImportClient(ctx, leader.StoreId)
if err != nil {
return nil, err
}
Expand All @@ -885,15 +922,24 @@ func (local *local) Ingest(ctx context.Context, meta *sst.SSTMeta, region *split
Peer: leader,
}

req := &sst.IngestRequest{
Context: reqCtx,
Sst: meta,
if !local.supportMultiIngest {
if len(metas) != 1 {
return nil, errors.New("batch ingest is not support")
}
req := &sst.IngestRequest{
Context: reqCtx,
Sst: metas[0],
}
resp, err := cli.Ingest(ctx, req)
return resp, errors.Trace(err)
}
resp, err := cli.Ingest(ctx, req)
if err != nil {
return nil, err

req := &sst.MultiIngestRequest{
Context: reqCtx,
Ssts: metas,
}
return resp, nil
resp, err := cli.MultiIngest(ctx, req)
return resp, errors.Trace(err)
}

func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range {
Expand Down Expand Up @@ -1183,10 +1229,22 @@ loopWrite:
return nil, err
}

for _, meta := range metas {
if len(metas) == 0 {
return nil, nil
}

batch := 1
if local.supportMultiIngest {
batch = len(metas)
}

for i := 0; i < len(metas); i += batch {
start := i * batch
end := utils.MinInt((i+1)*batch, len(metas))
ingestMetas := metas[start:end]
errCnt := 0
for errCnt < maxRetryTimes {
log.L().Debug("ingest meta", zap.Reflect("meta", meta))
log.L().Debug("ingest meta", zap.Reflect("meta", ingestMetas))
var resp *sst.IngestResponse
failpoint.Inject("FailIngestMeta", func(val failpoint.Value) {
// only inject the error once
Expand Down Expand Up @@ -1214,20 +1272,20 @@ loopWrite:
}
})
if resp == nil {
resp, err = local.Ingest(ctx, meta, region)
resp, err = local.Ingest(ctx, ingestMetas, region)
}
if err != nil {
if common.IsContextCanceledError(err) {
return nil, err
}
log.L().Warn("ingest failed", log.ShortError(err), log.ZapRedactReflect("meta", meta),
log.ZapRedactReflect("region", region))
log.L().Warn("ingest failed", log.ShortError(err), logutil.SSTMetas(ingestMetas),
logutil.Region(region.Region), logutil.Leader(region.Leader))
errCnt++
continue
}
var retryTy retryType
var newRegion *split.RegionInfo
retryTy, newRegion, err = local.isIngestRetryable(ctx, resp, region, meta)
retryTy, newRegion, err = local.isIngestRetryable(ctx, resp, region, ingestMetas)
if common.IsContextCanceledError(err) {
return nil, err
}
Expand All @@ -1237,8 +1295,8 @@ loopWrite:
}
switch retryTy {
case retryNone:
log.L().Warn("ingest failed noretry", log.ShortError(err), log.ZapRedactReflect("meta", meta),
log.ZapRedactReflect("region", region))
log.L().Warn("ingest failed noretry", log.ShortError(err), logutil.SSTMetas(ingestMetas),
logutil.Region(region.Region), logutil.Leader(region.Leader))
// met non-retryable error retry whole Write procedure
return remainRange, err
case retryWrite:
Expand Down Expand Up @@ -1575,7 +1633,7 @@ func (local *local) isIngestRetryable(
ctx context.Context,
resp *sst.IngestResponse,
region *split.RegionInfo,
meta *sst.SSTMeta,
metas []*sst.SSTMeta,
) (retryType, *split.RegionInfo, error) {
if resp.GetError() == nil {
return retryNone, nil, nil
Expand Down Expand Up @@ -1624,7 +1682,7 @@ func (local *local) isIngestRetryable(
if currentRegions := errPb.GetEpochNotMatch().GetCurrentRegions(); currentRegions != nil {
var currentRegion *metapb.Region
for _, r := range currentRegions {
if insideRegion(r, meta) {
if insideRegion(r, metas) {
currentRegion = r
break
}
Expand Down
Loading

0 comments on commit e81f002

Please sign in to comment.