From 6618c4d2cd1757dc93fe2b3b6ed11d11e147e640 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Thu, 22 Dec 2022 22:25:41 +0800 Subject: [PATCH 1/7] lightning: check write stall when switch-mode is disabled --- br/pkg/lightning/backend/local/local.go | 44 +++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index e32606207082e..cc88fd6a89483 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -91,6 +91,7 @@ const ( gRPCKeepAliveTime = 10 * time.Minute gRPCKeepAliveTimeout = 5 * time.Minute gRPCBackOffMaxDelay = 10 * time.Minute + writeStallSleepTime = 10 * time.Second // The max ranges count in a batch to split and scatter. maxBatchSplitRanges = 4096 @@ -381,6 +382,12 @@ type local struct { encBuilder backend.EncodingBuilder targetInfoGetter backend.TargetInfoGetter + + // When TiKV is in normal mode, ingesting too many SSTs will cause a TiKV write stall. + // To avoid this, we should check for write stall before ingesting SSTs. Note that we + // must check both the leader and the followers on the client side, because followers do + // not check for write stall as long as the ingest command is accepted by the leader. + shouldCheckWriteStall bool } func openDuplicateDB(storeDir string) (*pebble.DB, error) { @@ -503,6 +510,7 @@ func NewLocalBackend( logger: log.FromContext(ctx), encBuilder: NewEncodingBuilder(ctx), targetInfoGetter: NewTargetInfoGetter(tls, g, cfg.TiDB.PdAddr), + shouldCheckWriteStall: cfg.Cron.SwitchMode.Duration == 0, } if m, ok := metric.FromContext(ctx); ok { local.metrics = m @@ -1146,6 +1154,25 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *sp return resp, errors.Trace(err) } + if local.shouldCheckWriteStall { + for { + maybeWriteStall, err := local.checkWriteStall(ctx, region) + if err != nil { + return nil, err + } + if !maybeWriteStall { + break + } + log.FromContext(ctx).Warn("ingest may cause write stall, sleep and retry", + zap.Duration("duration", writeStallSleepTime)) + select { + case <-time.After(writeStallSleepTime): + case <-ctx.Done(): + return nil, errors.Trace(ctx.Err()) + } + } + } + req := &sst.MultiIngestRequest{ Context: reqCtx, Ssts: metas, @@ -1154,6 +1181,23 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *sp return resp, errors.Trace(err) } +func (local *local) checkWriteStall(ctx context.Context, region *split.RegionInfo) (bool, error) { + for _, peer := range region.Region.GetPeers() { + cli, err := local.getImportClient(ctx, peer.StoreId) + if err != nil { + return false, errors.Trace(err) + } + resp, err := cli.MultiIngest(ctx, &sst.MultiIngestRequest{}) + if err != nil { + return false, errors.Trace(err) + } + if resp.Error != nil && resp.Error.ServerIsBusy != nil { + return true, nil + } + } + return false, nil +} + func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range { ranges := make([]Range, 0, sizeProps.totalSize/uint64(sizeLimit)) curSize := uint64(0) From 76e317e1962133b01b9afd5432db213acde611a9 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 29 Dec 2022 14:46:46 +0800 Subject: [PATCH 2/7] lightning: add a mock unit test Signed-off-by: lance6716 --- br/pkg/lightning/backend/local/engine_test.go | 17 +- br/pkg/lightning/backend/local/local_test.go | 174 ++++++++++++++---- .../backend/local/localhelper_test.go | 102 +++++++--- 3 files changed, 225 insertions(+), 68 deletions(-) diff
--git a/br/pkg/lightning/backend/local/engine_test.go b/br/pkg/lightning/backend/local/engine_test.go index c7ffe04b95285..eae0225bb519a 100644 --- a/br/pkg/lightning/backend/local/engine_test.go +++ b/br/pkg/lightning/backend/local/engine_test.go @@ -31,8 +31,17 @@ import ( "github.com/stretchr/testify/require" ) -func TestIngestSSTWithClosedEngine(t *testing.T) { +func makePebbleDB(t *testing.T, opt *pebble.Options) (*pebble.DB, string) { dir := t.TempDir() + db, err := pebble.Open(path.Join(dir, "test"), opt) + require.NoError(t, err) + tmpPath := filepath.Join(dir, "test.sst") + err = os.Mkdir(tmpPath, 0o755) + require.NoError(t, err) + return db, tmpPath +} + +func TestIngestSSTWithClosedEngine(t *testing.T) { opt := &pebble.Options{ MemTableSize: 1024 * 1024, MaxConcurrentCompactions: 16, @@ -41,11 +50,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) { DisableWAL: true, ReadOnly: false, } - db, err := pebble.Open(filepath.Join(dir, "test"), opt) - require.NoError(t, err) - tmpPath := filepath.Join(dir, "test.sst") - err = os.Mkdir(tmpPath, 0o755) - require.NoError(t, err) + db, tmpPath := makePebbleDB(t, opt) _, engineUUID := backend.MakeUUID("ww", 0) engineCtx, cancel := context.WithCancel(context.Background()) diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 1a399552becf9..3d49e169337a7 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -18,10 +18,10 @@ import ( "bytes" "context" "encoding/binary" + "fmt" "io" "math" "math/rand" - "os" "path/filepath" "sort" "strings" @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/br/pkg/lightning/worker" "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/restore/split" @@ -57,6 +58,7 @@ import ( pd "github.com/tikv/pd/client" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -237,8 +239,6 @@ func TestRangeProperties(t *testing.T) { } func TestRangePropertiesWithPebble(t *testing.T) { - dir := t.TempDir() - sizeDistance := uint64(500) keysDistance := uint64(20) opt := &pebble.Options{ @@ -259,8 +259,7 @@ func TestRangePropertiesWithPebble(t *testing.T) { }, }, } - db, err := pebble.Open(filepath.Join(dir, "test"), opt) - require.NoError(t, err) + db, _ := makePebbleDB(t, opt) defer db.Close() // local collector @@ -277,7 +276,7 @@ func TestRangePropertiesWithPebble(t *testing.T) { key := make([]byte, 8) valueLen := rand.Intn(50) binary.BigEndian.PutUint64(key, uint64(i*100+j)) - err = wb.Set(key, value[:valueLen], writeOpt) + err := wb.Set(key, value[:valueLen], writeOpt) require.NoError(t, err) err = collector.Add(pebble.InternalKey{UserKey: key, Trailer: pebble.InternalKeyKindSet}, value[:valueLen]) require.NoError(t, err) @@ -304,7 +303,6 @@ func TestRangePropertiesWithPebble(t *testing.T) { } func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) { - dir := t.TempDir() opt := &pebble.Options{ MemTableSize: 1024 * 1024, MaxConcurrentCompactions: 16, @@ -313,12 +311,8 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) { DisableWAL: true, ReadOnly: false, } - db, err := pebble.Open(filepath.Join(dir, "test"), opt) - require.NoError(t, err) + db, tmpPath := makePebbleDB(t, opt) defer db.Close() 
- tmpPath := filepath.Join(dir, "test.sst") - err = os.Mkdir(tmpPath, 0o755) - require.NoError(t, err) _, engineUUID := backend.MakeUUID("ww", 0) engineCtx, cancel := context.WithCancel(context.Background()) @@ -564,7 +558,6 @@ func (i testIngester) ingest([]*sstMeta) error { } func TestLocalIngestLoop(t *testing.T) { - dir := t.TempDir() opt := &pebble.Options{ MemTableSize: 1024 * 1024, MaxConcurrentCompactions: 16, @@ -573,18 +566,14 @@ func TestLocalIngestLoop(t *testing.T) { DisableWAL: true, ReadOnly: false, } - db, err := pebble.Open(filepath.Join(dir, "test"), opt) - require.NoError(t, err) + db, tmpPath := makePebbleDB(t, opt) defer db.Close() - tmpPath := filepath.Join(dir, "test.sst") - err = os.Mkdir(tmpPath, 0o755) - require.NoError(t, err) _, engineUUID := backend.MakeUUID("ww", 0) engineCtx, cancel := context.WithCancel(context.Background()) f := Engine{ db: db, UUID: engineUUID, - sstDir: "", + sstDir: tmpPath, ctx: engineCtx, cancel: cancel, sstMetasChan: make(chan metaOrFlush, 64), @@ -637,7 +626,7 @@ func TestLocalIngestLoop(t *testing.T) { wg.Wait() f.mutex.RLock() - err = f.flushEngineWithoutLock(engineCtx) + err := f.flushEngineWithoutLock(engineCtx) require.NoError(t, err) f.mutex.RUnlock() @@ -732,7 +721,6 @@ func TestFilterOverlapRange(t *testing.T) { } func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) { - dir := t.TempDir() opt := &pebble.Options{ MemTableSize: 1024 * 1024, MaxConcurrentCompactions: 16, @@ -741,12 +729,8 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) { DisableWAL: true, ReadOnly: false, } - db, err := pebble.Open(filepath.Join(dir, "test"), opt) - require.NoError(t, err) + db, tmpPath := makePebbleDB(t, opt) defer db.Close() - tmpPath := filepath.Join(dir, "test.sst") - err = os.Mkdir(tmpPath, 0o755) - require.NoError(t, err) _, engineUUID := backend.MakeUUID("ww", 0) engineCtx, cancel := context.WithCancel(context.Background()) @@ -837,21 +821,26 @@ func TestMergeSSTsDuplicated(t *testing.T) { type mockPdClient struct { pd.Client - stores []*metapb.Store + stores []*metapb.Store + regions []*pd.Region } func (c *mockPdClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) { return c.stores, nil } +func (c *mockPdClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*pd.Region, error) { + return c.regions, nil +} + type mockGrpcErr struct{} func (e mockGrpcErr) GRPCStatus() *status.Status { - return status.New(codes.Unimplemented, "unimplmented") + return status.New(codes.Unimplemented, "unimplemented") } func (e mockGrpcErr) Error() string { - return "unimplmented" + return "unimplemented" } type mockImportClient struct { @@ -861,12 +850,24 @@ type mockImportClient struct { retry int cnt int multiIngestCheckFn func(s *metapb.Store) bool + apiInvokeRecorder map[string][]uint64 +} + +func newMockImportClient() *mockImportClient { + return &mockImportClient{ + multiIngestCheckFn: func(s *metapb.Store) bool { + return true + }, + } } func (c *mockImportClient) MultiIngest(context.Context, *sst.MultiIngestRequest, ...grpc.CallOption) (*sst.IngestResponse, error) { defer func() { c.cnt++ }() + if c.apiInvokeRecorder != nil { + c.apiInvokeRecorder["MultiIngest"] = append(c.apiInvokeRecorder["MultiIngest"], c.store.GetId()) + } if c.cnt < c.retry && c.err != nil { return nil, c.err } @@ -877,9 +878,61 @@ func (c *mockImportClient) MultiIngest(context.Context, *sst.MultiIngestRequest, return nil, nil } +type mockWriteClient struct { + writeResp 
*sst.WriteResponse +} + +func (m mockWriteClient) Send(request *sst.WriteRequest) error { + return nil +} + +func (m mockWriteClient) CloseAndRecv() (*sst.WriteResponse, error) { + return m.writeResp, nil +} + +func (m mockWriteClient) Header() (metadata.MD, error) { + //TODO implement me + panic("implement me") +} + +func (m mockWriteClient) Trailer() metadata.MD { + //TODO implement me + panic("implement me") +} + +func (m mockWriteClient) CloseSend() error { + //TODO implement me + panic("implement me") +} + +func (m mockWriteClient) Context() context.Context { + //TODO implement me + panic("implement me") +} + +func (m mockWriteClient) SendMsg(_ interface{}) error { + //TODO implement me + panic("implement me") +} + +func (m mockWriteClient) RecvMsg(_ interface{}) error { + //TODO implement me + panic("implement me") +} + +func (c *mockImportClient) Write(ctx context.Context, opts ...grpc.CallOption) (sst.ImportSST_WriteClient, error) { + if c.apiInvokeRecorder != nil { + c.apiInvokeRecorder["Write"] = append(c.apiInvokeRecorder["Write"], c.store.GetId()) + } + return mockWriteClient{writeResp: &sst.WriteResponse{Metas: []*sst.SSTMeta{ + {}, {}, {}, + }}}, nil +} + type mockImportClientFactory struct { - stores []*metapb.Store - createClientFn func(store *metapb.Store) sst.ImportSSTClient + stores []*metapb.Store + createClientFn func(store *metapb.Store) sst.ImportSSTClient + apiInvokeRecorder map[string][]uint64 } func (f *mockImportClientFactory) Create(_ context.Context, storeID uint64) (sst.ImportSSTClient, error) { @@ -888,7 +941,7 @@ func (f *mockImportClientFactory) Create(_ context.Context, storeID uint64) (sst return f.createClientFn(store), nil } } - return nil, errors.New("store not found") + return nil, errors.New(fmt.Sprintf("store %d not found", storeID)) } func (f *mockImportClientFactory) Close() {} @@ -1220,3 +1273,60 @@ func TestLocalIsRetryableTiKVWriteError(t *testing.T) { require.True(t, l.isRetryableImportTiKVError(io.EOF)) require.True(t, l.isRetryableImportTiKVError(errors.Trace(io.EOF))) } + +func TestServerIsBusy(t *testing.T) { + ctx := context.Background() + pdCli := &mockPdClient{} + pdCtl := &pdutil.PdController{} + pdCtl.SetPDClient(pdCli) + + keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")} + splitCli := initTestSplitClient3Replica(keys, nil) + apiInvokeRecorder := map[string][]uint64{} + + local := &local{ + pdCtl: pdCtl, + splitCli: splitCli, + importClientFactory: &mockImportClientFactory{ + stores: []*metapb.Store{ + // region ["", "a") is not used, skip (1, 2, 3) + {Id: 11}, {Id: 12}, {Id: 13}, // region ["a", "b") + {Id: 21}, {Id: 22}, {Id: 23}, // region ["b", "") + }, + createClientFn: func(store *metapb.Store) sst.ImportSSTClient { + importCli := newMockImportClient() + importCli.store = store + importCli.apiInvokeRecorder = apiInvokeRecorder + return importCli + }, + }, + logger: log.L(), + ingestConcurrency: worker.NewPool(ctx, 1, "ingest"), + writeLimiter: noopStoreWriteLimiter{}, + bufferPool: membuf.NewPool(), + supportMultiIngest: true, + } + + db, tmpPath := makePebbleDB(t, nil) + _, engineUUID := backend.MakeUUID("ww", 0) + engineCtx, cancel := context.WithCancel(context.Background()) + f := &Engine{ + db: db, + UUID: engineUUID, + sstDir: tmpPath, + ctx: engineCtx, + cancel: cancel, + sstMetasChan: make(chan metaOrFlush, 64), + keyAdapter: noopKeyAdapter{}, + logger: log.L(), + } + err := f.db.Set([]byte("a"), []byte("a"), nil) + require.NoError(t, err) + err = f.db.Set([]byte("b"), []byte("b"), nil) + 
require.NoError(t, err) + err = local.writeAndIngestByRange(ctx, f, []byte("a"), []byte("c"), 0, 0) + require.NoError(t, err) + + require.Equal(t, []uint64{11, 12, 13, 21, 22, 23}, apiInvokeRecorder["Write"]) + require.Equal(t, []uint64{11, 21}, apiInvokeRecorder["MultiIngest"]) +} diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index 6cbf7f2f14808..023fade304fae 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -47,7 +47,7 @@ func init() { splitRetryTimes = 2 } -type testClient struct { +type testSplitClient struct { mu sync.RWMutex stores map[uint64]*metapb.Store regions map[uint64]*split.RegionInfo @@ -57,17 +57,17 @@ type testClient struct { hook clientHook } -func newTestClient( +func newTestSplitClient( stores map[uint64]*metapb.Store, regions map[uint64]*split.RegionInfo, nextRegionID uint64, hook clientHook, -) *testClient { +) *testSplitClient { regionsInfo := &pdtypes.RegionTree{} for _, regionInfo := range regions { regionsInfo.SetRegion(pdtypes.NewRegionInfo(regionInfo.Region, regionInfo.Leader)) } - return &testClient{ + return &testSplitClient{ stores: stores, regions: regions, regionsInfo: regionsInfo, @@ -77,17 +77,17 @@ func newTestClient( } // ScatterRegions scatters regions in a batch. -func (c *testClient) ScatterRegions(ctx context.Context, regionInfo []*split.RegionInfo) error { +func (c *testSplitClient) ScatterRegions(ctx context.Context, regionInfo []*split.RegionInfo) error { return nil } -func (c *testClient) GetAllRegions() map[uint64]*split.RegionInfo { +func (c *testSplitClient) GetAllRegions() map[uint64]*split.RegionInfo { c.mu.RLock() defer c.mu.RUnlock() return c.regions } -func (c *testClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { +func (c *testSplitClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { c.mu.RLock() defer c.mu.RUnlock() store, ok := c.stores[storeID] @@ -97,19 +97,18 @@ func (c *testClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Stor return store, nil } -func (c *testClient) GetRegion(ctx context.Context, key []byte) (*split.RegionInfo, error) { +func (c *testSplitClient) GetRegion(ctx context.Context, key []byte) (*split.RegionInfo, error) { c.mu.RLock() defer c.mu.RUnlock() for _, region := range c.regions { - if bytes.Compare(key, region.Region.StartKey) >= 0 && - (len(region.Region.EndKey) == 0 || bytes.Compare(key, region.Region.EndKey) < 0) { + if bytes.Compare(key, region.Region.StartKey) >= 0 && beforeEnd(key, region.Region.EndKey) { return region, nil } } return nil, errors.Errorf("region not found: key=%s", string(key)) } -func (c *testClient) GetRegionByID(ctx context.Context, regionID uint64) (*split.RegionInfo, error) { +func (c *testSplitClient) GetRegionByID(ctx context.Context, regionID uint64) (*split.RegionInfo, error) { c.mu.RLock() defer c.mu.RUnlock() region, ok := c.regions[regionID] @@ -119,7 +118,7 @@ func (c *testClient) GetRegionByID(ctx context.Context, regionID uint64) (*split return region, nil } -func (c *testClient) SplitRegion( +func (c *testSplitClient) SplitRegion( ctx context.Context, regionInfo *split.RegionInfo, key []byte, @@ -130,7 +129,7 @@ func (c *testClient) SplitRegion( splitKey := codec.EncodeBytes([]byte{}, key) for _, region := range c.regions { if bytes.Compare(splitKey, region.Region.StartKey) >= 0 && - (len(region.Region.EndKey) == 0 || bytes.Compare(splitKey, 
region.Region.EndKey) < 0) { + beforeEnd(splitKey, region.Region.EndKey) { target = region } } @@ -159,7 +158,7 @@ func (c *testClient) SplitRegion( return newRegion, nil } -func (c *testClient) BatchSplitRegionsWithOrigin( +func (c *testSplitClient) BatchSplitRegionsWithOrigin( ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte, ) (*split.RegionInfo, []*split.RegionInfo, error) { c.mu.Lock() @@ -234,24 +233,24 @@ func (c *testClient) BatchSplitRegionsWithOrigin( return target, newRegions, err } -func (c *testClient) BatchSplitRegions( +func (c *testSplitClient) BatchSplitRegions( ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte, ) ([]*split.RegionInfo, error) { _, newRegions, err := c.BatchSplitRegionsWithOrigin(ctx, regionInfo, keys) return newRegions, err } -func (c *testClient) ScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) error { +func (c *testSplitClient) ScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) error { return nil } -func (c *testClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { +func (c *testSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { return &pdpb.GetOperatorResponse{ Header: new(pdpb.ResponseHeader), }, nil } -func (c *testClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) { +func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) { if c.hook != nil { key, endKey, limit = c.hook.BeforeScanRegions(ctx, key, endKey, limit) } @@ -272,19 +271,19 @@ func (c *testClient) ScanRegions(ctx context.Context, key, endKey []byte, limit return regions, err } -func (c *testClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (r pdtypes.Rule, err error) { +func (c *testSplitClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (r pdtypes.Rule, err error) { return } -func (c *testClient) SetPlacementRule(ctx context.Context, rule pdtypes.Rule) error { +func (c *testSplitClient) SetPlacementRule(ctx context.Context, rule pdtypes.Rule) error { return nil } -func (c *testClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error { +func (c *testSplitClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error { return nil } -func (c *testClient) SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error { +func (c *testSplitClient) SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error { return nil } @@ -305,7 +304,7 @@ func cloneRegion(region *split.RegionInfo) *split.RegionInfo { // For keys ["", "aay", "bba", "bbh", "cca", ""], the key ranges of // regions are [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ). 
-func initTestClient(keys [][]byte, hook clientHook) *testClient { +func initTestSplitClient(keys [][]byte, hook clientHook) *testSplitClient { peers := make([]*metapb.Peer, 1) peers[0] = &metapb.Peer{ Id: 1, @@ -329,13 +328,56 @@ func initTestClient(keys [][]byte, hook clientHook) *testClient { EndKey: endKey, RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, }, + Leader: peers[0], } } stores := make(map[uint64]*metapb.Store) stores[1] = &metapb.Store{ Id: 1, } - return newTestClient(stores, regions, uint64(len(keys)), hook) + return newTestSplitClient(stores, regions, uint64(len(keys)), hook) +} + +// initTestSplitClient3Replica creates a client in which each region has 3 replicas, with peer IDs and store IDs +// (1, 2, 3), (11, 12, 13), ... +// For keys ["", "aay", "bba", "bbh", "cca", ""], the key ranges of +// the regions are [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ). +func initTestSplitClient3Replica(keys [][]byte, hook clientHook) *testSplitClient { + regions := make(map[uint64]*split.RegionInfo) + stores := make(map[uint64]*metapb.Store) + for i := uint64(1); i < uint64(len(keys)); i++ { + startKey := keys[i-1] + if len(startKey) != 0 { + startKey = codec.EncodeBytes([]byte{}, startKey) + } + endKey := keys[i] + if len(endKey) != 0 { + endKey = codec.EncodeBytes([]byte{}, endKey) + } + baseID := (i-1)*10 + 1 + peers := make([]*metapb.Peer, 3) + for j := 0; j < 3; j++ { + peers[j] = &metapb.Peer{ + Id: baseID + uint64(j), + StoreId: baseID + uint64(j), + } + } + + regions[baseID] = &split.RegionInfo{ + Region: &metapb.Region{ + Id: baseID, + Peers: peers, + StartKey: startKey, + EndKey: endKey, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, + }, + Leader: peers[0], + } + stores[baseID] = &metapb.Store{ + Id: baseID, + } + } + return newTestSplitClient(stores, regions, uint64(len(keys)), hook) } func checkRegionRanges(t *testing.T, regions []*split.RegionInfo, keys [][]byte) { @@ -376,7 +418,7 @@ func (h *noopHook) AfterScanRegions(res []*split.RegionInfo, err error) ([]*spli type batchSplitHook interface { setup(t *testing.T) func() - check(t *testing.T, cli *testClient) + check(t *testing.T, cli *testSplitClient) } type defaultHook struct{} @@ -392,7 +434,7 @@ func (d defaultHook) setup(t *testing.T) func() { } } -func (d defaultHook) check(t *testing.T, cli *testClient) { +func (d defaultHook) check(t *testing.T, cli *testSplitClient) { // so with a batch split size of 4, there will be 7 time batch split // 1. region: [aay, bba), keys: [b, ba, bb] // 2. region: [bbh, cca), keys: [bc, bd, be, bf] @@ -414,7 +456,7 @@ func doTestBatchSplitRegionByRanges(ctx context.Context, t *testing.T, hook clie defer deferFunc() keys := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")} - client := initTestClient(keys, hook) + client := initTestSplitClient(keys, hook) local := &local{ splitCli: client, g: glue.NewExternalTiDBGlue(nil, mysql.ModeNone), @@ -479,7 +521,7 @@ func (h batchSizeHook) setup(t *testing.T) func() { } } -func (h batchSizeHook) check(t *testing.T, cli *testClient) { +func (h batchSizeHook) check(t *testing.T, cli *testSplitClient) { // so with a batch split key size of 6, there will be 9 time batch split // 1. region: [aay, bba), keys: [b, ba, bb] // 2.
region: [bbh, cca), keys: [bc, bd, be] @@ -583,7 +625,7 @@ func TestSplitAndScatterRegionInBatches(t *testing.T) { defer deferFunc() keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")} - client := initTestClient(keys, nil) + client := initTestSplitClient(keys, nil) local := &local{ splitCli: client, g: glue.NewExternalTiDBGlue(nil, mysql.ModeNone), @@ -670,7 +712,7 @@ func doTestBatchSplitByRangesWithClusteredIndex(t *testing.T, hook clientHook) { keys = append(keys, key) } keys = append(keys, tableEndKey, []byte("")) - client := initTestClient(keys, hook) + client := initTestSplitClient(keys, hook) local := &local{ splitCli: client, g: glue.NewExternalTiDBGlue(nil, mysql.ModeNone), From 90a3c7adb1c6569322b7e24bf58b796534945620 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 29 Dec 2022 14:53:19 +0800 Subject: [PATCH 3/7] add test Signed-off-by: lance6716 --- br/pkg/lightning/backend/local/local_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 3d49e169337a7..84768506e9eab 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -875,7 +875,7 @@ func (c *mockImportClient) MultiIngest(context.Context, *sst.MultiIngestRequest, if !c.multiIngestCheckFn(c.store) { return nil, mockGrpcErr{} } - return nil, nil + return &sst.IngestResponse{}, nil } type mockWriteClient struct { @@ -1300,11 +1300,12 @@ func TestServerIsBusy(t *testing.T) { return importCli }, }, - logger: log.L(), - ingestConcurrency: worker.NewPool(ctx, 1, "ingest"), - writeLimiter: noopStoreWriteLimiter{}, - bufferPool: membuf.NewPool(), - supportMultiIngest: true, + logger: log.L(), + ingestConcurrency: worker.NewPool(ctx, 1, "ingest"), + writeLimiter: noopStoreWriteLimiter{}, + bufferPool: membuf.NewPool(), + supportMultiIngest: true, + shouldCheckWriteStall: true, } db, tmpPath := makePebbleDB(t, nil) @@ -1328,5 +1329,5 @@ func TestServerIsBusy(t *testing.T) { require.NoError(t, err) require.Equal(t, []uint64{11, 12, 13, 21, 22, 23}, apiInvokeRecorder["Write"]) - require.Equal(t, []uint64{11, 21}, apiInvokeRecorder["MultiIngest"]) + require.Equal(t, []uint64{11, 12, 13, 11, 21, 22, 23, 21}, apiInvokeRecorder["MultiIngest"]) } From bc4dec4cd3c65ca97b71d492557aece326c5af65 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 29 Dec 2022 16:00:54 +0800 Subject: [PATCH 4/7] fix CI Signed-off-by: lance6716 --- br/pkg/lightning/backend/local/BUILD.bazel | 2 ++ br/pkg/lightning/backend/local/local_test.go | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index c034e6bdb2b3c..8a4e5670dfcf7 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -103,6 +103,7 @@ go_test( "//br/pkg/lightning/glue", "//br/pkg/lightning/log", "//br/pkg/lightning/mydump", + "//br/pkg/lightning/worker", "//br/pkg/membuf", "//br/pkg/mock", "//br/pkg/pdutil", @@ -139,6 +140,7 @@ go_test( "@com_github_tikv_pd_client//:client", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//metadata", "@org_golang_google_grpc//status", "@org_uber_go_atomic//:atomic", ], diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 84768506e9eab..d792c6dd6de51 100644 --- 
a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -941,7 +941,7 @@ func (f *mockImportClientFactory) Create(_ context.Context, storeID uint64) (sst return f.createClientFn(store), nil } } - return nil, errors.New(fmt.Sprintf("store %d not found", storeID)) + return nil, fmt.Errorf("store %d not found", storeID) } func (f *mockImportClientFactory) Close() {} @@ -1274,7 +1274,7 @@ func TestLocalIsRetryableTiKVWriteError(t *testing.T) { require.True(t, l.isRetryableImportTiKVError(errors.Trace(io.EOF))) } -func TestServerIsBusy(t *testing.T) { +func TestCheckPeersBusy(t *testing.T) { ctx := context.Background() pdCli := &mockPdClient{} pdCtl := &pdutil.PdController{} From 9c9d08761743b392eaafd6640da42fd30b023bed Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 29 Dec 2022 17:36:40 +0800 Subject: [PATCH 5/7] finish Signed-off-by: lance6716 --- br/pkg/lightning/backend/local/local_test.go | 21 +++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index d792c6dd6de51..3ab19957fb781 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -846,6 +846,7 @@ func (e mockGrpcErr) Error() string { type mockImportClient struct { sst.ImportSSTClient store *metapb.Store + resp *sst.IngestResponse err error retry int cnt int @@ -868,8 +869,8 @@ func (c *mockImportClient) MultiIngest(context.Context, *sst.MultiIngestRequest, if c.apiInvokeRecorder != nil { c.apiInvokeRecorder["MultiIngest"] = append(c.apiInvokeRecorder["MultiIngest"], c.store.GetId()) } - if c.cnt < c.retry && c.err != nil { - return nil, c.err + if c.cnt < c.retry && (c.err != nil || c.resp != nil) { + return c.resp, c.err } if !c.multiIngestCheckFn(c.store) { @@ -1283,7 +1284,12 @@ func TestCheckPeersBusy(t *testing.T) { keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")} splitCli := initTestSplitClient3Replica(keys, nil) apiInvokeRecorder := map[string][]uint64{} + serverIsBusyResp := &sst.IngestResponse{ + Error: &errorpb.Error{ + ServerIsBusy: &errorpb.ServerIsBusy{}, + }} + createTimeStore12 := 0 local := &local{ pdCtl: pdCtl, splitCli: splitCli, @@ -1297,6 +1303,14 @@ func TestCheckPeersBusy(t *testing.T) { importCli := newMockImportClient() importCli.store = store importCli.apiInvokeRecorder = apiInvokeRecorder + if store.Id == 12 { + createTimeStore12++ + // the second client created for store 12 serves the first checkWriteStall probe; make it return ServerIsBusy + if createTimeStore12 == 2 { + importCli.retry = 1 + importCli.resp = serverIsBusyResp + } + } return importCli }, }, @@ -1329,5 +1343,6 @@ func TestCheckPeersBusy(t *testing.T) { require.NoError(t, err) require.Equal(t, []uint64{11, 12, 13, 21, 22, 23}, apiInvokeRecorder["Write"]) - require.Equal(t, []uint64{11, 12, 13, 11, 21, 22, 23, 21}, apiInvokeRecorder["MultiIngest"]) + // the follower on store 12 reports busy once, so the write-stall check over region peers (11, 12, 13) is retried once + require.Equal(t, []uint64{11, 12, 11, 12, 13, 11, 21, 22, 23, 21}, apiInvokeRecorder["MultiIngest"]) } From bb07d3a2663c62a64ea6ab8ab1a0bfcdd89beacc Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 30 Dec 2022 11:06:19 +0800 Subject: [PATCH 6/7] address comments Signed-off-by: lance6716 --- br/pkg/lightning/backend/local/local_test.go | 32 +------------------- 1 file changed, 1 insertion(+), 31 deletions(-) diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index
3ab19957fb781..22d6403d0a1df 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -58,7 +58,6 @@ import ( pd "github.com/tikv/pd/client" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -880,6 +879,7 @@ func (c *mockImportClient) MultiIngest(context.Context, *sst.MultiIngestRequest, } type mockWriteClient struct { + sst.ImportSST_WriteClient writeResp *sst.WriteResponse } @@ -891,36 +891,6 @@ func (m mockWriteClient) CloseAndRecv() (*sst.WriteResponse, error) { return m.writeResp, nil } -func (m mockWriteClient) Header() (metadata.MD, error) { - //TODO implement me - panic("implement me") -} - -func (m mockWriteClient) Trailer() metadata.MD { - //TODO implement me - panic("implement me") -} - -func (m mockWriteClient) CloseSend() error { - //TODO implement me - panic("implement me") -} - -func (m mockWriteClient) Context() context.Context { - //TODO implement me - panic("implement me") -} - -func (m mockWriteClient) SendMsg(_ interface{}) error { - //TODO implement me - panic("implement me") -} - -func (m mockWriteClient) RecvMsg(_ interface{}) error { - //TODO implement me - panic("implement me") -} - func (c *mockImportClient) Write(ctx context.Context, opts ...grpc.CallOption) (sst.ImportSST_WriteClient, error) { if c.apiInvokeRecorder != nil { c.apiInvokeRecorder["Write"] = append(c.apiInvokeRecorder["Write"], c.store.GetId()) From fca91a776c53b64864a1f83ef3ba27b2566cbbb8 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 30 Dec 2022 11:12:54 +0800 Subject: [PATCH 7/7] fix bazel Signed-off-by: lance6716 --- br/pkg/lightning/backend/local/BUILD.bazel | 1 - 1 file changed, 1 deletion(-) diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index 8a4e5670dfcf7..9524ab5febc2b 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -140,7 +140,6 @@ go_test( "@com_github_tikv_pd_client//:client", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", - "@org_golang_google_grpc//metadata", "@org_golang_google_grpc//status", "@org_uber_go_atomic//:atomic", ],
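
A note on the mechanism the series adds: the loop patch 1 inserts into (*local).Ingest probes every peer of the target region with an empty MultiIngest request and backs off while any peer reports ServerIsBusy. The standalone sketch below restates only that probe-and-backoff control flow; it is not part of the patch, the names waitNoWriteStall and probeFn are illustrative, and the sleep interval is made a parameter (the real code always sleeps writeStallSleepTime, 10s) so the demo runs quickly.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// probeFn stands in for (*local).checkWriteStall: it reports true when any
// peer of the target region answers an empty MultiIngest with ServerIsBusy.
type probeFn func(ctx context.Context) (busy bool, err error)

// waitNoWriteStall mirrors the loop patch 1 adds to (*local).Ingest: keep
// probing until no peer is busy, sleeping between attempts and giving up
// when the context is cancelled.
func waitNoWriteStall(ctx context.Context, probe probeFn, sleep time.Duration) error {
	for {
		busy, err := probe(ctx)
		if err != nil {
			return err
		}
		if !busy {
			return nil // safe to send the real MultiIngestRequest now
		}
		select {
		case <-time.After(sleep):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	calls := 0
	// Fake probe: busy on the first attempt, clear afterwards, the same
	// shape as the store-12 scenario in TestCheckPeersBusy.
	probe := func(ctx context.Context) (bool, error) {
		calls++
		return calls == 1, nil
	}
	if err := waitNoWriteStall(context.Background(), probe, 10*time.Millisecond); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("proceeded to ingest after %d probe round(s)\n", calls)
}
```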
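
A note on patch 6: instead of hand-writing every method of the gRPC stream interface as panic("implement me"), mockWriteClient now embeds sst.ImportSST_WriteClient. Embedding an interface promotes its whole method set, so the struct satisfies the interface while only Send and CloseAndRecv get real bodies; calling any un-overridden method panics on the nil embedded field, which serves as a tripwire in tests. A minimal, self-contained illustration of the idiom follows (Stream and mockStream are made-up names, not from the patch):

```go
package main

import "fmt"

// Stream plays the role of a wide gRPC client interface such as
// sst.ImportSST_WriteClient.
type Stream interface {
	Send(msg string) error
	Recv() (string, error)
	CloseSend() error
}

// mockStream embeds the interface: the promoted methods make it satisfy
// Stream, and any method left un-overridden panics at call time because
// the embedded field is nil; no hand-written panic stubs are needed.
type mockStream struct {
	Stream
}

// Only the method the test exercises gets a real implementation.
func (mockStream) Send(msg string) error {
	fmt.Println("sent:", msg)
	return nil
}

func main() {
	var s Stream = mockStream{}
	_ = s.Send("hello") // overridden: works
	// s.CloseSend() would panic: the embedded Stream is nil.
}
```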