diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel
index c034e6bdb2b3c..9524ab5febc2b 100644
--- a/br/pkg/lightning/backend/local/BUILD.bazel
+++ b/br/pkg/lightning/backend/local/BUILD.bazel
@@ -103,6 +103,7 @@ go_test(
         "//br/pkg/lightning/glue",
         "//br/pkg/lightning/log",
         "//br/pkg/lightning/mydump",
+        "//br/pkg/lightning/worker",
         "//br/pkg/membuf",
         "//br/pkg/mock",
         "//br/pkg/pdutil",
diff --git a/br/pkg/lightning/backend/local/engine_test.go b/br/pkg/lightning/backend/local/engine_test.go
index c7ffe04b95285..eae0225bb519a 100644
--- a/br/pkg/lightning/backend/local/engine_test.go
+++ b/br/pkg/lightning/backend/local/engine_test.go
@@ -31,8 +31,17 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func TestIngestSSTWithClosedEngine(t *testing.T) {
+func makePebbleDB(t *testing.T, opt *pebble.Options) (*pebble.DB, string) {
 	dir := t.TempDir()
+	db, err := pebble.Open(path.Join(dir, "test"), opt)
+	require.NoError(t, err)
+	tmpPath := filepath.Join(dir, "test.sst")
+	err = os.Mkdir(tmpPath, 0o755)
+	require.NoError(t, err)
+	return db, tmpPath
+}
+
+func TestIngestSSTWithClosedEngine(t *testing.T) {
 	opt := &pebble.Options{
 		MemTableSize:             1024 * 1024,
 		MaxConcurrentCompactions: 16,
@@ -41,11 +50,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
 		DisableWAL:               true,
 		ReadOnly:                 false,
 	}
-	db, err := pebble.Open(filepath.Join(dir, "test"), opt)
-	require.NoError(t, err)
-	tmpPath := filepath.Join(dir, "test.sst")
-	err = os.Mkdir(tmpPath, 0o755)
-	require.NoError(t, err)
+	db, tmpPath := makePebbleDB(t, opt)
 
 	_, engineUUID := backend.MakeUUID("ww", 0)
 	engineCtx, cancel := context.WithCancel(context.Background())
diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index 6cbf763037262..eea3a734b0176 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -92,6 +92,7 @@ const (
 	gRPCKeepAliveTime    = 10 * time.Minute
 	gRPCKeepAliveTimeout = 5 * time.Minute
 	gRPCBackOffMaxDelay  = 10 * time.Minute
+	writeStallSleepTime  = 10 * time.Second
 
 	// The max ranges count in a batch to split and scatter.
 	maxBatchSplitRanges = 4096
@@ -383,6 +384,12 @@ type local struct {
 
 	encBuilder       backend.EncodingBuilder
 	targetInfoGetter backend.TargetInfoGetter
+
+	// When TiKV is in normal mode, ingesting too many SSTs will cause TiKV write stall.
+	// To avoid this, we should check for write stall before ingesting SSTs. Note that we
+	// must check both the leader and the followers on the client side, because followers
+	// will not check write stall as long as the ingest command is accepted by the leader.
+	shouldCheckWriteStall bool
 }
 
 func openDuplicateDB(storeDir string) (*pebble.DB, error) {
@@ -506,6 +513,7 @@ func NewLocalBackend(
 		logger:                  log.FromContext(ctx),
 		encBuilder:              NewEncodingBuilder(ctx),
 		targetInfoGetter:        NewTargetInfoGetter(tls, g, cfg.TiDB.PdAddr),
+		shouldCheckWriteStall:   cfg.Cron.SwitchMode.Duration == 0,
 	}
 	if m, ok := metric.FromContext(ctx); ok {
 		local.metrics = m
@@ -1151,6 +1159,25 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *sp
 		return resp, errors.Trace(err)
 	}
 
+	if local.shouldCheckWriteStall {
+		for {
+			maybeWriteStall, err := local.checkWriteStall(ctx, region)
+			if err != nil {
+				return nil, err
+			}
+			if !maybeWriteStall {
+				break
+			}
+			log.FromContext(ctx).Warn("ingest may cause write stall, sleep and retry",
+				zap.Duration("duration", writeStallSleepTime))
+			select {
+			case <-time.After(writeStallSleepTime):
+			case <-ctx.Done():
+				return nil, errors.Trace(ctx.Err())
+			}
+		}
+	}
+
 	req := &sst.MultiIngestRequest{
 		Context: reqCtx,
 		Ssts:    metas,
@@ -1159,6 +1186,23 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *sp
 	return resp, errors.Trace(err)
 }
 
+func (local *local) checkWriteStall(ctx context.Context, region *split.RegionInfo) (bool, error) {
+	for _, peer := range region.Region.GetPeers() {
+		cli, err := local.getImportClient(ctx, peer.StoreId)
+		if err != nil {
+			return false, errors.Trace(err)
+		}
+		resp, err := cli.MultiIngest(ctx, &sst.MultiIngestRequest{})
+		if err != nil {
+			return false, errors.Trace(err)
+		}
+		if resp.Error != nil && resp.Error.ServerIsBusy != nil {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
 func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range {
 	ranges := make([]Range, 0, sizeProps.totalSize/uint64(sizeLimit))
 	curSize := uint64(0)
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go
index 9019ebaf6f62e..0913aa9b86801 100644
--- a/br/pkg/lightning/backend/local/local_test.go
+++ b/br/pkg/lightning/backend/local/local_test.go
@@ -18,10 +18,10 @@ import (
 	"bytes"
 	"context"
 	"encoding/binary"
+	"fmt"
 	"io"
 	"math"
 	"math/rand"
-	"os"
 	"path/filepath"
 	"sort"
 	"strings"
@@ -42,6 +42,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
 	"github.com/pingcap/tidb/br/pkg/lightning/common"
 	"github.com/pingcap/tidb/br/pkg/lightning/log"
+	"github.com/pingcap/tidb/br/pkg/lightning/worker"
 	"github.com/pingcap/tidb/br/pkg/membuf"
 	"github.com/pingcap/tidb/br/pkg/pdutil"
 	"github.com/pingcap/tidb/br/pkg/restore/split"
@@ -248,8 +249,6 @@ func TestRangeProperties(t *testing.T) {
 }
 
 func TestRangePropertiesWithPebble(t *testing.T) {
-	dir := t.TempDir()
-
 	sizeDistance := uint64(500)
 	keysDistance := uint64(20)
 	opt := &pebble.Options{
@@ -270,8 +269,7 @@ func TestRangePropertiesWithPebble(t *testing.T) {
 			},
 		},
 	}
-	db, err := pebble.Open(filepath.Join(dir, "test"), opt)
-	require.NoError(t, err)
+	db, _ := makePebbleDB(t, opt)
 	defer db.Close()
 
 	// local collector
@@ -288,7 +286,7 @@ func TestRangePropertiesWithPebble(t *testing.T) {
 			key := make([]byte, 8)
 			valueLen := rand.Intn(50)
 			binary.BigEndian.PutUint64(key, uint64(i*100+j))
-			err = wb.Set(key, value[:valueLen], writeOpt)
+			err := wb.Set(key, value[:valueLen], writeOpt)
 			require.NoError(t, err)
 			err = collector.Add(pebble.InternalKey{UserKey: key, Trailer: pebble.InternalKeyKindSet}, value[:valueLen])
 			require.NoError(t, err)
@@ -315,7 +313,6 @@
 }
 
 func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
-	dir := t.TempDir()
 	opt := &pebble.Options{
 		MemTableSize:             1024 * 1024,
 		MaxConcurrentCompactions: 16,
@@ -324,12 +321,8 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
 		DisableWAL:               true,
 		ReadOnly:                 false,
 	}
-	db, err := pebble.Open(filepath.Join(dir, "test"), opt)
-	require.NoError(t, err)
+	db, tmpPath := makePebbleDB(t, opt)
 	defer db.Close()
-	tmpPath := filepath.Join(dir, "test.sst")
-	err = os.Mkdir(tmpPath, 0o755)
-	require.NoError(t, err)
 
 	_, engineUUID := backend.MakeUUID("ww", 0)
 	engineCtx, cancel := context.WithCancel(context.Background())
@@ -575,7 +568,6 @@ func (i testIngester) ingest([]*sstMeta) error {
 }
 
 func TestLocalIngestLoop(t *testing.T) {
-	dir := t.TempDir()
 	opt := &pebble.Options{
 		MemTableSize:             1024 * 1024,
 		MaxConcurrentCompactions: 16,
@@ -584,18 +576,14 @@ func TestLocalIngestLoop(t *testing.T) {
 		DisableWAL:               true,
 		ReadOnly:                 false,
 	}
-	db, err := pebble.Open(filepath.Join(dir, "test"), opt)
-	require.NoError(t, err)
+	db, tmpPath := makePebbleDB(t, opt)
 	defer db.Close()
-	tmpPath := filepath.Join(dir, "test.sst")
-	err = os.Mkdir(tmpPath, 0o755)
-	require.NoError(t, err)
 	_, engineUUID := backend.MakeUUID("ww", 0)
 	engineCtx, cancel := context.WithCancel(context.Background())
 	f := Engine{
 		db:           db,
 		UUID:         engineUUID,
-		sstDir:       "",
+		sstDir:       tmpPath,
 		ctx:          engineCtx,
 		cancel:       cancel,
 		sstMetasChan: make(chan metaOrFlush, 64),
@@ -648,7 +636,7 @@
 	wg.Wait()
 
 	f.mutex.RLock()
-	err = f.flushEngineWithoutLock(engineCtx)
+	err := f.flushEngineWithoutLock(engineCtx)
 	require.NoError(t, err)
 	f.mutex.RUnlock()
 
@@ -743,7 +731,6 @@ func TestFilterOverlapRange(t *testing.T) {
 }
 
 func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {
-	dir := t.TempDir()
 	opt := &pebble.Options{
 		MemTableSize:             1024 * 1024,
 		MaxConcurrentCompactions: 16,
@@ -752,12 +739,8 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {
 		DisableWAL:               true,
 		ReadOnly:                 false,
 	}
-	db, err := pebble.Open(filepath.Join(dir, "test"), opt)
-	require.NoError(t, err)
+	db, tmpPath := makePebbleDB(t, opt)
 	defer db.Close()
-	tmpPath := filepath.Join(dir, "test.sst")
-	err = os.Mkdir(tmpPath, 0o755)
-	require.NoError(t, err)
 
 	_, engineUUID := backend.MakeUUID("ww", 0)
 	engineCtx, cancel := context.WithCancel(context.Background())
@@ -848,49 +831,90 @@ func TestMergeSSTsDuplicated(t *testing.T) {
 
 type mockPdClient struct {
 	pd.Client
-	stores []*metapb.Store
+	stores  []*metapb.Store
+	regions []*pd.Region
 }
 
 func (c *mockPdClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
 	return c.stores, nil
 }
 
+func (c *mockPdClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*pd.Region, error) {
+	return c.regions, nil
+}
+
 type mockGrpcErr struct{}
 
 func (e mockGrpcErr) GRPCStatus() *status.Status {
-	return status.New(codes.Unimplemented, "unimplmented")
+	return status.New(codes.Unimplemented, "unimplemented")
 }
 
 func (e mockGrpcErr) Error() string {
-	return "unimplmented"
+	return "unimplemented"
 }
 
 type mockImportClient struct {
 	sst.ImportSSTClient
 	store *metapb.Store
+	resp  *sst.IngestResponse
 	err   error
 	retry int
 	cnt   int
 	multiIngestCheckFn func(s *metapb.Store) bool
+	apiInvokeRecorder  map[string][]uint64
+}
+
+func newMockImportClient() *mockImportClient {
+	return &mockImportClient{
+		multiIngestCheckFn: func(s *metapb.Store) bool {
+			return true
+		},
+	}
 }
 
 func (c *mockImportClient) MultiIngest(context.Context, *sst.MultiIngestRequest, ...grpc.CallOption) (*sst.IngestResponse, error) {
 	defer func() {
 		c.cnt++
 	}()
-	if c.cnt < c.retry && c.err != nil {
-		return nil, c.err
+	if c.apiInvokeRecorder != nil {
+		c.apiInvokeRecorder["MultiIngest"] = append(c.apiInvokeRecorder["MultiIngest"], c.store.GetId())
+	}
+	if c.cnt < c.retry && (c.err != nil || c.resp != nil) {
+		return c.resp, c.err
 	}
 
 	if !c.multiIngestCheckFn(c.store) {
 		return nil, mockGrpcErr{}
 	}
-	return nil, nil
+	return &sst.IngestResponse{}, nil
+}
+
+type mockWriteClient struct {
+	sst.ImportSST_WriteClient
+	writeResp *sst.WriteResponse
+}
+
+func (m mockWriteClient) Send(request *sst.WriteRequest) error {
+	return nil
+}
+
+func (m mockWriteClient) CloseAndRecv() (*sst.WriteResponse, error) {
+	return m.writeResp, nil
+}
+
+func (c *mockImportClient) Write(ctx context.Context, opts ...grpc.CallOption) (sst.ImportSST_WriteClient, error) {
+	if c.apiInvokeRecorder != nil {
+		c.apiInvokeRecorder["Write"] = append(c.apiInvokeRecorder["Write"], c.store.GetId())
+	}
+	return mockWriteClient{writeResp: &sst.WriteResponse{Metas: []*sst.SSTMeta{
+		{}, {}, {},
+	}}}, nil
 }
 
 type mockImportClientFactory struct {
-	stores         []*metapb.Store
-	createClientFn func(store *metapb.Store) sst.ImportSSTClient
+	stores            []*metapb.Store
+	createClientFn    func(store *metapb.Store) sst.ImportSSTClient
+	apiInvokeRecorder map[string][]uint64
 }
 
 func (f *mockImportClientFactory) Create(_ context.Context, storeID uint64) (sst.ImportSSTClient, error) {
@@ -899,7 +923,7 @@ func (f *mockImportClientFactory) Create(_ context.Context, storeID uint64) (sst
 			return f.createClientFn(store), nil
 		}
 	}
-	return nil, errors.New("store not found")
+	return nil, fmt.Errorf("store %d not found", storeID)
 }
 
 func (f *mockImportClientFactory) Close() {}
@@ -1231,3 +1255,75 @@ func TestLocalIsRetryableTiKVWriteError(t *testing.T) {
 	require.True(t, l.isRetryableImportTiKVError(io.EOF))
 	require.True(t, l.isRetryableImportTiKVError(errors.Trace(io.EOF)))
 }
+
+func TestCheckPeersBusy(t *testing.T) {
+	ctx := context.Background()
+	pdCli := &mockPdClient{}
+	pdCtl := &pdutil.PdController{}
+	pdCtl.SetPDClient(pdCli)
+
+	keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")}
+	splitCli := initTestSplitClient3Replica(keys, nil)
+	apiInvokeRecorder := map[string][]uint64{}
+	serverIsBusyResp := &sst.IngestResponse{
+		Error: &errorpb.Error{
+			ServerIsBusy: &errorpb.ServerIsBusy{},
+		}}
+
+	createTimeStore12 := 0
+	local := &local{
+		pdCtl:    pdCtl,
+		splitCli: splitCli,
+		importClientFactory: &mockImportClientFactory{
+			stores: []*metapb.Store{
+				// region ["", "a") is not used, skip (1, 2, 3)
+				{Id: 11}, {Id: 12}, {Id: 13}, // region ["a", "b")
+				{Id: 21}, {Id: 22}, {Id: 23}, // region ["b", "")
+			},
+			createClientFn: func(store *metapb.Store) sst.ImportSSTClient {
+				importCli := newMockImportClient()
+				importCli.store = store
+				importCli.apiInvokeRecorder = apiInvokeRecorder
+				if store.Id == 12 {
+					createTimeStore12++
+					// the second time checkWriteStall is called
+					if createTimeStore12 == 2 {
+						importCli.retry = 1
+						importCli.resp = serverIsBusyResp
+					}
+				}
+				return importCli
+			},
+		},
+		logger:                log.L(),
+		ingestConcurrency:     worker.NewPool(ctx, 1, "ingest"),
+		writeLimiter:          noopStoreWriteLimiter{},
+		bufferPool:            membuf.NewPool(),
+		supportMultiIngest:    true,
+		shouldCheckWriteStall: true,
+	}
+
+	db, tmpPath := makePebbleDB(t, nil)
+	_, engineUUID := backend.MakeUUID("ww", 0)
+	engineCtx, cancel := context.WithCancel(context.Background())
+	f := &Engine{
+		db:           db,
+		UUID:         engineUUID,
+		sstDir:       tmpPath,
+		ctx:          engineCtx,
+		cancel:       cancel,
+		sstMetasChan: make(chan metaOrFlush, 64),
+		keyAdapter:   noopKeyAdapter{},
+		logger:       log.L(),
+	}
+	err := f.db.Set([]byte("a"), []byte("a"), nil)
+	require.NoError(t, err)
+	err = f.db.Set([]byte("b"), []byte("b"), nil)
+	require.NoError(t, err)
+	err = local.writeAndIngestByRange(ctx, f, []byte("a"), []byte("c"), 0, 0)
+	require.NoError(t, err)
+
+	require.Equal(t, []uint64{11, 12, 13, 21, 22, 23}, apiInvokeRecorder["Write"])
+	// store 12 has a busy follower, so it will cause region peers (11, 12, 13) to retry once
+	require.Equal(t, []uint64{11, 12, 11, 12, 13, 11, 21, 22, 23, 21}, apiInvokeRecorder["MultiIngest"])
+}
diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go
index 6cbf7f2f14808..023fade304fae 100644
--- a/br/pkg/lightning/backend/local/localhelper_test.go
+++ b/br/pkg/lightning/backend/local/localhelper_test.go
@@ -47,7 +47,7 @@ func init() {
 	splitRetryTimes = 2
 }
 
-type testClient struct {
+type testSplitClient struct {
 	mu           sync.RWMutex
 	stores       map[uint64]*metapb.Store
 	regions      map[uint64]*split.RegionInfo
@@ -57,17 +57,17 @@ type testClient struct {
 	hook         clientHook
 }
 
-func newTestClient(
+func newTestSplitClient(
 	stores map[uint64]*metapb.Store,
 	regions map[uint64]*split.RegionInfo,
 	nextRegionID uint64,
 	hook clientHook,
-) *testClient {
+) *testSplitClient {
 	regionsInfo := &pdtypes.RegionTree{}
 	for _, regionInfo := range regions {
 		regionsInfo.SetRegion(pdtypes.NewRegionInfo(regionInfo.Region, regionInfo.Leader))
 	}
-	return &testClient{
+	return &testSplitClient{
 		stores:       stores,
 		regions:      regions,
 		regionsInfo:  regionsInfo,
@@ -77,17 +77,17 @@ func newTestClient(
 }
 
 // ScatterRegions scatters regions in a batch.
-func (c *testClient) ScatterRegions(ctx context.Context, regionInfo []*split.RegionInfo) error {
+func (c *testSplitClient) ScatterRegions(ctx context.Context, regionInfo []*split.RegionInfo) error {
 	return nil
 }
 
-func (c *testClient) GetAllRegions() map[uint64]*split.RegionInfo {
+func (c *testSplitClient) GetAllRegions() map[uint64]*split.RegionInfo {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
 	return c.regions
 }
 
-func (c *testClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
+func (c *testSplitClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
 	store, ok := c.stores[storeID]
@@ -97,19 +97,18 @@ func (c *testClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Stor
 	return store, nil
 }
 
-func (c *testClient) GetRegion(ctx context.Context, key []byte) (*split.RegionInfo, error) {
+func (c *testSplitClient) GetRegion(ctx context.Context, key []byte) (*split.RegionInfo, error) {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
 	for _, region := range c.regions {
-		if bytes.Compare(key, region.Region.StartKey) >= 0 &&
-			(len(region.Region.EndKey) == 0 || bytes.Compare(key, region.Region.EndKey) < 0) {
+		if bytes.Compare(key, region.Region.StartKey) >= 0 && beforeEnd(key, region.Region.EndKey) {
 			return region, nil
 		}
 	}
 	return nil, errors.Errorf("region not found: key=%s", string(key))
 }
 
-func (c *testClient) GetRegionByID(ctx context.Context, regionID uint64) (*split.RegionInfo, error) {
+func (c *testSplitClient) GetRegionByID(ctx context.Context, regionID uint64) (*split.RegionInfo, error) {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
 	region, ok := c.regions[regionID]
@@ -119,7 +118,7 @@ func (c *testClient) GetRegionByID(ctx context.Context, regionID uint64) (*split
 	return region, nil
 }
 
-func (c *testClient) SplitRegion(
+func (c *testSplitClient) SplitRegion(
 	ctx context.Context,
 	regionInfo *split.RegionInfo,
 	key []byte,
@@ -130,7 +129,7 @@ func (c *testClient) SplitRegion(
 	splitKey := codec.EncodeBytes([]byte{}, key)
 	for _, region := range c.regions {
 		if bytes.Compare(splitKey, region.Region.StartKey) >= 0 &&
-			(len(region.Region.EndKey) == 0 || bytes.Compare(splitKey, region.Region.EndKey) < 0) {
+			beforeEnd(splitKey, region.Region.EndKey) {
 			target = region
 		}
 	}
@@ -159,7 +158,7 @@ func (c *testClient) SplitRegion(
 	return newRegion, nil
 }
 
-func (c *testClient) BatchSplitRegionsWithOrigin(
+func (c *testSplitClient) BatchSplitRegionsWithOrigin(
 	ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte,
 ) (*split.RegionInfo, []*split.RegionInfo, error) {
 	c.mu.Lock()
@@ -234,24 +233,24 @@ func (c *testClient) BatchSplitRegionsWithOrigin(
 	return target, newRegions, err
 }
 
-func (c *testClient) BatchSplitRegions(
+func (c *testSplitClient) BatchSplitRegions(
 	ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte,
 ) ([]*split.RegionInfo, error) {
 	_, newRegions, err := c.BatchSplitRegionsWithOrigin(ctx, regionInfo, keys)
 	return newRegions, err
 }
 
-func (c *testClient) ScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) error {
+func (c *testSplitClient) ScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) error {
 	return nil
 }
 
-func (c *testClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
+func (c *testSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
 	return &pdpb.GetOperatorResponse{
 		Header: new(pdpb.ResponseHeader),
 	}, nil
 }
 
-func (c *testClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) {
+func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) {
 	if c.hook != nil {
 		key, endKey, limit = c.hook.BeforeScanRegions(ctx, key, endKey, limit)
 	}
@@ -272,19 +271,19 @@ func (c *testClient) ScanRegions(ctx context.Context, key, endKey []byte, limit
 	return regions, err
 }
 
-func (c *testClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (r pdtypes.Rule, err error) {
+func (c *testSplitClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (r pdtypes.Rule, err error) {
 	return
 }
 
-func (c *testClient) SetPlacementRule(ctx context.Context, rule pdtypes.Rule) error {
+func (c *testSplitClient) SetPlacementRule(ctx context.Context, rule pdtypes.Rule) error {
 	return nil
 }
 
-func (c *testClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error {
+func (c *testSplitClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error {
 	return nil
 }
 
-func (c *testClient) SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error {
+func (c *testSplitClient) SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error {
 	return nil
 }
 
@@ -305,7 +304,7 @@ func cloneRegion(region *split.RegionInfo) *split.RegionInfo {
 
 // For keys ["", "aay", "bba", "bbh", "cca", ""], the key ranges of
 // regions are [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ).
-func initTestClient(keys [][]byte, hook clientHook) *testClient {
+func initTestSplitClient(keys [][]byte, hook clientHook) *testSplitClient {
 	peers := make([]*metapb.Peer, 1)
 	peers[0] = &metapb.Peer{
 		Id:      1,
@@ -329,13 +328,56 @@ func initTestClient(keys [][]byte, hook clientHook) *testClient {
 			EndKey:      endKey,
 			RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
 		},
+		Leader: peers[0],
 	}
 }
 	stores := make(map[uint64]*metapb.Store)
 	stores[1] = &metapb.Store{
 		Id: 1,
 	}
-	return newTestClient(stores, regions, uint64(len(keys)), hook)
+	return newTestSplitClient(stores, regions, uint64(len(keys)), hook)
+}
+
+// initTestSplitClient3Replica creates a client where each region has 3 replicas, and their IDs and StoreIDs are
+// (1, 2, 3), (11, 12, 13), ...
+// For keys ["", "aay", "bba", "bbh", "cca", ""], the key ranges of
+// regions are [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ).
+func initTestSplitClient3Replica(keys [][]byte, hook clientHook) *testSplitClient {
+	regions := make(map[uint64]*split.RegionInfo)
+	stores := make(map[uint64]*metapb.Store)
+	for i := uint64(1); i < uint64(len(keys)); i++ {
+		startKey := keys[i-1]
+		if len(startKey) != 0 {
+			startKey = codec.EncodeBytes([]byte{}, startKey)
+		}
+		endKey := keys[i]
+		if len(endKey) != 0 {
+			endKey = codec.EncodeBytes([]byte{}, endKey)
+		}
+		baseID := (i-1)*10 + 1
+		peers := make([]*metapb.Peer, 3)
+		for j := 0; j < 3; j++ {
+			peers[j] = &metapb.Peer{
+				Id:      baseID + uint64(j),
+				StoreId: baseID + uint64(j),
+			}
+		}
+
+		regions[baseID] = &split.RegionInfo{
+			Region: &metapb.Region{
+				Id:          baseID,
+				Peers:       peers,
+				StartKey:    startKey,
+				EndKey:      endKey,
+				RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
+			},
+			Leader: peers[0],
+		}
+		stores[baseID] = &metapb.Store{
+			Id: baseID,
+		}
+	}
+	return newTestSplitClient(stores, regions, uint64(len(keys)), hook)
 }
 
 func checkRegionRanges(t *testing.T, regions []*split.RegionInfo, keys [][]byte) {
@@ -376,7 +418,7 @@ func (h *noopHook) AfterScanRegions(res []*split.RegionInfo, err error) ([]*spli
 
 type batchSplitHook interface {
 	setup(t *testing.T) func()
-	check(t *testing.T, cli *testClient)
+	check(t *testing.T, cli *testSplitClient)
 }
 
 type defaultHook struct{}
@@ -392,7 +434,7 @@ func (d defaultHook) setup(t *testing.T) func() {
 	}
 }
 
-func (d defaultHook) check(t *testing.T, cli *testClient) {
+func (d defaultHook) check(t *testing.T, cli *testSplitClient) {
 	// so with a batch split size of 4, there will be 7 time batch split
 	// 1. region: [aay, bba), keys: [b, ba, bb]
 	// 2. region: [bbh, cca), keys: [bc, bd, be, bf]
@@ -414,7 +456,7 @@ func doTestBatchSplitRegionByRanges(ctx context.Context, t *testing.T, hook clie
 	defer deferFunc()
 
 	keys := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")}
-	client := initTestClient(keys, hook)
+	client := initTestSplitClient(keys, hook)
 	local := &local{
 		splitCli: client,
 		g:        glue.NewExternalTiDBGlue(nil, mysql.ModeNone),
@@ -479,7 +521,7 @@ func (h batchSizeHook) setup(t *testing.T) func() {
 	}
 }
 
-func (h batchSizeHook) check(t *testing.T, cli *testClient) {
+func (h batchSizeHook) check(t *testing.T, cli *testSplitClient) {
 	// so with a batch split key size of 6, there will be 9 time batch split
 	// 1. region: [aay, bba), keys: [b, ba, bb]
 	// 2. region: [bbh, cca), keys: [bc, bd, be]
@@ -583,7 +625,7 @@ func TestSplitAndScatterRegionInBatches(t *testing.T) {
 	defer deferFunc()
 
 	keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")}
-	client := initTestClient(keys, nil)
+	client := initTestSplitClient(keys, nil)
 	local := &local{
 		splitCli: client,
 		g:        glue.NewExternalTiDBGlue(nil, mysql.ModeNone),
@@ -670,7 +712,7 @@ func doTestBatchSplitByRangesWithClusteredIndex(t *testing.T, hook clientHook) {
 		keys = append(keys, key)
 	}
 	keys = append(keys, tableEndKey, []byte(""))
-	client := initTestClient(keys, hook)
+	client := initTestSplitClient(keys, hook)
 	local := &local{
 		splitCli: client,
 		g:        glue.NewExternalTiDBGlue(nil, mysql.ModeNone),
diff --git a/ddl/ingest/config.go b/ddl/ingest/config.go
index 7fd251a361939..f611369decae0 100644
--- a/ddl/ingest/config.go
+++ b/ddl/ingest/config.go
@@ -58,6 +58,8 @@ func generateLightningConfig(memRoot MemRoot, jobID int64, unique bool) (*config
 	cfg.Security.CAPath = tidbCfg.Security.ClusterSSLCA
 	cfg.Security.CertPath = tidbCfg.Security.ClusterSSLCert
 	cfg.Security.KeyPath = tidbCfg.Security.ClusterSSLKey
+	// In the DDL scenario, we don't switch import mode.
+	cfg.Cron.SwitchMode = config.Duration{Duration: 0}
 	return cfg, err
 }
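
Note (not part of the patch): a minimal standalone sketch of the client-side probe that Ingest now performs through checkWriteStall — every peer store of the target region is probed with an empty MultiIngest request, and the real ingest is delayed while any peer reports ServerIsBusy. The peerProber interface, fakeProber type, and waitNoWriteStall function below are hypothetical names used only for illustration of the retry loop; the real implementation uses the importClientFactory and kvproto types shown in the diff above.

package main

import (
	"context"
	"fmt"
	"time"
)

// peerProber abstracts "send an empty MultiIngest to one store and report
// whether it answered ServerIsBusy" (hypothetical simplification).
type peerProber interface {
	probe(ctx context.Context, storeID uint64) (bool, error)
}

// waitNoWriteStall mirrors the retry loop added to Ingest: keep probing all
// peer stores of the target region and sleep while any of them is busy, so
// that a write stall on a follower is respected before the real ingest.
func waitNoWriteStall(ctx context.Context, p peerProber, peerStoreIDs []uint64, sleep time.Duration) error {
	for {
		busy := false
		for _, id := range peerStoreIDs {
			b, err := p.probe(ctx, id)
			if err != nil {
				return err
			}
			if b {
				busy = true
				break
			}
		}
		if !busy {
			return nil
		}
		fmt.Printf("some peer is busy, sleeping %s before retrying ingest\n", sleep)
		select {
		case <-time.After(sleep):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// fakeProber reports a store as busy for a fixed number of probes.
type fakeProber struct{ busyLeft map[uint64]int }

func (f *fakeProber) probe(_ context.Context, storeID uint64) (bool, error) {
	if f.busyLeft[storeID] > 0 {
		f.busyLeft[storeID]--
		return true, nil
	}
	return false, nil
}

func main() {
	// Pretend store 12 (a follower) is busy for one probe, like in TestCheckPeersBusy.
	p := &fakeProber{busyLeft: map[uint64]int{12: 1}}
	err := waitNoWriteStall(context.Background(), p, []uint64{11, 12, 13}, 100*time.Millisecond)
	fmt.Println("ingest can proceed, err =", err)
}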